diff --git a/controllers/ocirepository_controller.go b/controllers/ocirepository_controller.go index da4916f64..736a860e0 100644 --- a/controllers/ocirepository_controller.go +++ b/controllers/ocirepository_controller.go @@ -21,8 +21,10 @@ import ( "errors" "fmt" "os" + "sort" "time" + "github.com/Masterminds/semver/v3" "github.com/google/go-containerregistry/pkg/crane" gcrv1 "github.com/google/go-containerregistry/pkg/v1" corev1 "k8s.io/api/core/v1" @@ -45,6 +47,7 @@ import ( "github.com/fluxcd/pkg/runtime/patch" "github.com/fluxcd/pkg/runtime/predicates" "github.com/fluxcd/pkg/untar" + "github.com/fluxcd/pkg/version" sourcev1 "github.com/fluxcd/source-controller/api/v1beta2" serror "github.com/fluxcd/source-controller/internal/error" sreconcile "github.com/fluxcd/source-controller/internal/reconcile" @@ -271,55 +274,21 @@ func (r *OCIRepositoryReconciler) reconcile(ctx context.Context, obj *sourcev1.O return res, resErr } -// notify emits notification related to the reconciliation. -func (r *OCIRepositoryReconciler) notify(ctx context.Context, oldObj, newObj *sourcev1.OCIRepository, digest *gcrv1.Hash, res sreconcile.Result, resErr error) { - // Notify successful reconciliation for new artifact and recovery from any - // failure. - if resErr == nil && res == sreconcile.ResultSuccess && newObj.Status.Artifact != nil { - annotations := map[string]string{ - sourcev1.GroupVersion.Group + "/revision": newObj.Status.Artifact.Revision, - sourcev1.GroupVersion.Group + "/checksum": newObj.Status.Artifact.Checksum, - } - - var oldChecksum string - if oldObj.GetArtifact() != nil { - oldChecksum = oldObj.GetArtifact().Checksum - } - - message := fmt.Sprintf("stored artifact with digest '%s' from '%s'", digest.String(), newObj.Spec.URL) - - // Notify on new artifact and failure recovery. - if oldChecksum != newObj.GetArtifact().Checksum { - r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal, - "NewArtifact", message) - ctrl.LoggerFrom(ctx).Info(message) - } else { - if sreconcile.FailureRecovery(oldObj, newObj, ociRepositoryFailConditions) { - r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal, - meta.SucceededReason, message) - ctrl.LoggerFrom(ctx).Info(message) - } - } - } -} - -// reconcileSource fetches the upstream OCI artifact content. +// reconcileSource fetches the upstream OCI artifact metadata and content. // If this fails, it records v1beta2.FetchFailedCondition=True on the object and returns early. func (r *OCIRepositoryReconciler) reconcileSource(ctx context.Context, obj *sourcev1.OCIRepository, digest *gcrv1.Hash, dir string) (sreconcile.Result, error) { ctxTimeout, cancel := context.WithTimeout(ctx, obj.Spec.Timeout.Duration) defer cancel() - url := obj.Spec.URL - if obj.Spec.Reference != nil { - if obj.Spec.Reference.Tag != "" { - url = fmt.Sprintf("%s:%s", obj.Spec.URL, obj.Spec.Reference.Tag) - } - if obj.Spec.Reference.Digest != "" { - url = fmt.Sprintf("%s@%s", obj.Spec.URL, obj.Spec.Reference.Digest) - } + // Determine which artifact revision to pull + url, err := r.getArtifactURL(ctxTimeout, obj) + if err != nil { + e := &serror.Event{Err: err, Reason: sourcev1.OCIOperationFailedReason} + conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error()) + return sreconcile.ResultEmpty, e } - // Pull OCI artifact + // Pull artifact from the remote container registry img, err := crane.Pull(url, r.craneOptions(ctxTimeout)...) if err != nil { e := &serror.Event{Err: err, Reason: sourcev1.OCIOperationFailedReason} @@ -327,7 +296,7 @@ func (r *OCIRepositoryReconciler) reconcileSource(ctx context.Context, obj *sour return sreconcile.ResultEmpty, e } - // Fetch digest + // Determine the artifact SHA256 digest imgDigest, err := img.Digest() if err != nil { e := &serror.Event{Err: err, Reason: sourcev1.OCIOperationFailedReason} @@ -335,14 +304,14 @@ func (r *OCIRepositoryReconciler) reconcileSource(ctx context.Context, obj *sour return sreconcile.ResultEmpty, e } - // Set revision from digest hex + // Set the internal revision to the remote digest hex imgDigest.DeepCopyInto(digest) revision := imgDigest.Hex // Mark observations about the revision on the object defer func() { if !obj.GetArtifact().HasRevision(revision) { - message := fmt.Sprintf("new upstream revision '%s'", revision) + message := fmt.Sprintf("new upstream revision '%s' for '%s'", revision, url) conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "NewRevision", message) conditions.MarkReconciling(obj, "NewRevision", message) } @@ -382,6 +351,76 @@ func (r *OCIRepositoryReconciler) reconcileSource(ctx context.Context, obj *sour return sreconcile.ResultSuccess, nil } +// getArtifactURL determines which tag or digest should be used and returns the OCI artifact FQN. +func (r *OCIRepositoryReconciler) getArtifactURL(ctx context.Context, obj *sourcev1.OCIRepository) (string, error) { + url := obj.Spec.URL + if obj.Spec.Reference != nil { + if obj.Spec.Reference.Digest != "" { + return fmt.Sprintf("%s@%s", obj.Spec.URL, obj.Spec.Reference.Digest), nil + } + + if obj.Spec.Reference.SemVer != "" { + tag, err := r.getTagBySemver(ctx, url, obj.Spec.Reference.SemVer) + if err != nil { + return "", err + } + return fmt.Sprintf("%s:%s", obj.Spec.URL, tag), nil + } + + if obj.Spec.Reference.Tag != "latest" { + return fmt.Sprintf("%s:%s", obj.Spec.URL, obj.Spec.Reference.Tag), nil + } + } + + return url, nil +} + +// getTagBySemver call the remote container registry, fetches all the tags from the repository, +// and returns the latest tag according to the semver expression. +func (r *OCIRepositoryReconciler) getTagBySemver(ctx context.Context, url, exp string) (string, error) { + tags, err := crane.ListTags(url, r.craneOptions(ctx)...) + if err != nil { + return "", err + } + + constraint, err := semver.NewConstraint(exp) + if err != nil { + return "", fmt.Errorf("semver '%s' parse error: %w", exp, err) + } + + var matchingVersions []*semver.Version + for _, t := range tags { + v, err := version.ParseVersion(t) + if err != nil { + continue + } + + if constraint.Check(v) { + matchingVersions = append(matchingVersions, v) + } + } + + if len(matchingVersions) == 0 { + return "", fmt.Errorf("no match found for semver: %s", exp) + } + + sort.Sort(sort.Reverse(semver.Collection(matchingVersions))) + return matchingVersions[0].Original(), nil +} + +// craneOptions sets the timeout and user agent for all operations against remote container registries. +func (r *OCIRepositoryReconciler) craneOptions(ctx context.Context) []crane.Option { + return []crane.Option{ + crane.WithContext(ctx), + crane.WithUserAgent("flux/v2"), + crane.WithPlatform(&gcrv1.Platform{ + Architecture: "flux", + OS: "flux", + OSVersion: "v2", + }), + } +} + // reconcileStorage ensures the current state of the storage matches the // desired and previously observed state. // @@ -580,14 +619,34 @@ func (r *OCIRepositoryReconciler) eventLogf(ctx context.Context, obj runtime.Obj r.Eventf(obj, eventType, reason, msg) } -func (r *OCIRepositoryReconciler) craneOptions(ctx context.Context) []crane.Option { - return []crane.Option{ - crane.WithContext(ctx), - crane.WithUserAgent("flux/v2"), - crane.WithPlatform(&gcrv1.Platform{ - Architecture: "flux", - OS: "flux", - OSVersion: "v2", - }), +// notify emits notification related to the reconciliation. +func (r *OCIRepositoryReconciler) notify(ctx context.Context, oldObj, newObj *sourcev1.OCIRepository, digest *gcrv1.Hash, res sreconcile.Result, resErr error) { + // Notify successful reconciliation for new artifact and recovery from any + // failure. + if resErr == nil && res == sreconcile.ResultSuccess && newObj.Status.Artifact != nil { + annotations := map[string]string{ + sourcev1.GroupVersion.Group + "/revision": newObj.Status.Artifact.Revision, + sourcev1.GroupVersion.Group + "/checksum": newObj.Status.Artifact.Checksum, + } + + var oldChecksum string + if oldObj.GetArtifact() != nil { + oldChecksum = oldObj.GetArtifact().Checksum + } + + message := fmt.Sprintf("stored artifact with digest '%s' from '%s'", digest.String(), newObj.Spec.URL) + + // Notify on new artifact and failure recovery. + if oldChecksum != newObj.GetArtifact().Checksum { + r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal, + "NewArtifact", message) + ctrl.LoggerFrom(ctx).Info(message) + } else { + if sreconcile.FailureRecovery(oldObj, newObj, ociRepositoryFailConditions) { + r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal, + meta.SucceededReason, message) + ctrl.LoggerFrom(ctx).Info(message) + } + } } } diff --git a/controllers/ocirepository_controller_test.go b/controllers/ocirepository_controller_test.go index 561ef42ed..044d8666f 100644 --- a/controllers/ocirepository_controller_test.go +++ b/controllers/ocirepository_controller_test.go @@ -17,19 +17,25 @@ import ( ) func TestOCIRepository_Reconcile(t *testing.T) { - tests := []struct { name string url string tag string + semver string digest string }{ { - name: "public latest", + name: "public tag", url: "ghcr.io/stefanprodan/manifests/podinfo", tag: "6.1.6", digest: "3b6cdcc7adcc9a84d3214ee1c029543789d90b5ae69debe9efa3f66e982875de", }, + { + name: "public semver", + url: "ghcr.io/stefanprodan/manifests/podinfo", + semver: ">= 6.1 <= 6.1.5", + digest: "1d1bf6980fc86f69481bd8c875c531aa23d761ac890ce2594d4df2b39ecd8713", + }, } for _, tt := range tests { @@ -46,14 +52,19 @@ func TestOCIRepository_Reconcile(t *testing.T) { Namespace: ns.Name, }, Spec: sourcev1.OCIRepositorySpec{ - URL: tt.url, - Interval: metav1.Duration{Duration: 60 * time.Minute}, - Reference: &sourcev1.OCIRepositoryRef{ - Tag: tt.tag, - }, + URL: tt.url, + Interval: metav1.Duration{Duration: 60 * time.Minute}, + Reference: &sourcev1.OCIRepositoryRef{}, }, } + if tt.tag != "" { + obj.Spec.Reference.Tag = tt.tag + } + if tt.semver != "" { + obj.Spec.Reference.SemVer = tt.semver + } + g.Expect(testEnv.Create(ctx, obj)).To(Succeed()) key := client.ObjectKey{Name: obj.Name, Namespace: obj.Namespace} @@ -79,7 +90,9 @@ func TestOCIRepository_Reconcile(t *testing.T) { obj.Generation == obj.Status.ObservedGeneration }, timeout).Should(BeTrue()) - // Check if the revision is set to the digest format + t.Log(obj.Spec.Reference) + + // Check if the revision matches the expected digest g.Expect(obj.Status.Artifact.Revision).To(Equal(tt.digest)) // Check if the object status is valid