Skip to content

Commit

Permalink
Use logger from context in Reconcile() (#550)
Browse files Browse the repository at this point in the history
* Use logger from context in Reconcile()

- with controller-runtime 0.7, Reconcile() takes a
context with a logger. The logger is pre populated with
controller name, gvk, object name and namespace.

* Use LoggerFrom alias

Also, remove logger from method signatures since it's already part of
context.

* Remove redundant namespace and name from logs

Co-authored-by: David Ansari <[email protected]>
  • Loading branch information
ChunyiLyu and ansd authored Jan 8, 2021
1 parent c92a7dc commit 9b239ea
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 71 deletions.
49 changes: 18 additions & 31 deletions controllers/rabbitmqcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"strings"
"time"

"github.com/go-logr/logr"

"github.com/rabbitmq/cluster-operator/internal/resource"
"github.com/rabbitmq/cluster-operator/internal/status"
"k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -32,7 +34,6 @@ import (

"k8s.io/apimachinery/pkg/runtime"

"github.com/go-logr/logr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand All @@ -56,7 +57,6 @@ const (
// RabbitmqClusterReconciler reconciles a RabbitmqCluster object
type RabbitmqClusterReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
Namespace string
Recorder record.EventRecorder
Expand All @@ -82,7 +82,7 @@ type RabbitmqClusterReconciler struct {
// +kubebuilder:rbac:groups="rbac.authorization.k8s.io",resources=rolebindings,verbs=get;list;watch;create;update

func (r *RabbitmqClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := r.Log
logger := ctrl.LoggerFrom(ctx)

rabbitmqCluster, err := r.getRabbitmqCluster(ctx, req.NamespacedName)

Expand All @@ -95,25 +95,19 @@ func (r *RabbitmqClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ

// Check if the resource has been marked for deletion
if !rabbitmqCluster.ObjectMeta.DeletionTimestamp.IsZero() {
logger.Info("Deleting RabbitmqCluster",
"namespace", rabbitmqCluster.Namespace,
"name", rabbitmqCluster.Name)
logger.Info("Deleting")
return ctrl.Result{}, r.prepareForDeletion(ctx, rabbitmqCluster)
}

// exit if pause reconciliation label is set to true
if v, ok := rabbitmqCluster.Labels[pauseReconciliationLabel]; ok && v == "true" {
logger.Info("Not reconciling RabbitmqCluster",
"namespace", rabbitmqCluster.Namespace,
"name", rabbitmqCluster.Name)
logger.Info("Not reconciling RabbitmqCluster")
r.Recorder.Event(rabbitmqCluster, corev1.EventTypeWarning,
"PausedReconciliation", fmt.Sprintf("label '%s' is set to true", pauseReconciliationLabel))

rabbitmqCluster.Status.SetCondition(status.NoWarnings, corev1.ConditionFalse, "reconciliation paused")
if writerErr := r.Status().Update(ctx, rabbitmqCluster); writerErr != nil {
r.Log.Error(writerErr, "Error trying to Update NoWarnings condition state",
"namespace", rabbitmqCluster.Namespace,
"name", rabbitmqCluster.Name)
logger.Error(writerErr, "Error trying to Update NoWarnings condition state")
}
return ctrl.Result{}, nil
}
Expand Down Expand Up @@ -147,9 +141,7 @@ func (r *RabbitmqClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ
logger.Error(err, "Failed to marshal cluster spec")
}

logger.Info("Start reconciling RabbitmqCluster",
"namespace", rabbitmqCluster.Namespace,
"name", rabbitmqCluster.Name,
logger.Info("Start reconciling",
"spec", string(instanceSpec))

resourceBuilder := resource.RabbitmqResourceBuilder{
Expand All @@ -176,33 +168,29 @@ func (r *RabbitmqClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ
})
return apiError
})
r.logAndRecordOperationResult(rabbitmqCluster, resource, operationResult, err)
r.logAndRecordOperationResult(logger, rabbitmqCluster, resource, operationResult, err)
if err != nil {
rabbitmqCluster.Status.SetCondition(status.ReconcileSuccess, corev1.ConditionFalse, "Error", err.Error())
if writerErr := r.Status().Update(ctx, rabbitmqCluster); writerErr != nil {
r.Log.Error(writerErr, "Error trying to Update ReconcileSuccess condition state",
"namespace", rabbitmqCluster.Namespace,
"name", rabbitmqCluster.Name)
logger.Error(writerErr, "Failed to update ReconcileSuccess condition state")
}
return ctrl.Result{}, err
}

if err = r.annotateIfNeeded(ctx, builder, operationResult, rabbitmqCluster); err != nil {
if err = r.annotateIfNeeded(ctx, logger, builder, operationResult, rabbitmqCluster); err != nil {
return ctrl.Result{}, err
}
}

if requeueAfter, err := r.restartStatefulSetIfNeeded(ctx, rabbitmqCluster); err != nil || requeueAfter > 0 {
if requeueAfter, err := r.restartStatefulSetIfNeeded(ctx, logger, rabbitmqCluster); err != nil || requeueAfter > 0 {
return ctrl.Result{RequeueAfter: requeueAfter}, err
}

// Set ReconcileSuccess to true here because all CRUD operations to Kube API related
// to child resources returned no error
rabbitmqCluster.Status.SetCondition(status.ReconcileSuccess, corev1.ConditionTrue, "Success", "Created or Updated all child resources")
if writerErr := r.Status().Update(ctx, rabbitmqCluster); writerErr != nil {
r.Log.Error(writerErr, "Error trying to Update Custom Resource status",
"namespace", rabbitmqCluster.Namespace,
"name", rabbitmqCluster.Name)
logger.Error(writerErr, "Failed to Update Custom Resource status")
}

if err := r.setDefaultUserStatus(ctx, rabbitmqCluster); err != nil {
Expand All @@ -215,16 +203,14 @@ func (r *RabbitmqClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ
return ctrl.Result{RequeueAfter: requeueAfter}, err
}

logger.Info("Finished reconciling RabbitmqCluster",
"namespace", rabbitmqCluster.Namespace,
"name", rabbitmqCluster.Name)
logger.Info("Finished reconciling")

return ctrl.Result{}, nil
}

// logAndRecordOperationResult - helper function to log and record events with message and error
// it logs and records 'updated' and 'created' OperationResult, and ignores OperationResult 'unchanged'
func (r *RabbitmqClusterReconciler) logAndRecordOperationResult(rmq runtime.Object, resource runtime.Object, operationResult controllerutil.OperationResult, err error) {
func (r *RabbitmqClusterReconciler) logAndRecordOperationResult(logger logr.Logger, rmq runtime.Object, resource runtime.Object, operationResult controllerutil.OperationResult, err error) {
if operationResult == controllerutil.OperationResultNone && err == nil {
return
}
Expand All @@ -239,18 +225,19 @@ func (r *RabbitmqClusterReconciler) logAndRecordOperationResult(rmq runtime.Obje

if err == nil {
msg := fmt.Sprintf("%sd resource %s of Type %T", operation, resource.(metav1.Object).GetName(), resource.(metav1.Object))
r.Log.Info(msg)
logger.Info(msg)
r.Recorder.Event(rmq, corev1.EventTypeNormal, fmt.Sprintf("Successful%s", strings.Title(operation)), msg)
}

if err != nil {
msg := fmt.Sprintf("failed to %s resource %s of Type %T", operation, resource.(metav1.Object).GetName(), resource.(metav1.Object))
r.Log.Error(err, msg)
logger.Error(err, msg)
r.Recorder.Event(rmq, corev1.EventTypeWarning, fmt.Sprintf("Failed%s", strings.Title(operation)), msg)
}
}

func (r *RabbitmqClusterReconciler) updateStatus(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster) (time.Duration, error) {
logger := ctrl.LoggerFrom(ctx)
childResources, err := r.getChildResources(ctx, rmq)
if err != nil {
return 0, err
Expand All @@ -263,7 +250,7 @@ func (r *RabbitmqClusterReconciler) updateStatus(ctx context.Context, rmq *rabbi
if !reflect.DeepEqual(rmq.Status.Conditions, oldConditions) {
if err = r.Status().Update(ctx, rmq); err != nil {
if errors.IsConflict(err) {
r.Log.Info("failed to update status because of conflict; requeueing...",
logger.Info("failed to update status because of conflict; requeueing...",
"namespace", rmq.Namespace,
"name", rmq.Name)
return 2 * time.Second, nil
Expand Down
32 changes: 11 additions & 21 deletions controllers/reconcile_cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,20 @@ import (
"github.com/rabbitmq/cluster-operator/internal/resource"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)

const queueRebalanceAnnotation = "rabbitmq.com/queueRebalanceNeededAt"

func (r *RabbitmqClusterReconciler) runRabbitmqCLICommandsIfAnnotated(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster) (requeueAfter time.Duration, err error) {
logger := ctrl.LoggerFrom(ctx)
sts, err := r.statefulSet(ctx, rmq)
if err != nil {
return 0, err
}
if !allReplicasReadyAndUpdated(sts) {
r.Log.Info("not all replicas ready yet; requeuing request to run RabbitMQ CLI commands",
"namespace", rmq.Namespace,
"name", rmq.Name)
logger.Info("not all replicas ready yet; requeuing request to run RabbitMQ CLI commands")
return 15 * time.Second, nil
}

Expand All @@ -39,9 +39,7 @@ func (r *RabbitmqClusterReconciler) runRabbitmqCLICommandsIfAnnotated(ctx contex
// plugins configMap was updated very recently
// give StatefulSet controller some time to trigger restart of StatefulSet if necessary
// otherwise, there would be race conditions where we exec into containers losing the connection due to pods being terminated
r.Log.Info("requeuing request to set plugins",
"namespace", rmq.Namespace,
"name", rmq.Name)
logger.Info("requeuing request to set plugins")
return 2 * time.Second, nil
}

Expand Down Expand Up @@ -69,22 +67,19 @@ func (r *RabbitmqClusterReconciler) runRabbitmqCLICommandsIfAnnotated(ctx contex
}

func (r *RabbitmqClusterReconciler) runEnableFeatureFlagsCommand(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster, sts *appsv1.StatefulSet) error {
logger := ctrl.LoggerFrom(ctx)
podName := fmt.Sprintf("%s-0", rmq.ChildResourceName("server"))
cmd := "set -eo pipefail; rabbitmqctl -s list_feature_flags name state stability | (grep 'disabled\\sstable$' || true) | cut -f 1 | xargs -r -n1 rabbitmqctl enable_feature_flag"
stdout, stderr, err := r.exec(rmq.Namespace, podName, "rabbitmq", "bash", "-c", cmd)
if err != nil {
r.Log.Error(err, "failed to enable all feature flags",
"namespace", rmq.Namespace,
"name", rmq.Name,
logger.Error(err, "failed to enable all feature flags",
"pod", podName,
"command", cmd,
"stdout", stdout,
"stderr", stderr)
return err
}
r.Log.Info("successfully enabled all feature flags",
"namespace", rmq.Namespace,
"name", rmq.Name)
logger.Info("successfully enabled all feature flags")
return r.deleteAnnotation(ctx, sts, stsCreateAnnotation)
}

Expand All @@ -93,25 +88,22 @@ func (r *RabbitmqClusterReconciler) runEnableFeatureFlagsCommand(ctx context.Con
// 2. When the plugins ConfigMap is changed, 'rabbitmq-plugins set' updates the plugins on every node (without the need to re-start the nodes).
// This method implements the 2nd path.
func (r *RabbitmqClusterReconciler) runSetPluginsCommand(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster, configMap *corev1.ConfigMap) error {
logger := ctrl.LoggerFrom(ctx)
plugins := resource.NewRabbitmqPlugins(rmq.Spec.Rabbitmq.AdditionalPlugins)
for i := int32(0); i < *rmq.Spec.Replicas; i++ {
podName := fmt.Sprintf("%s-%d", rmq.ChildResourceName("server"), i)
cmd := fmt.Sprintf("rabbitmq-plugins set %s", plugins.AsString(" "))
stdout, stderr, err := r.exec(rmq.Namespace, podName, "rabbitmq", "sh", "-c", cmd)
if err != nil {
r.Log.Error(err, "failed to set plugins",
"namespace", rmq.Namespace,
"name", rmq.Name,
logger.Error(err, "failed to set plugins",
"pod", podName,
"command", cmd,
"stdout", stdout,
"stderr", stderr)
return err
}
}
r.Log.Info("successfully set plugins",
"namespace", rmq.Namespace,
"name", rmq.Name)
logger.Info("successfully set plugins")
return r.deleteAnnotation(ctx, configMap, pluginsUpdateAnnotation)
}

Expand All @@ -120,9 +112,7 @@ func (r *RabbitmqClusterReconciler) runQueueRebalanceCommand(ctx context.Context
cmd := "rabbitmq-queues rebalance all"
stdout, stderr, err := r.exec(rmq.Namespace, podName, "rabbitmq", "sh", "-c", cmd)
if err != nil {
r.Log.Error(err, "failed to run queue rebalance",
"namespace", rmq.Namespace,
"name", rmq.Name,
ctrl.LoggerFrom(ctx).Error(err, "failed to run queue rebalance",
"pod", podName,
"command", cmd,
"stdout", stdout,
Expand Down
6 changes: 4 additions & 2 deletions controllers/reconcile_finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ package controllers
import (
"context"
"fmt"

rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/api/v1beta1"
"github.com/rabbitmq/cluster-operator/internal/resource"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
clientretry "k8s.io/client-go/util/retry"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)
Expand Down Expand Up @@ -58,11 +60,11 @@ func (r *RabbitmqClusterReconciler) prepareForDeletion(ctx context.Context, rabb

return nil
}); err != nil {
r.Log.Error(err, "RabbitmqCluster deletion")
ctrl.LoggerFrom(ctx).Error(err, "RabbitmqCluster deletion")
}

if err := r.removeFinalizer(ctx, rabbitmqCluster); err != nil {
r.Log.Error(err, "Failed to remove finalizer for deletion")
ctrl.LoggerFrom(ctx).Error(err, "Failed to remove finalizer for deletion")
return err
}
}
Expand Down
18 changes: 10 additions & 8 deletions controllers/reconcile_rabbitmq_configurations.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"time"

"github.com/go-logr/logr"

rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/api/v1beta1"
"github.com/rabbitmq/cluster-operator/internal/resource"
appsv1 "k8s.io/api/apps/v1"
Expand All @@ -25,7 +27,7 @@ const (

// Annotates an object depending on object type and operationResult.
// These annotations are temporary markers used in later reconcile loops to perform some action (such as restarting the StatefulSet or executing RabbitMQ CLI commands)
func (r *RabbitmqClusterReconciler) annotateIfNeeded(ctx context.Context, builder resource.ResourceBuilder, operationResult controllerutil.OperationResult, rmq *rabbitmqv1beta1.RabbitmqCluster) error {
func (r *RabbitmqClusterReconciler) annotateIfNeeded(ctx context.Context, logger logr.Logger, builder resource.ResourceBuilder, operationResult controllerutil.OperationResult, rmq *rabbitmqv1beta1.RabbitmqCluster) error {
var (
obj client.Object
objName string
Expand Down Expand Up @@ -64,19 +66,19 @@ func (r *RabbitmqClusterReconciler) annotateIfNeeded(ctx context.Context, builde

if err := r.updateAnnotation(ctx, obj, rmq.Namespace, objName, annotationKey, time.Now().Format(time.RFC3339)); err != nil {
msg := "failed to annotate " + objName
r.Log.Error(err, msg, "namespace", rmq.Namespace)
logger.Error(err, msg)
r.Recorder.Event(rmq, corev1.EventTypeWarning, "FailedUpdate", msg)
return err
}

r.Log.Info("successfully annotated", "namespace", rmq.Namespace, "name", objName)
logger.Info("successfully annotated")
return nil
}

// Adds an arbitrary annotation to the sts PodTemplate to trigger a sts restart.
// It compares annotation "rabbitmq.com/serverConfUpdatedAt" from server-conf configMap and annotation "rabbitmq.com/lastRestartAt" from sts
// to determine whether to restart sts.
func (r *RabbitmqClusterReconciler) restartStatefulSetIfNeeded(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster) (time.Duration, error) {
func (r *RabbitmqClusterReconciler) restartStatefulSetIfNeeded(ctx context.Context, logger logr.Logger, rmq *rabbitmqv1beta1.RabbitmqCluster) (time.Duration, error) {
serverConf, err := r.configMap(ctx, rmq, rmq.ChildResourceName(resource.ServerConfigMapName))
if err != nil {
// requeue request after 10s if unable to find server-conf configmap, else return the error
Expand Down Expand Up @@ -112,15 +114,15 @@ func (r *RabbitmqClusterReconciler) restartStatefulSetIfNeeded(ctx context.Conte
sts.Spec.Template.ObjectMeta.Annotations[stsRestartAnnotation] = time.Now().Format(time.RFC3339)
return r.Update(ctx, sts)
}); err != nil {
msg := fmt.Sprintf("failed to restart StatefulSet %s of Namespace %s; rabbitmq.conf configuration may be outdated", rmq.ChildResourceName("server"), rmq.Namespace)
r.Log.Error(err, msg)
msg := fmt.Sprintf("failed to restart StatefulSet %s; rabbitmq.conf configuration may be outdated", rmq.ChildResourceName("server"))
logger.Error(err, msg)
r.Recorder.Event(rmq, corev1.EventTypeWarning, "FailedUpdate", msg)
// failed to restart sts; return error to requeue request
return 0, err
}

msg := fmt.Sprintf("restarted StatefulSet %s of Namespace %s", rmq.ChildResourceName("server"), rmq.Namespace)
r.Log.Info(msg)
msg := fmt.Sprintf("restarted StatefulSet %s", rmq.ChildResourceName("server"))
logger.Info(msg)
r.Recorder.Event(rmq, corev1.EventTypeNormal, "SuccessfulUpdate", msg)

return 0, nil
Expand Down
Loading

0 comments on commit 9b239ea

Please sign in to comment.