diff --git a/controllers/bucket_controller.go b/controllers/bucket_controller.go index b01236828..c8ad62435 100644 --- a/controllers/bucket_controller.go +++ b/controllers/bucket_controller.go @@ -99,6 +99,12 @@ var bucketReadyCondition = summarize.Conditions{ }, } +// bucketFailConditions contains the conditions that represent a failure. +var bucketFailConditions = []string{ + sourcev1.FetchFailedCondition, + sourcev1.StorageOperationFailedCondition, +} + // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets/status,verbs=get;update;patch // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets/finalizers,verbs=get;create;update;patch;delete @@ -307,10 +313,12 @@ func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res return } -// reconcile iterates through the gitRepositoryReconcileFunc tasks for the +// reconcile iterates through the bucketReconcileFunc tasks for the // object. It returns early on the first call that returns // reconcile.ResultRequeue, or produces an error. func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket, reconcilers []bucketReconcileFunc) (sreconcile.Result, error) { + oldObj := obj.DeepCopy() + if obj.Generation != obj.Status.ObservedGeneration { conditions.MarkReconciling(obj, "NewGeneration", "reconciling new object generation (%d)", obj.Generation) } @@ -355,9 +363,42 @@ func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket, // Prioritize requeue request in the result. res = sreconcile.LowestRequeuingResult(res, recResult) } + + r.notify(oldObj, obj, index, res, resErr) + return res, resErr } +// notify emits notification related to the reconciliation. +func (r *BucketReconciler) notify(oldObj, newObj *sourcev1.Bucket, index *etagIndex, res sreconcile.Result, resErr error) { + // Notify successful reconciliation for new artifact and recovery from any + // failure. + if resErr == nil && res == sreconcile.ResultSuccess && newObj.Status.Artifact != nil { + annotations := map[string]string{ + "revision": newObj.Status.Artifact.Revision, + "checksum": newObj.Status.Artifact.Checksum, + } + + var oldChecksum string + if oldObj.GetArtifact() != nil { + oldChecksum = oldObj.GetArtifact().Checksum + } + + message := fmt.Sprintf("stored artifact with %d fetched files from '%s' bucket", index.Len(), newObj.Spec.BucketName) + + // Notify on new artifact and failure recovery. + if oldChecksum != newObj.GetArtifact().Checksum { + r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal, + "NewArtifact", message) + } else { + if sreconcile.FailureRecovery(oldObj, newObj, bucketFailConditions) { + r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal, + meta.SucceededReason, message) + } + } + } +} + // reconcileStorage ensures the current state of the storage matches the // desired and previously observed state. // @@ -574,10 +615,6 @@ func (r *BucketReconciler) reconcileArtifact(ctx context.Context, obj *sourcev1. conditions.MarkTrue(obj, sourcev1.StorageOperationFailedCondition, e.Reason, e.Err.Error()) return sreconcile.ResultEmpty, e } - r.annotatedEventLogf(ctx, obj, map[string]string{ - sourcev1.GroupVersion.Group + "/revision": artifact.Revision, - sourcev1.GroupVersion.Group + "/checksum": artifact.Checksum, - }, corev1.EventTypeNormal, "NewArtifact", "fetched %d files from '%s'", index.Len(), obj.Spec.BucketName) // Record it on the object obj.Status.Artifact = artifact.DeepCopy() diff --git a/controllers/bucket_controller_test.go b/controllers/bucket_controller_test.go index 2f432a4bb..8df10bad2 100644 --- a/controllers/bucket_controller_test.go +++ b/controllers/bucket_controller_test.go @@ -18,6 +18,7 @@ package controllers import ( "context" + "errors" "fmt" "net/http" "net/url" @@ -1163,3 +1164,117 @@ func TestBucketReconciler_statusConditions(t *testing.T) { }) } } + +func TestBucketReconciler_notify(t *testing.T) { + tests := []struct { + name string + res sreconcile.Result + resErr error + oldObjBeforeFunc func(obj *sourcev1.Bucket) + newObjBeforeFunc func(obj *sourcev1.Bucket) + wantEvent string + }{ + { + name: "error - no event", + res: sreconcile.ResultEmpty, + resErr: errors.New("some error"), + }, + { + name: "new artifact", + res: sreconcile.ResultSuccess, + resErr: nil, + newObjBeforeFunc: func(obj *sourcev1.Bucket) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"} + }, + wantEvent: "Normal NewArtifact stored artifact with 2 fetched files from", + }, + { + name: "recovery from failure", + res: sreconcile.ResultSuccess, + resErr: nil, + oldObjBeforeFunc: func(obj *sourcev1.Bucket) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"} + conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "fail") + conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "foo") + }, + newObjBeforeFunc: func(obj *sourcev1.Bucket) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"} + conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + wantEvent: "Normal Succeeded stored artifact with 2 fetched files from", + }, + { + name: "recovery and new artifact", + res: sreconcile.ResultSuccess, + resErr: nil, + oldObjBeforeFunc: func(obj *sourcev1.Bucket) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"} + conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "fail") + conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "foo") + }, + newObjBeforeFunc: func(obj *sourcev1.Bucket) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "aaa", Checksum: "bbb"} + conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + wantEvent: "Normal NewArtifact stored artifact with 2 fetched files from", + }, + { + name: "no updates", + res: sreconcile.ResultSuccess, + resErr: nil, + oldObjBeforeFunc: func(obj *sourcev1.Bucket) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"} + conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + newObjBeforeFunc: func(obj *sourcev1.Bucket) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"} + conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + + recorder := record.NewFakeRecorder(32) + + oldObj := &sourcev1.Bucket{ + Spec: sourcev1.BucketSpec{ + BucketName: "test-bucket", + }, + } + newObj := oldObj.DeepCopy() + + if tt.oldObjBeforeFunc != nil { + tt.oldObjBeforeFunc(oldObj) + } + if tt.newObjBeforeFunc != nil { + tt.newObjBeforeFunc(newObj) + } + + reconciler := &BucketReconciler{ + EventRecorder: recorder, + } + index := &etagIndex{ + index: map[string]string{ + "zzz": "qqq", + "bbb": "ddd", + }, + } + reconciler.notify(oldObj, newObj, index, tt.res, tt.resErr) + + select { + case x, ok := <-recorder.Events: + g.Expect(ok).To(Equal(tt.wantEvent != ""), "unexpected event received") + if tt.wantEvent != "" { + g.Expect(x).To(ContainSubstring(tt.wantEvent)) + } + default: + if tt.wantEvent != "" { + t.Errorf("expected some event to be emitted") + } + } + }) + } +} diff --git a/controllers/gitrepository_controller.go b/controllers/gitrepository_controller.go index 2aa0f8589..7252975e9 100644 --- a/controllers/gitrepository_controller.go +++ b/controllers/gitrepository_controller.go @@ -91,6 +91,13 @@ var gitRepositoryReadyCondition = summarize.Conditions{ }, } +// gitRepositoryFailConditions contains the conditions that represent a failure. +var gitRepositoryFailConditions = []string{ + sourcev1.FetchFailedCondition, + sourcev1.IncludeUnavailableCondition, + sourcev1.StorageOperationFailedCondition, +} + // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=gitrepositories,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=gitrepositories/status,verbs=get;update;patch // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=gitrepositories/finalizers,verbs=get;create;update;patch;delete @@ -212,6 +219,8 @@ func (r *GitRepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Reques // object. It returns early on the first call that returns // reconcile.ResultRequeue, or produces an error. func (r *GitRepositoryReconciler) reconcile(ctx context.Context, obj *sourcev1.GitRepository, reconcilers []gitRepositoryReconcileFunc) (sreconcile.Result, error) { + oldObj := obj.DeepCopy() + // Mark as reconciling if generation differs if obj.Generation != obj.Status.ObservedGeneration { conditions.MarkReconciling(obj, "NewGeneration", "reconciling new object generation (%d)", obj.Generation) @@ -258,9 +267,42 @@ func (r *GitRepositoryReconciler) reconcile(ctx context.Context, obj *sourcev1.G // Prioritize requeue request in the result. res = sreconcile.LowestRequeuingResult(res, recResult) } + + r.notify(oldObj, obj, commit, res, resErr) + return res, resErr } +// notify emits notification related to the reconciliation. +func (r *GitRepositoryReconciler) notify(oldObj, newObj *sourcev1.GitRepository, commit git.Commit, res sreconcile.Result, resErr error) { + // Notify successful reconciliation for new artifact and recovery from any + // failure. + if resErr == nil && res == sreconcile.ResultSuccess && newObj.Status.Artifact != nil { + annotations := map[string]string{ + "revision": newObj.Status.Artifact.Revision, + "checksum": newObj.Status.Artifact.Checksum, + } + + var oldChecksum string + if oldObj.GetArtifact() != nil { + oldChecksum = oldObj.GetArtifact().Checksum + } + + message := fmt.Sprintf("stored artifact for commit '%s'", commit.ShortMessage()) + + // Notify on new artifact and failure recovery. + if oldChecksum != newObj.GetArtifact().Checksum { + r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal, + "NewArtifact", message) + } else { + if sreconcile.FailureRecovery(oldObj, newObj, gitRepositoryFailConditions) { + r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal, + meta.SucceededReason, message) + } + } + } +} + // reconcileStorage ensures the current state of the storage matches the // desired and previously observed state. // @@ -523,10 +565,6 @@ func (r *GitRepositoryReconciler) reconcileArtifact(ctx context.Context, conditions.MarkTrue(obj, sourcev1.StorageOperationFailedCondition, e.Reason, e.Err.Error()) return sreconcile.ResultEmpty, e } - r.AnnotatedEventf(obj, map[string]string{ - sourcev1.GroupVersion.Group + "/revision": artifact.Revision, - sourcev1.GroupVersion.Group + "/checksum": artifact.Checksum, - }, corev1.EventTypeNormal, "NewArtifact", "stored artifact for commit '%s'", commit.ShortMessage()) // Record it on the object obj.Status.Artifact = artifact.DeepCopy() diff --git a/controllers/gitrepository_controller_test.go b/controllers/gitrepository_controller_test.go index 88fceb7e7..37b2f65c9 100644 --- a/controllers/gitrepository_controller_test.go +++ b/controllers/gitrepository_controller_test.go @@ -18,6 +18,7 @@ package controllers import ( "context" + "errors" "fmt" "net/http" "net/url" @@ -1634,3 +1635,109 @@ func TestGitRepositoryReconciler_statusConditions(t *testing.T) { }) } } + +func TestGitRepositoryReconciler_notify(t *testing.T) { + tests := []struct { + name string + res sreconcile.Result + resErr error + oldObjBeforeFunc func(obj *sourcev1.GitRepository) + newObjBeforeFunc func(obj *sourcev1.GitRepository) + wantEvent string + }{ + { + name: "error - no event", + res: sreconcile.ResultEmpty, + resErr: errors.New("some error"), + }, + { + name: "new artifact", + res: sreconcile.ResultSuccess, + resErr: nil, + newObjBeforeFunc: func(obj *sourcev1.GitRepository) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"} + }, + wantEvent: "Normal NewArtifact stored artifact for commit", + }, + { + name: "recovery from failure", + res: sreconcile.ResultSuccess, + resErr: nil, + oldObjBeforeFunc: func(obj *sourcev1.GitRepository) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"} + conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "fail") + conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "foo") + }, + newObjBeforeFunc: func(obj *sourcev1.GitRepository) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"} + conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + wantEvent: "Normal Succeeded stored artifact for commit", + }, + { + name: "recovery and new artifact", + res: sreconcile.ResultSuccess, + resErr: nil, + oldObjBeforeFunc: func(obj *sourcev1.GitRepository) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"} + conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "fail") + conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "foo") + }, + newObjBeforeFunc: func(obj *sourcev1.GitRepository) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "aaa", Checksum: "bbb"} + conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + wantEvent: "Normal NewArtifact stored artifact for commit", + }, + { + name: "no updates", + res: sreconcile.ResultSuccess, + resErr: nil, + oldObjBeforeFunc: func(obj *sourcev1.GitRepository) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"} + conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + newObjBeforeFunc: func(obj *sourcev1.GitRepository) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"} + conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + recorder := record.NewFakeRecorder(32) + + oldObj := &sourcev1.GitRepository{} + newObj := oldObj.DeepCopy() + + if tt.oldObjBeforeFunc != nil { + tt.oldObjBeforeFunc(oldObj) + } + if tt.newObjBeforeFunc != nil { + tt.newObjBeforeFunc(newObj) + } + + reconciler := &GitRepositoryReconciler{ + EventRecorder: recorder, + } + commit := &git.Commit{ + Message: "test commit", + } + reconciler.notify(oldObj, newObj, *commit, tt.res, tt.resErr) + + select { + case x, ok := <-recorder.Events: + g.Expect(ok).To(Equal(tt.wantEvent != ""), "unexpected event received") + if tt.wantEvent != "" { + g.Expect(x).To(ContainSubstring(tt.wantEvent)) + } + default: + if tt.wantEvent != "" { + t.Errorf("expected some event to be emitted") + } + } + }) + } +} diff --git a/controllers/helmchart_controller.go b/controllers/helmchart_controller.go index 894eb99b6..863865bb0 100644 --- a/controllers/helmchart_controller.go +++ b/controllers/helmchart_controller.go @@ -99,6 +99,13 @@ var helmChartReadyCondition = summarize.Conditions{ }, } +// helmChartFailConditions contains the conditions that represent a failure. +var helmChartFailConditions = []string{ + sourcev1.BuildFailedCondition, + sourcev1.FetchFailedCondition, + sourcev1.StorageOperationFailedCondition, +} + // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=helmcharts,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=helmcharts/status,verbs=get;update;patch // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=helmcharts/finalizers,verbs=get;create;update;patch;delete @@ -239,10 +246,12 @@ func (r *HelmChartReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( return } -// reconcile iterates through the gitRepositoryReconcileFunc tasks for the +// reconcile iterates through the helmChartReconcileFunc tasks for the // object. It returns early on the first call that returns // reconcile.ResultRequeue, or produces an error. func (r *HelmChartReconciler) reconcile(ctx context.Context, obj *sourcev1.HelmChart, reconcilers []helmChartReconcileFunc) (sreconcile.Result, error) { + oldObj := obj.DeepCopy() + if obj.Generation != obj.Status.ObservedGeneration { conditions.MarkReconciling(obj, "NewGeneration", "reconciling new object generation (%d)", obj.Generation) } @@ -269,9 +278,40 @@ func (r *HelmChartReconciler) reconcile(ctx context.Context, obj *sourcev1.HelmC // Prioritize requeue request in the result. res = sreconcile.LowestRequeuingResult(res, recResult) } + + r.notify(oldObj, obj, &build, res, resErr) + return res, resErr } +// notify emits notification related to the reconciliation. +func (r *HelmChartReconciler) notify(oldObj, newObj *sourcev1.HelmChart, build *chart.Build, res sreconcile.Result, resErr error) { + // Notify successful reconciliation for new artifact and recovery from any + // failure. + if resErr == nil && res == sreconcile.ResultSuccess && newObj.Status.Artifact != nil { + annotations := map[string]string{ + "revision": newObj.Status.Artifact.Revision, + "checksum": newObj.Status.Artifact.Checksum, + } + + var oldChecksum string + if oldObj.GetArtifact() != nil { + oldChecksum = oldObj.GetArtifact().Checksum + } + + // Notify on new artifact and failure recovery. + if oldChecksum != newObj.GetArtifact().Checksum { + r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal, + reasonForBuild(build), build.Summary()) + } else { + if sreconcile.FailureRecovery(oldObj, newObj, helmChartFailConditions) { + r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal, + reasonForBuild(build), build.Summary()) + } + } + } +} + // reconcileStorage ensures the current state of the storage matches the // desired and previously observed state. // @@ -711,12 +751,6 @@ func (r *HelmChartReconciler) reconcileArtifact(ctx context.Context, obj *source obj.Status.Artifact = artifact.DeepCopy() obj.Status.ObservedChartName = b.Name - // Publish an event - r.AnnotatedEventf(obj, map[string]string{ - sourcev1.GroupVersion.Group + "/revision": artifact.Revision, - sourcev1.GroupVersion.Group + "/checksum": artifact.Checksum, - }, corev1.EventTypeNormal, reasonForBuild(b), b.Summary()) - // Update symlink on a "best effort" basis symURL, err := r.Storage.Symlink(artifact, "latest.tar.gz") if err != nil { diff --git a/controllers/helmchart_controller_test.go b/controllers/helmchart_controller_test.go index 522908c32..b61c8b578 100644 --- a/controllers/helmchart_controller_test.go +++ b/controllers/helmchart_controller_test.go @@ -1577,3 +1577,112 @@ func TestHelmChartReconciler_statusConditions(t *testing.T) { }) } } + +func TestHelmChartReconciler_notify(t *testing.T) { + tests := []struct { + name string + res sreconcile.Result + resErr error + oldObjBeforeFunc func(obj *sourcev1.HelmChart) + newObjBeforeFunc func(obj *sourcev1.HelmChart) + wantEvent string + }{ + { + name: "error - no event", + res: sreconcile.ResultEmpty, + resErr: errors.New("some error"), + }, + { + name: "new artifact", + res: sreconcile.ResultSuccess, + resErr: nil, + newObjBeforeFunc: func(obj *sourcev1.HelmChart) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"} + }, + wantEvent: "Normal ChartPackageSucceeded packaged", + }, + { + name: "recovery from failure", + res: sreconcile.ResultSuccess, + resErr: nil, + oldObjBeforeFunc: func(obj *sourcev1.HelmChart) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"} + conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "fail") + conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "foo") + }, + newObjBeforeFunc: func(obj *sourcev1.HelmChart) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"} + conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + wantEvent: "Normal ChartPackageSucceeded packaged", + }, + { + name: "recovery and new artifact", + res: sreconcile.ResultSuccess, + resErr: nil, + oldObjBeforeFunc: func(obj *sourcev1.HelmChart) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"} + conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "fail") + conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "foo") + }, + newObjBeforeFunc: func(obj *sourcev1.HelmChart) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "aaa", Checksum: "bbb"} + conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + wantEvent: "Normal ChartPackageSucceeded packaged", + }, + { + name: "no updates", + res: sreconcile.ResultSuccess, + resErr: nil, + oldObjBeforeFunc: func(obj *sourcev1.HelmChart) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"} + conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + newObjBeforeFunc: func(obj *sourcev1.HelmChart) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"} + conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + recorder := record.NewFakeRecorder(32) + + oldObj := &sourcev1.HelmChart{} + newObj := oldObj.DeepCopy() + + if tt.oldObjBeforeFunc != nil { + tt.oldObjBeforeFunc(oldObj) + } + if tt.newObjBeforeFunc != nil { + tt.newObjBeforeFunc(newObj) + } + + reconciler := &HelmChartReconciler{ + EventRecorder: recorder, + } + build := &chart.Build{ + Name: "foo", + Version: "1.0.0", + Path: "some/path", + Packaged: true, + } + reconciler.notify(oldObj, newObj, build, tt.res, tt.resErr) + + select { + case x, ok := <-recorder.Events: + g.Expect(ok).To(Equal(tt.wantEvent != ""), "unexpected event received") + if tt.wantEvent != "" { + g.Expect(x).To(ContainSubstring(tt.wantEvent)) + } + default: + if tt.wantEvent != "" { + t.Errorf("expected some event to be emitted") + } + } + }) + } +} diff --git a/controllers/helmrepository_controller.go b/controllers/helmrepository_controller.go index cbad94102..343efd5c3 100644 --- a/controllers/helmrepository_controller.go +++ b/controllers/helmrepository_controller.go @@ -22,7 +22,6 @@ import ( "errors" "fmt" "net/url" - "os" "time" "github.com/docker/go-units" @@ -82,6 +81,13 @@ var helmRepositoryReadyCondition = summarize.Conditions{ }, } +// helmRepositoryFailConditions contains the conditions that represent a +// failure. +var helmRepositoryFailConditions = []string{ + sourcev1.FetchFailedCondition, + sourcev1.StorageOperationFailedCondition, +} + // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=helmrepositories,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=helmrepositories/status,verbs=get;update;patch // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=helmrepositories/finalizers,verbs=get;create;update;patch;delete @@ -195,10 +201,12 @@ func (r *HelmRepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Reque return } -// reconcile iterates through the gitRepositoryReconcileFunc tasks for the +// reconcile iterates through the helmRepositoryReconcileFunc tasks for the // object. It returns early on the first call that returns // reconcile.ResultRequeue, or produces an error. func (r *HelmRepositoryReconciler) reconcile(ctx context.Context, obj *sourcev1.HelmRepository, reconcilers []helmRepositoryReconcileFunc) (sreconcile.Result, error) { + oldObj := obj.DeepCopy() + if obj.Generation != obj.Status.ObservedGeneration { conditions.MarkReconciling(obj, "NewGeneration", "reconciling new object generation (%d)", obj.Generation) } @@ -225,9 +233,44 @@ func (r *HelmRepositoryReconciler) reconcile(ctx context.Context, obj *sourcev1. // Prioritize requeue request in the result for successful results. res = sreconcile.LowestRequeuingResult(res, recResult) } + + r.notify(oldObj, obj, chartRepo, res, resErr) + return res, resErr } +// notify emits notification related to the reconciliation. +func (r *HelmRepositoryReconciler) notify(oldObj, newObj *sourcev1.HelmRepository, chartRepo repository.ChartRepository, res sreconcile.Result, resErr error) { + // Notify successful reconciliation for new artifact and recovery from any + // failure. + if resErr == nil && res == sreconcile.ResultSuccess && newObj.Status.Artifact != nil { + annotations := map[string]string{ + "revision": newObj.Status.Artifact.Revision, + "checksum": newObj.Status.Artifact.Checksum, + } + + size := units.HumanSize(float64(*newObj.Status.Artifact.Size)) + + var oldChecksum string + if oldObj.GetArtifact() != nil { + oldChecksum = oldObj.GetArtifact().Checksum + } + + message := fmt.Sprintf("stored index of size %s from '%s'", size, chartRepo.URL) + + // Notify on new artifact and failure recovery. + if oldChecksum != newObj.GetArtifact().Checksum { + r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal, + "NewArtifact", message) + } else { + if sreconcile.FailureRecovery(oldObj, newObj, helmRepositoryFailConditions) { + r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal, + meta.SucceededReason, message) + } + } + } +} + // reconcileStorage ensures the current state of the storage matches the // desired and previously observed state. // @@ -445,23 +488,6 @@ func (r *HelmRepositoryReconciler) reconcileArtifact(ctx context.Context, obj *s return sreconcile.ResultEmpty, e } - // Calculate the artifact size to be included in the NewArtifact event. - fi, err := os.Stat(chartRepo.CachePath) - if err != nil { - e := &serror.Event{ - Err: fmt.Errorf("unable to read the artifact: %w", err), - Reason: sourcev1.ReadOperationFailedReason, - } - conditions.MarkTrue(obj, sourcev1.StorageOperationFailedCondition, e.Reason, e.Err.Error()) - return sreconcile.ResultEmpty, e - } - size := units.HumanSize(float64(fi.Size())) - - r.AnnotatedEventf(obj, map[string]string{ - sourcev1.GroupVersion.Group + "/revision": artifact.Revision, - sourcev1.GroupVersion.Group + "/checksum": artifact.Checksum, - }, corev1.EventTypeNormal, "NewArtifact", "fetched index of size %s from '%s'", size, chartRepo.URL) - // Record it on the object. obj.Status.Artifact = artifact.DeepCopy() diff --git a/controllers/helmrepository_controller_test.go b/controllers/helmrepository_controller_test.go index 95b770915..49707519f 100644 --- a/controllers/helmrepository_controller_test.go +++ b/controllers/helmrepository_controller_test.go @@ -18,6 +18,7 @@ package controllers import ( "context" + "errors" "fmt" "net/http" "os" @@ -837,3 +838,110 @@ func TestHelmRepositoryReconciler_statusConditions(t *testing.T) { }) } } + +func TestHelmRepositoryReconciler_notify(t *testing.T) { + var aSize int64 = 30000 + tests := []struct { + name string + res sreconcile.Result + resErr error + oldObjBeforeFunc func(obj *sourcev1.HelmRepository) + newObjBeforeFunc func(obj *sourcev1.HelmRepository) + wantEvent string + }{ + { + name: "error - no event", + res: sreconcile.ResultEmpty, + resErr: errors.New("some error"), + }, + { + name: "new artifact", + res: sreconcile.ResultSuccess, + resErr: nil, + newObjBeforeFunc: func(obj *sourcev1.HelmRepository) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy", Size: &aSize} + }, + wantEvent: "Normal NewArtifact stored index of size", + }, + { + name: "recovery from failure", + res: sreconcile.ResultSuccess, + resErr: nil, + oldObjBeforeFunc: func(obj *sourcev1.HelmRepository) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy", Size: &aSize} + conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "fail") + conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "foo") + }, + newObjBeforeFunc: func(obj *sourcev1.HelmRepository) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy", Size: &aSize} + conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + wantEvent: "Normal Succeeded stored index of size", + }, + { + name: "recovery and new artifact", + res: sreconcile.ResultSuccess, + resErr: nil, + oldObjBeforeFunc: func(obj *sourcev1.HelmRepository) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy", Size: &aSize} + conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "fail") + conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "foo") + }, + newObjBeforeFunc: func(obj *sourcev1.HelmRepository) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "aaa", Checksum: "bbb", Size: &aSize} + conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + wantEvent: "Normal NewArtifact stored index of size", + }, + { + name: "no updates", + res: sreconcile.ResultSuccess, + resErr: nil, + oldObjBeforeFunc: func(obj *sourcev1.HelmRepository) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy", Size: &aSize} + conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + newObjBeforeFunc: func(obj *sourcev1.HelmRepository) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy", Size: &aSize} + conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + recorder := record.NewFakeRecorder(32) + + oldObj := &sourcev1.HelmRepository{} + newObj := oldObj.DeepCopy() + + if tt.oldObjBeforeFunc != nil { + tt.oldObjBeforeFunc(oldObj) + } + if tt.newObjBeforeFunc != nil { + tt.newObjBeforeFunc(newObj) + } + + reconciler := &HelmRepositoryReconciler{ + EventRecorder: recorder, + } + chartRepo := repository.ChartRepository{ + URL: "some-address", + } + reconciler.notify(oldObj, newObj, chartRepo, tt.res, tt.resErr) + + select { + case x, ok := <-recorder.Events: + g.Expect(ok).To(Equal(tt.wantEvent != ""), "unexpected event received") + if tt.wantEvent != "" { + g.Expect(x).To(ContainSubstring(tt.wantEvent)) + } + default: + if tt.wantEvent != "" { + t.Errorf("expected some event to be emitted") + } + } + }) + } +} diff --git a/internal/reconcile/reconcile.go b/internal/reconcile/reconcile.go index af0c71b97..7369a3932 100644 --- a/internal/reconcile/reconcile.go +++ b/internal/reconcile/reconcile.go @@ -158,3 +158,22 @@ func LowestRequeuingResult(i, j Result) Result { return j } } + +// FailureRecovery finds out if a failure recovery occurred by checking the fail +// conditions in the old object and the new object. +func FailureRecovery(oldObj, newObj conditions.Setter, failConditions []string) bool { + failuresBefore, failuresNow := 0, 0 + for _, failCondition := range failConditions { + if conditions.Get(oldObj, failCondition) != nil { + failuresBefore++ + } + if conditions.Get(newObj, failCondition) != nil { + failuresNow++ + } + // Short-circuit, there are failures now, no recovery. + if failuresNow > 0 { + break + } + } + return failuresBefore > 0 && failuresNow == 0 +} diff --git a/internal/reconcile/reconcile_test.go b/internal/reconcile/reconcile_test.go index 127e3c186..26922f26d 100644 --- a/internal/reconcile/reconcile_test.go +++ b/internal/reconcile/reconcile_test.go @@ -202,3 +202,99 @@ func TestComputeReconcileResult(t *testing.T) { }) } } + +func TestFailureRecovery(t *testing.T) { + failCondns := []string{ + "FooFailed", + "BarFailed", + "BazFailed", + } + tests := []struct { + name string + oldObjFunc func(obj conditions.Setter) + newObjFunc func(obj conditions.Setter) + failConditions []string + result bool + }{ + { + name: "no failures", + oldObjFunc: func(obj conditions.Setter) { + conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + newObjFunc: func(obj conditions.Setter) { + conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + failConditions: failCondns, + result: false, + }, + { + name: "no recovery", + oldObjFunc: func(obj conditions.Setter) { + conditions.MarkTrue(obj, "FooFailed", "some-reason", "message") + conditions.MarkFalse(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + newObjFunc: func(obj conditions.Setter) { + conditions.MarkTrue(obj, "FooFailed", "some-reason", "message") + conditions.MarkFalse(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + failConditions: failCondns, + result: false, + }, + { + name: "different failure", + oldObjFunc: func(obj conditions.Setter) { + conditions.MarkTrue(obj, "FooFailed", "some-reason", "message") + conditions.MarkFalse(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + newObjFunc: func(obj conditions.Setter) { + conditions.MarkTrue(obj, "BarFailed", "some-reason", "message") + conditions.MarkFalse(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + failConditions: failCondns, + result: false, + }, + { + name: "failure recovery", + oldObjFunc: func(obj conditions.Setter) { + conditions.MarkTrue(obj, "FooFailed", "some-reason", "message") + conditions.MarkFalse(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + newObjFunc: func(obj conditions.Setter) { + conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + failConditions: failCondns, + result: true, + }, + { + name: "ready to fail", + oldObjFunc: func(obj conditions.Setter) { + conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + newObjFunc: func(obj conditions.Setter) { + conditions.MarkTrue(obj, "BazFailed", "some-reason", "message") + conditions.MarkFalse(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + failConditions: failCondns, + result: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + + oldObj := &sourcev1.GitRepository{} + newObj := oldObj.DeepCopy() + + if tt.oldObjFunc != nil { + tt.oldObjFunc(oldObj) + } + + if tt.newObjFunc != nil { + tt.newObjFunc(newObj) + } + + g.Expect(FailureRecovery(oldObj, newObj, tt.failConditions)).To(Equal(tt.result)) + }) + } +}