Skip to content

Commit

Permalink
Implement events and metrics helpers from runtime
Browse files Browse the repository at this point in the history
This commit loosely implements the events and metrics helpers that have
been newly introduced to `fluxcd/pkg/runtime`, and heavily reduces code
duplication. It is in preparation of a much bigger overhaul to
implement the work pending in fluxcd/pkg#101.

While implementing, I ran into little annoyances that likely should be
addressed before the "official" `runtime` MINOR release:

- Passing `nil` every time there isn't any metadata for an event quickly
  becomes cumbersome; we should look into an `EventWithMetadata` and/or
  `EventfWithMetadata`, or some other way to _optionally_ provide
  metadata without annoying the consumer.
- There is an inconsistency in the method names of the metric helper,
  i.e. `RecordReadinessMetric` vs `RecordSuspend`. We either need to
  append or remove the `Metric` suffix on all recording methods.

Signed-off-by: Hidde Beydals <[email protected]>
  • Loading branch information
hiddeco committed Jun 14, 2021
1 parent 3af8962 commit 26ebbe5
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 399 deletions.
103 changes: 17 additions & 86 deletions controllers/bucket_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,26 +26,21 @@ import (
"strings"
"time"

"github.com/go-logr/logr"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/minio/minio-go/v7/pkg/s3utils"
corev1 "k8s.io/api/core/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
kuberecorder "k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/reference"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/predicate"

"github.com/fluxcd/pkg/apis/meta"
helper "github.com/fluxcd/pkg/runtime/controller"
"github.com/fluxcd/pkg/runtime/events"
"github.com/fluxcd/pkg/runtime/metrics"
"github.com/fluxcd/pkg/runtime/predicates"

sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
Expand All @@ -60,11 +55,10 @@ import (
// BucketReconciler reconciles a Bucket object
type BucketReconciler struct {
client.Client
Scheme *runtime.Scheme
Storage *Storage
EventRecorder kuberecorder.EventRecorder
ExternalEventRecorder *events.Recorder
MetricsRecorder *metrics.Recorder
helper.Events
helper.Metrics

Storage *Storage
}

type BucketReconcilerOptions struct {
Expand Down Expand Up @@ -93,7 +87,7 @@ func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
}

// Record suspended status metric
defer r.recordSuspension(ctx, bucket)
defer r.Metrics.RecordSuspend(ctx, &bucket, bucket.Spec.Suspend)

// Add our finalizer if it does not exist
if !controllerutil.ContainsFinalizer(&bucket, sourcev1.SourceFinalizer) {
Expand All @@ -116,13 +110,7 @@ func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
}

// record reconciliation duration
if r.MetricsRecorder != nil {
objRef, err := reference.GetReference(r.Scheme, &bucket)
if err != nil {
return ctrl.Result{}, err
}
defer r.MetricsRecorder.RecordDuration(*objRef, start)
}
defer r.Metrics.RecordDuration(ctx, &bucket, start)

// set initial status
if resetBucket, ok := r.resetStatus(bucket); ok {
Expand All @@ -131,7 +119,7 @@ func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
log.Error(err, "unable to update status")
return ctrl.Result{Requeue: true}, err
}
r.recordReadiness(ctx, bucket)
r.Metrics.RecordReadinessMetric(ctx, &bucket)
}

// record the value of the reconciliation request, if any
Expand All @@ -157,16 +145,18 @@ func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr

// if reconciliation failed, record the failure and requeue immediately
if reconcileErr != nil {
r.event(ctx, reconciledBucket, events.EventSeverityError, reconcileErr.Error())
r.recordReadiness(ctx, reconciledBucket)
r.Events.Event(ctx, &reconciledBucket, nil, events.EventSeverityError, "ReconciliationFailed", reconcileErr.Error())
r.Metrics.RecordReadinessMetric(ctx, &bucket)
return ctrl.Result{Requeue: true}, reconcileErr
}

// emit revision change event
if bucket.Status.Artifact == nil || reconciledBucket.Status.Artifact.Revision != bucket.Status.Artifact.Revision {
r.event(ctx, reconciledBucket, events.EventSeverityInfo, sourcev1.BucketReadyMessage(reconciledBucket))
r.Events.Event(ctx, &reconciledBucket, map[string]string{
"revision": reconciledBucket.GetArtifact().Revision,
}, events.EventSeverityInfo, "NewRevision", sourcev1.BucketReadyMessage(reconciledBucket))
}
r.recordReadiness(ctx, reconciledBucket)
r.Metrics.RecordReadinessMetric(ctx, &bucket)

log.Info(fmt.Sprintf("Reconciliation finished in %s, next run in %s",
time.Now().Sub(start).String(),
Expand Down Expand Up @@ -297,14 +287,14 @@ func (r *BucketReconciler) reconcile(ctx context.Context, bucket sourcev1.Bucket

func (r *BucketReconciler) reconcileDelete(ctx context.Context, bucket sourcev1.Bucket) (ctrl.Result, error) {
if err := r.gc(bucket); err != nil {
r.event(ctx, bucket, events.EventSeverityError,
fmt.Sprintf("garbage collection for deleted resource failed: %s", err.Error()))
r.Events.Eventf(ctx, &bucket, nil, events.EventSeverityError, "GarbageCollectionFailed",
"garbage collection for deleted resource failed: %s", err.Error())
// Return the error so we retry the failed garbage collection
return ctrl.Result{}, err
}

// Record deleted status
r.recordReadiness(ctx, bucket)
r.Metrics.RecordReadinessMetric(ctx, &bucket)

// Remove our finalizer from the list and update it
controllerutil.RemoveFinalizer(&bucket, sourcev1.SourceFinalizer)
Expand Down Expand Up @@ -408,65 +398,6 @@ func (r *BucketReconciler) gc(bucket sourcev1.Bucket) error {
return nil
}

// event emits a Kubernetes event and forwards the event to notification controller if configured
func (r *BucketReconciler) event(ctx context.Context, bucket sourcev1.Bucket, severity, msg string) {
log := logr.FromContext(ctx)
if r.EventRecorder != nil {
r.EventRecorder.Eventf(&bucket, "Normal", severity, msg)
}
if r.ExternalEventRecorder != nil {
objRef, err := reference.GetReference(r.Scheme, &bucket)
if err != nil {
log.Error(err, "unable to send event")
return
}

if err := r.ExternalEventRecorder.Eventf(*objRef, nil, severity, severity, msg); err != nil {
log.Error(err, "unable to send event")
return
}
}
}

func (r *BucketReconciler) recordReadiness(ctx context.Context, bucket sourcev1.Bucket) {
log := logr.FromContext(ctx)
if r.MetricsRecorder == nil {
return
}
objRef, err := reference.GetReference(r.Scheme, &bucket)
if err != nil {
log.Error(err, "unable to record readiness metric")
return
}
if rc := apimeta.FindStatusCondition(bucket.Status.Conditions, meta.ReadyCondition); rc != nil {
r.MetricsRecorder.RecordCondition(*objRef, *rc, !bucket.DeletionTimestamp.IsZero())
} else {
r.MetricsRecorder.RecordCondition(*objRef, metav1.Condition{
Type: meta.ReadyCondition,
Status: metav1.ConditionUnknown,
}, !bucket.DeletionTimestamp.IsZero())
}
}

func (r *BucketReconciler) recordSuspension(ctx context.Context, bucket sourcev1.Bucket) {
if r.MetricsRecorder == nil {
return
}
log := logr.FromContext(ctx)

objRef, err := reference.GetReference(r.Scheme, &bucket)
if err != nil {
log.Error(err, "unable to record suspended metric")
return
}

if !bucket.DeletionTimestamp.IsZero() {
r.MetricsRecorder.RecordSuspend(*objRef, false)
} else {
r.MetricsRecorder.RecordSuspend(*objRef, bucket.Spec.Suspend)
}
}

func (r *BucketReconciler) updateStatus(ctx context.Context, req ctrl.Request, newStatus sourcev1.BucketStatus) error {
var bucket sourcev1.Bucket
if err := r.Get(ctx, req.NamespacedName, &bucket); err != nil {
Expand Down
112 changes: 22 additions & 90 deletions controllers/gitrepository_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,7 @@ import (
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
kuberecorder "k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/reference"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -42,8 +38,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/predicate"

"github.com/fluxcd/pkg/apis/meta"
helper "github.com/fluxcd/pkg/runtime/controller"
"github.com/fluxcd/pkg/runtime/events"
"github.com/fluxcd/pkg/runtime/metrics"
"github.com/fluxcd/pkg/runtime/predicates"

sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
Expand All @@ -60,12 +56,12 @@ import (
// GitRepositoryReconciler reconciles a GitRepository object
type GitRepositoryReconciler struct {
client.Client
requeueDependency time.Duration
Scheme *runtime.Scheme
Storage *Storage
EventRecorder kuberecorder.EventRecorder
ExternalEventRecorder *events.Recorder
MetricsRecorder *metrics.Recorder
helper.Events
helper.Metrics

Storage *Storage

requeueDependency time.Duration
}

type GitRepositoryReconcilerOptions struct {
Expand Down Expand Up @@ -98,7 +94,7 @@ func (r *GitRepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Reques
}

// Record suspended status metric
defer r.recordSuspension(ctx, repository)
defer r.RecordSuspend(ctx, &repository, repository.Spec.Suspend)

// Add our finalizer if it does not exist
if !controllerutil.ContainsFinalizer(&repository, sourcev1.SourceFinalizer) {
Expand Down Expand Up @@ -132,21 +128,15 @@ func (r *GitRepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Reques
// instead we requeue on a fix interval.
msg := fmt.Sprintf("Dependencies do not meet ready condition, retrying in %s", r.requeueDependency.String())
log.Info(msg)
r.event(ctx, repository, events.EventSeverityInfo, msg)
r.recordReadiness(ctx, repository)
r.Events.Event(ctx, &repository, nil, events.EventSeverityInfo, "DependencyNotReady", msg)
r.Metrics.RecordReadinessMetric(ctx, &repository)
return ctrl.Result{RequeueAfter: r.requeueDependency}, nil
}
log.Info("All dependencies area ready, proceeding with reconciliation")
}

// record reconciliation duration
if r.MetricsRecorder != nil {
objRef, err := reference.GetReference(r.Scheme, &repository)
if err != nil {
return ctrl.Result{}, err
}
defer r.MetricsRecorder.RecordDuration(*objRef, start)
}
defer r.Metrics.RecordDuration(ctx, &repository, start)

// set initial status
if resetRepository, ok := r.resetStatus(repository); ok {
Expand All @@ -155,7 +145,7 @@ func (r *GitRepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Reques
log.Error(err, "unable to update status")
return ctrl.Result{Requeue: true}, err
}
r.recordReadiness(ctx, repository)
r.Metrics.RecordReadinessMetric(ctx, &repository)
}

// record the value of the reconciliation request, if any
Expand All @@ -181,16 +171,18 @@ func (r *GitRepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Reques

// if reconciliation failed, record the failure and requeue immediately
if reconcileErr != nil {
r.event(ctx, reconciledRepository, events.EventSeverityError, reconcileErr.Error())
r.recordReadiness(ctx, reconciledRepository)
r.Events.Event(ctx, &reconciledRepository, nil, events.EventSeverityError, "ReconciliationFailed", reconcileErr.Error())
r.Metrics.RecordReadinessMetric(ctx, &reconciledRepository)
return ctrl.Result{Requeue: true}, reconcileErr
}

// emit revision change event
if repository.Status.Artifact == nil || reconciledRepository.Status.Artifact.Revision != repository.Status.Artifact.Revision {
r.event(ctx, reconciledRepository, events.EventSeverityInfo, sourcev1.GitRepositoryReadyMessage(reconciledRepository))
if repository.GetArtifact() == nil || reconciledRepository.GetArtifact().Revision != repository.GetArtifact().Revision {
r.Events.Event(ctx, &reconciledRepository, map[string]string{
"revision": reconciledRepository.GetArtifact().Revision,
}, events.EventSeverityInfo, "NewRevision", sourcev1.GitRepositoryReadyMessage(reconciledRepository))
}
r.recordReadiness(ctx, reconciledRepository)
r.Metrics.RecordReadinessMetric(ctx, &reconciledRepository)

log.Info(fmt.Sprintf("Reconciliation finished in %s, next run in %s",
time.Now().Sub(start).String(),
Expand Down Expand Up @@ -376,14 +368,14 @@ func (r *GitRepositoryReconciler) reconcile(ctx context.Context, repository sour

func (r *GitRepositoryReconciler) reconcileDelete(ctx context.Context, repository sourcev1.GitRepository) (ctrl.Result, error) {
if err := r.gc(repository); err != nil {
r.event(ctx, repository, events.EventSeverityError,
fmt.Sprintf("garbage collection for deleted resource failed: %s", err.Error()))
r.Events.Eventf(ctx, &repository, nil, events.EventSeverityError, "GarbageCollectionFailed",
"garbage collection for deleted resource failed: %s", err.Error())
// Return the error so we retry the failed garbage collection
return ctrl.Result{}, err
}

// Record deleted status
r.recordReadiness(ctx, repository)
r.Metrics.RecordReadinessMetric(ctx, &repository)

// Remove our finalizer from the list and update it
controllerutil.RemoveFinalizer(&repository, sourcev1.SourceFinalizer)
Expand Down Expand Up @@ -424,66 +416,6 @@ func (r *GitRepositoryReconciler) gc(repository sourcev1.GitRepository) error {
return nil
}

// event emits a Kubernetes event and forwards the event to notification controller if configured
func (r *GitRepositoryReconciler) event(ctx context.Context, repository sourcev1.GitRepository, severity, msg string) {
log := logr.FromContext(ctx)

if r.EventRecorder != nil {
r.EventRecorder.Eventf(&repository, "Normal", severity, msg)
}
if r.ExternalEventRecorder != nil {
objRef, err := reference.GetReference(r.Scheme, &repository)
if err != nil {
log.Error(err, "unable to send event")
return
}

if err := r.ExternalEventRecorder.Eventf(*objRef, nil, severity, severity, msg); err != nil {
log.Error(err, "unable to send event")
return
}
}
}

func (r *GitRepositoryReconciler) recordReadiness(ctx context.Context, repository sourcev1.GitRepository) {
log := logr.FromContext(ctx)
if r.MetricsRecorder == nil {
return
}
objRef, err := reference.GetReference(r.Scheme, &repository)
if err != nil {
log.Error(err, "unable to record readiness metric")
return
}
if rc := apimeta.FindStatusCondition(repository.Status.Conditions, meta.ReadyCondition); rc != nil {
r.MetricsRecorder.RecordCondition(*objRef, *rc, !repository.DeletionTimestamp.IsZero())
} else {
r.MetricsRecorder.RecordCondition(*objRef, metav1.Condition{
Type: meta.ReadyCondition,
Status: metav1.ConditionUnknown,
}, !repository.DeletionTimestamp.IsZero())
}
}

func (r *GitRepositoryReconciler) recordSuspension(ctx context.Context, gitrepository sourcev1.GitRepository) {
if r.MetricsRecorder == nil {
return
}
log := logr.FromContext(ctx)

objRef, err := reference.GetReference(r.Scheme, &gitrepository)
if err != nil {
log.Error(err, "unable to record suspended metric")
return
}

if !gitrepository.DeletionTimestamp.IsZero() {
r.MetricsRecorder.RecordSuspend(*objRef, false)
} else {
r.MetricsRecorder.RecordSuspend(*objRef, gitrepository.Spec.Suspend)
}
}

func (r *GitRepositoryReconciler) updateStatus(ctx context.Context, req ctrl.Request, newStatus sourcev1.GitRepositoryStatus) error {
var repository sourcev1.GitRepository
if err := r.Get(ctx, req.NamespacedName, &repository); err != nil {
Expand Down
Loading

0 comments on commit 26ebbe5

Please sign in to comment.