Skip to content

Commit

Permalink
Fix event filters (#41), lastTransitionTime not updated in conditions…
Browse files Browse the repository at this point in the history
…, and path finalize (#42)

* fix(path): added error if no bucket found

* fix: various fixes

- rollback event filter changes
- reimpl. of Set*StatusConditionAndUpdate functions (w/ new utils.go)
- unify loggers in reconcile loop
  • Loading branch information
phlg authored Mar 6, 2024
1 parent 831b09b commit 9ef45a3
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 193 deletions.
93 changes: 28 additions & 65 deletions controllers/bucket_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ package controllers
import (
"context"
"fmt"
"time"

"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
Expand All @@ -35,6 +35,7 @@ import (

s3v1alpha1 "github.com/InseeFrLab/s3-operator/api/v1alpha1"
"github.com/InseeFrLab/s3-operator/controllers/s3/factory"
"github.com/InseeFrLab/s3-operator/controllers/utils"
)

// BucketReconciler reconciles a Bucket object
Expand All @@ -57,8 +58,7 @@ const bucketFinalizer = "s3.onyxia.sh/finalizer"
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
errorLogger := log.FromContext(ctx)
logger := ctrl.Log.WithName("bucketReconcile")
logger := log.FromContext(ctx)

// Checking for bucket resource existence
bucketResource := &s3v1alpha1.Bucket{}
Expand All @@ -68,7 +68,7 @@ func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
logger.Info("The Bucket custom resource has been removed ; as such the Bucket controller is NOOP.", "req.Name", req.Name)
return ctrl.Result{}, nil
}
errorLogger.Error(err, "An error occurred when attempting to read the Bucket resource from the Kubernetes cluster")
logger.Error(err, "An error occurred when attempting to read the Bucket resource from the Kubernetes cluster")
return ctrl.Result{}, err
}

Expand All @@ -82,7 +82,7 @@ func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
// that we can retry during the next reconciliation.
if err := r.finalizeBucket(bucketResource); err != nil {
// return ctrl.Result{}, err
errorLogger.Error(err, "an error occurred when attempting to finalize the bucket", "bucket", bucketResource.Spec.Name)
logger.Error(err, "an error occurred when attempting to finalize the bucket", "bucket", bucketResource.Spec.Name)
// return ctrl.Result{}, err
return r.SetBucketStatusConditionAndUpdate(ctx, bucketResource, "OperatorFailed", metav1.ConditionFalse, "BucketFinalizeFailed",
fmt.Sprintf("An error occurred when attempting to delete bucket [%s]", bucketResource.Spec.Name), err)
Expand All @@ -93,7 +93,7 @@ func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
controllerutil.RemoveFinalizer(bucketResource, bucketFinalizer)
err := r.Update(ctx, bucketResource)
if err != nil {
errorLogger.Error(err, "an error occurred when removing finalizer from bucket", "bucket", bucketResource.Spec.Name)
logger.Error(err, "an error occurred when removing finalizer from bucket", "bucket", bucketResource.Spec.Name)
// return ctrl.Result{}, err
return r.SetBucketStatusConditionAndUpdate(ctx, bucketResource, "OperatorFailed", metav1.ConditionFalse, "BucketFinalizerRemovalFailed",
fmt.Sprintf("An error occurred when attempting to remove the finalizer from bucket [%s]", bucketResource.Spec.Name), err)
Expand All @@ -107,7 +107,7 @@ func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
controllerutil.AddFinalizer(bucketResource, bucketFinalizer)
err = r.Update(ctx, bucketResource)
if err != nil {
errorLogger.Error(err, "an error occurred when adding finalizer from bucket", "bucket", bucketResource.Spec.Name)
logger.Error(err, "an error occurred when adding finalizer from bucket", "bucket", bucketResource.Spec.Name)
return r.SetBucketStatusConditionAndUpdate(ctx, bucketResource, "OperatorFailed", metav1.ConditionFalse, "BucketFinalizerAddFailed",
fmt.Sprintf("An error occurred when attempting to add the finalizer from bucket [%s]", bucketResource.Spec.Name), err)
}
Expand All @@ -118,7 +118,7 @@ func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
// Check bucket existence on the S3 server
found, err := r.S3Client.BucketExists(bucketResource.Spec.Name)
if err != nil {
errorLogger.Error(err, "an error occurred while checking the existence of a bucket", "bucket", bucketResource.Spec.Name)
logger.Error(err, "an error occurred while checking the existence of a bucket", "bucket", bucketResource.Spec.Name)
return r.SetBucketStatusConditionAndUpdate(ctx, bucketResource, "OperatorFailed", metav1.ConditionFalse, "BucketExistenceCheckFailed",
fmt.Sprintf("Checking existence of bucket [%s] from S3 instance has failed", bucketResource.Spec.Name), err)
}
Expand All @@ -129,15 +129,15 @@ func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
// Bucket creation
err = r.S3Client.CreateBucket(bucketResource.Spec.Name)
if err != nil {
errorLogger.Error(err, "an error occurred while creating a bucket", "bucket", bucketResource.Spec.Name)
logger.Error(err, "an error occurred while creating a bucket", "bucket", bucketResource.Spec.Name)
return r.SetBucketStatusConditionAndUpdate(ctx, bucketResource, "OperatorFailed", metav1.ConditionFalse, "BucketCreationFailed",
fmt.Sprintf("Creation of bucket [%s] on S3 instance has failed", bucketResource.Spec.Name), err)
}

// Setting quotas
err = r.S3Client.SetQuota(bucketResource.Spec.Name, bucketResource.Spec.Quota.Default)
if err != nil {
errorLogger.Error(err, "an error occurred while setting a quota on a bucket", "bucket", bucketResource.Spec.Name, "quota", bucketResource.Spec.Quota.Default)
logger.Error(err, "an error occurred while setting a quota on a bucket", "bucket", bucketResource.Spec.Name, "quota", bucketResource.Spec.Quota.Default)
return r.SetBucketStatusConditionAndUpdate(ctx, bucketResource, "OperatorFailed", metav1.ConditionFalse, "SetQuotaOnBucketFailed",
fmt.Sprintf("Setting a quota of [%v] on bucket [%s] has failed", bucketResource.Spec.Quota.Default, bucketResource.Spec.Name), err)
}
Expand All @@ -146,7 +146,7 @@ func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
for _, v := range bucketResource.Spec.Paths {
err = r.S3Client.CreatePath(bucketResource.Spec.Name, v)
if err != nil {
errorLogger.Error(err, "an error occurred while creating a path on a bucket", "bucket", bucketResource.Spec.Name, "path", v)
logger.Error(err, "an error occurred while creating a path on a bucket", "bucket", bucketResource.Spec.Name, "path", v)
return r.SetBucketStatusConditionAndUpdate(ctx, bucketResource, "OperatorFailed", metav1.ConditionFalse, "CreatingPathOnBucketFailed",
fmt.Sprintf("Creating the path [%s] on bucket [%s] has failed", v, bucketResource.Spec.Name), err)
}
Expand All @@ -163,7 +163,7 @@ func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
// Checking effectiveQuota existence on the bucket
effectiveQuota, err := r.S3Client.GetQuota(bucketResource.Spec.Name)
if err != nil {
errorLogger.Error(err, "an error occurred while getting the quota for a bucket", "bucket", bucketResource.Spec.Name)
logger.Error(err, "an error occurred while getting the quota for a bucket", "bucket", bucketResource.Spec.Name)
return r.SetBucketStatusConditionAndUpdate(ctx, bucketResource, "OperatorFailed", metav1.ConditionFalse, "BucketQuotaCheckFailed",
fmt.Sprintf("The check for a quota on bucket [%s] has failed", bucketResource.Spec.Name), err)
}
Expand All @@ -180,7 +180,7 @@ func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
if effectiveQuota != quotaToResetTo {
err = r.S3Client.SetQuota(bucketResource.Spec.Name, quotaToResetTo)
if err != nil {
errorLogger.Error(err, "an error occurred while resetting the quota for a bucket", "bucket", bucketResource.Spec.Name, "quotaToResetTo", quotaToResetTo)
logger.Error(err, "an error occurred while resetting the quota for a bucket", "bucket", bucketResource.Spec.Name, "quotaToResetTo", quotaToResetTo)
return r.SetBucketStatusConditionAndUpdate(ctx, bucketResource, "OperatorFailed", metav1.ConditionFalse, "BucketQuotaUpdateFailed",
fmt.Sprintf("The quota update (%v => %v) on bucket [%s] has failed", effectiveQuota, quotaToResetTo, bucketResource.Spec.Name), err)
}
Expand All @@ -196,15 +196,15 @@ func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
for _, pathInCr := range bucketResource.Spec.Paths {
pathExists, err := r.S3Client.PathExists(bucketResource.Spec.Name, pathInCr)
if err != nil {
errorLogger.Error(err, "an error occurred while checking a path's existence on a bucket", "bucket", bucketResource.Spec.Name, "path", pathInCr)
logger.Error(err, "an error occurred while checking a path's existence on a bucket", "bucket", bucketResource.Spec.Name, "path", pathInCr)
return r.SetBucketStatusConditionAndUpdate(ctx, bucketResource, "OperatorFailed", metav1.ConditionFalse, "BucketPathCheckFailed",
fmt.Sprintf("The check for path [%s] on bucket [%s] has failed", pathInCr, bucketResource.Spec.Name), err)
}

if !pathExists {
err = r.S3Client.CreatePath(bucketResource.Spec.Name, pathInCr)
if err != nil {
errorLogger.Error(err, "an error occurred while creating a path on a bucket", "bucket", bucketResource.Spec.Name, "path", pathInCr)
logger.Error(err, "an error occurred while creating a path on a bucket", "bucket", bucketResource.Spec.Name, "path", pathInCr)
return r.SetBucketStatusConditionAndUpdate(ctx, bucketResource, "OperatorFailed", metav1.ConditionFalse, "BucketPathCreationFailed",
fmt.Sprintf("The creation of path [%s] on bucket [%s] has failed", pathInCr, bucketResource.Spec.Name), err)
}
Expand All @@ -219,44 +219,16 @@ func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr

// SetupWithManager sets up the controller with the Manager.*
func (r *BucketReconciler) SetupWithManager(mgr ctrl.Manager) error {
logger := ctrl.Log.WithName("bucketEventFilter")
return ctrl.NewControllerManagedBy(mgr).
For(&s3v1alpha1.Bucket{}).
// REF : https://sdk.operatorframework.io/docs/building-operators/golang/references/event-filtering/
WithEventFilter(predicate.Funcs{
UpdateFunc: func(e event.UpdateEvent) bool {
// Only reconcile if :
// - Generation has changed
// or
// - Of all Conditions matching the last generation, none is in status "True"
// There is an implicit assumption that in such a case, the resource was once failing, but then transitioned
// to a functional state. We use this ersatz because lastTransitionTime appears to not work properly - see also
// comment in SetBucketStatusConditionAndUpdate() below.
newBucket, _ := e.ObjectNew.(*s3v1alpha1.Bucket)

// 1 - Identifying the most recent generation
var maxGeneration int64 = 0
for _, condition := range newBucket.Status.Conditions {
if condition.ObservedGeneration > maxGeneration {
maxGeneration = condition.ObservedGeneration
}
}
// 2 - Checking one of the conditions in most recent generation is True
conditionTrueInLastGeneration := false
for _, condition := range newBucket.Status.Conditions {
if condition.ObservedGeneration == maxGeneration && condition.Status == metav1.ConditionTrue {
conditionTrueInLastGeneration = true
}
}
predicate := e.ObjectOld.GetGeneration() != e.ObjectNew.GetGeneration() || !conditionTrueInLastGeneration
if !predicate {
logger.Info("reconcile update event is filtered out", "resource", e.ObjectNew.GetName())
}
return predicate
// Only reconcile if generation has changed
return e.ObjectOld.GetGeneration() != e.ObjectNew.GetGeneration()
},
DeleteFunc: func(e event.DeleteEvent) bool {
// Evaluates to false if the object has been confirmed deleted.
logger.Info("reconcile delete event is filtered out", "resource", e.Object.GetName())
return !e.DeleteStateUnknown
},
}).
Expand All @@ -274,26 +246,17 @@ func (r *BucketReconciler) finalizeBucket(bucketResource *s3v1alpha1.Bucket) err
func (r *BucketReconciler) SetBucketStatusConditionAndUpdate(ctx context.Context, bucketResource *s3v1alpha1.Bucket, conditionType string, status metav1.ConditionStatus, reason string, message string, srcError error) (ctrl.Result, error) {
logger := log.FromContext(ctx)

// It would seem LastTransitionTime does not work as intended (our understanding of the intent coming from this :
// https://pkg.go.dev/k8s.io/[email protected]/pkg/api/meta#SetStatusCondition). Whether we set the
// date manually or leave it out to have default behavior, the lastTransitionTime is NOT updated if the CR
// had that condition at least once in the past.
// For instance, with the following updates to a CR :
// - gen 1 : condition type = A
// - gen 2 : condition type = B
// - gen 3 : condition type = A again
// Then the condition with type A in CR Status will still have the lastTransitionTime dating back to gen 1.
// Because of this, lastTransitionTime cannot be reliably used to determine current state, which in turn had
// us turn to a less than ideal event filter (see above in SetupWithManager())
meta.SetStatusCondition(&bucketResource.Status.Conditions,
metav1.Condition{
Type: conditionType,
Status: status,
Reason: reason,
// LastTransitionTime: metav1.NewTime(time.Now()),
Message: message,
ObservedGeneration: bucketResource.GetGeneration(),
})
// We moved away from meta.SetStatusCondition, as the implementation did not allow for updating
// lastTransitionTime if a Condition (as identified by Reason instead of Type) was previously
// obtained and updated to again.
bucketResource.Status.Conditions = utils.UpdateConditions(bucketResource.Status.Conditions, metav1.Condition{
Type: conditionType,
Status: status,
Reason: reason,
LastTransitionTime: metav1.NewTime(time.Now()),
Message: message,
ObservedGeneration: bucketResource.GetGeneration(),
})

err := r.Status().Update(ctx, bucketResource)
if err != nil {
Expand Down
Loading

0 comments on commit 9ef45a3

Please sign in to comment.