Skip to content

Commit

Permalink
Do not exec on pods during rolling update
Browse files Browse the repository at this point in the history
Fixes #304

Exec on pods only if StatefulSet is ready and up to date.

Before this commit, we observed in #304 that the controller tried to
exec into pods at the same time as the pods got updated due to a StatefulSet restart
resulting in connection errors.
  • Loading branch information
ansd authored and Zerpet committed Sep 15, 2020
1 parent 22a628b commit 4b231d4
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 36 deletions.
82 changes: 47 additions & 35 deletions controllers/rabbitmqcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,9 @@ func (r *RabbitmqClusterReconciler) Reconcile(req ctrl.Request) (ctrl.Result, er
return ctrl.Result{}, err
}

r.restartStatefulSetIfNeeded(ctx, builder, operationResult, rabbitmqCluster)
if restarted := r.restartStatefulSetIfNeeded(ctx, builder, operationResult, rabbitmqCluster); restarted {
return ctrl.Result{RequeueAfter: time.Second * 10}, nil
}
}

// Set ReconcileSuccess to true here because all CRUD operations to Kube API related
Expand All @@ -214,7 +216,7 @@ func (r *RabbitmqClusterReconciler) Reconcile(req ctrl.Request) (ctrl.Result, er
return ctrl.Result{}, err
}

if ok, err := r.allReplicasReady(ctx, rabbitmqCluster); !ok {
if ok, err := r.allReplicasReadyAndUpdated(ctx, rabbitmqCluster); !ok {
// only enable plugins when all pods of the StatefulSet become ready
// requeue request after 10 seconds without error
logger.Info("Not all replicas ready yet; requeuing request to enable plugins on RabbitmqCluster",
Expand Down Expand Up @@ -312,47 +314,58 @@ func (r *RabbitmqClusterReconciler) setAdminStatus(ctx context.Context, rmq *rab
return nil
}

// restartStatefulSetIfNeeded - helper function that annotates the StatefulSet PodTemplate with current timestamp
// to trigger a restart of the all pods in the StatefulSet when builder requires StatefulSet to be updated
func (r *RabbitmqClusterReconciler) restartStatefulSetIfNeeded(ctx context.Context, builder resource.ResourceBuilder, operationResult controllerutil.OperationResult, rmq *rabbitmqv1beta1.RabbitmqCluster) {
if builder.UpdateRequiresStsRestart() && operationResult == controllerutil.OperationResultUpdated {
if err := clientretry.RetryOnConflict(clientretry.DefaultRetry, func() error {
sts := &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Name: rmq.ChildResourceName("server"), Namespace: rmq.Namespace}}
if err := r.Get(ctx, types.NamespacedName{Name: sts.Name, Namespace: sts.Namespace}, sts); err != nil {
return err
}
if sts.Spec.Template.ObjectMeta.Annotations == nil {
sts.Spec.Template.ObjectMeta.Annotations = make(map[string]string)
}
sts.Spec.Template.ObjectMeta.Annotations["rabbitmq.com/restartAt"] = time.Now().Format(time.RFC3339)
return r.Update(ctx, sts)
}); err != nil {
msg := fmt.Sprintf("Failed to restart StatefulSet %s of Namespace %s; rabbitmq.conf configuration may be outdated", rmq.ChildResourceName("server"), rmq.Namespace)
r.Log.Error(err, msg)
r.Recorder.Event(rmq, corev1.EventTypeWarning, "FailedUpdate", msg)
// Adds an arbitrary annotation (rabbitmq.com/lastRestartAt) to the StatefulSet PodTemplate to trigger a StatefulSet restart
// if builder requires StatefulSet to be updated.
func (r *RabbitmqClusterReconciler) restartStatefulSetIfNeeded(
ctx context.Context,
builder resource.ResourceBuilder,
operationResult controllerutil.OperationResult,
rmq *rabbitmqv1beta1.RabbitmqCluster) (restarted bool) {

if !(builder.UpdateRequiresStsRestart() && operationResult == controllerutil.OperationResultUpdated) {
return false
}

if err := clientretry.RetryOnConflict(clientretry.DefaultRetry, func() error {
sts := &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Name: rmq.ChildResourceName("server"), Namespace: rmq.Namespace}}
if err := r.Get(ctx, types.NamespacedName{Name: sts.Name, Namespace: sts.Namespace}, sts); err != nil {
return err
}
msg := fmt.Sprintf("Restarted StatefulSet %s of Namespace %s", rmq.ChildResourceName("server"), rmq.Namespace)
r.Log.Info(msg)
r.Recorder.Event(rmq, corev1.EventTypeNormal, "SuccessfulUpdate", msg)
if sts.Spec.Template.ObjectMeta.Annotations == nil {
sts.Spec.Template.ObjectMeta.Annotations = make(map[string]string)
}
sts.Spec.Template.ObjectMeta.Annotations["rabbitmq.com/lastRestartAt"] = time.Now().Format(time.RFC3339)
return r.Update(ctx, sts)
}); err != nil {
msg := fmt.Sprintf("Failed to restart StatefulSet %s of Namespace %s; rabbitmq.conf configuration may be outdated", rmq.ChildResourceName("server"), rmq.Namespace)
r.Log.Error(err, msg)
r.Recorder.Event(rmq, corev1.EventTypeWarning, "FailedUpdate", msg)
return false
}

msg := fmt.Sprintf("Restarted StatefulSet %s of Namespace %s", rmq.ChildResourceName("server"), rmq.Namespace)
r.Log.Info(msg)
r.Recorder.Event(rmq, corev1.EventTypeNormal, "SuccessfulUpdate", msg)
return true
}

// allReplicasReady - helper function that checks if StatefulSet replicas are all ready
func (r *RabbitmqClusterReconciler) allReplicasReady(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster) (bool, error) {
func (r *RabbitmqClusterReconciler) allReplicasReadyAndUpdated(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster) (bool, error) {
sts := &appsv1.StatefulSet{}

if err := r.Get(ctx, types.NamespacedName{Name: rmq.ChildResourceName("server"), Namespace: rmq.Namespace}, sts); err != nil {
return false, client.IgnoreNotFound(err)
}

if sts.Status.ReadyReplicas < *sts.Spec.Replicas {
desiredReplicas := *sts.Spec.Replicas
if sts.Status.ReadyReplicas < desiredReplicas ||
sts.Status.UpdatedReplicas < desiredReplicas { // StatefulSet rolling update is still ongoing (see https://github.com/rabbitmq/cluster-operator/issues/304)
return false, nil
}

return true, nil
}

// enablePlugins - helper function to set the list of enabled plugins in a given RabbitmqCluster pods
// Helper function to set the list of enabled plugins in the given RabbitmqCluster pods.
// `rabbitmq-plugins set` disables plugins that are not in the provided list
func (r *RabbitmqClusterReconciler) enablePlugins(rmq *rabbitmqv1beta1.RabbitmqCluster) error {
plugins := resource.NewRabbitmqPlugins(rmq.Spec.Rabbitmq.AdditionalPlugins)
Expand All @@ -363,11 +376,13 @@ func (r *RabbitmqClusterReconciler) enablePlugins(rmq *rabbitmqv1beta1.RabbitmqC
stdout, stderr, err := r.exec(rmq.Namespace, podName, "rabbitmq", "sh", "-c", rabbitCommand)

if err != nil {

r.Log.Error(err, fmt.Sprintf(
"Failed to enable plugins on pod %s in namespace %s, running command %s with output: %s %s",
podName, rmq.Namespace, rabbitCommand, stdout, stderr))

r.Log.Error(err, "Failed to enable plugins",
"namespace", rmq.Namespace,
"name", rmq.Name,
"pod", podName,
"command", rabbitCommand,
"stdout", stdout,
"stderr", stderr)
return err
}
}
Expand Down Expand Up @@ -407,12 +422,9 @@ func (r *RabbitmqClusterReconciler) exec(namespace, podName, containerName strin
Stdin: nil,
Tty: false,
})

if err != nil {

return stdOut.String(), stdErr.String(), err
}

if stdErr.Len() > 0 {
return stdOut.String(), stdErr.String(), fmt.Errorf("%v", stdErr)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/resource/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (builder *RabbitmqResourceBuilder) ServerConfigMap() *ServerConfigMapBuilde
}

func (builder *ServerConfigMapBuilder) UpdateRequiresStsRestart() bool {
return true
return true // because rabbitmq.conf and advanced.config changes take effect only after a node restart
}

func (builder *ServerConfigMapBuilder) Update(object runtime.Object) error {
Expand Down

0 comments on commit 4b231d4

Please sign in to comment.