Skip to content

Commit

Permalink
Implement OCIRepository ref.semver
Browse files Browse the repository at this point in the history
Signed-off-by: Stefan Prodan <[email protected]>
  • Loading branch information
stefanprodan committed Jun 22, 2022
1 parent 37a7bc5 commit 77c8894
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 62 deletions.
167 changes: 113 additions & 54 deletions controllers/ocirepository_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ import (
"errors"
"fmt"
"os"
"sort"
"time"

"github.com/Masterminds/semver/v3"
"github.com/google/go-containerregistry/pkg/crane"
gcrv1 "github.com/google/go-containerregistry/pkg/v1"
corev1 "k8s.io/api/core/v1"
Expand All @@ -45,6 +47,7 @@ import (
"github.com/fluxcd/pkg/runtime/patch"
"github.com/fluxcd/pkg/runtime/predicates"
"github.com/fluxcd/pkg/untar"
"github.com/fluxcd/pkg/version"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
serror "github.com/fluxcd/source-controller/internal/error"
sreconcile "github.com/fluxcd/source-controller/internal/reconcile"
Expand Down Expand Up @@ -271,78 +274,44 @@ func (r *OCIRepositoryReconciler) reconcile(ctx context.Context, obj *sourcev1.O
return res, resErr
}

// notify emits notification related to the reconciliation.
func (r *OCIRepositoryReconciler) notify(ctx context.Context, oldObj, newObj *sourcev1.OCIRepository, digest *gcrv1.Hash, res sreconcile.Result, resErr error) {
// Notify successful reconciliation for new artifact and recovery from any
// failure.
if resErr == nil && res == sreconcile.ResultSuccess && newObj.Status.Artifact != nil {
annotations := map[string]string{
sourcev1.GroupVersion.Group + "/revision": newObj.Status.Artifact.Revision,
sourcev1.GroupVersion.Group + "/checksum": newObj.Status.Artifact.Checksum,
}

var oldChecksum string
if oldObj.GetArtifact() != nil {
oldChecksum = oldObj.GetArtifact().Checksum
}

message := fmt.Sprintf("stored artifact with digest '%s' from '%s'", digest.String(), newObj.Spec.URL)

// Notify on new artifact and failure recovery.
if oldChecksum != newObj.GetArtifact().Checksum {
r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal,
"NewArtifact", message)
ctrl.LoggerFrom(ctx).Info(message)
} else {
if sreconcile.FailureRecovery(oldObj, newObj, ociRepositoryFailConditions) {
r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal,
meta.SucceededReason, message)
ctrl.LoggerFrom(ctx).Info(message)
}
}
}
}

// reconcileSource fetches the upstream OCI artifact content.
// reconcileSource fetches the upstream OCI artifact metadata and content.
// If this fails, it records v1beta2.FetchFailedCondition=True on the object and returns early.
func (r *OCIRepositoryReconciler) reconcileSource(ctx context.Context, obj *sourcev1.OCIRepository, digest *gcrv1.Hash, dir string) (sreconcile.Result, error) {
ctxTimeout, cancel := context.WithTimeout(ctx, obj.Spec.Timeout.Duration)
defer cancel()

url := obj.Spec.URL
if obj.Spec.Reference != nil {
if obj.Spec.Reference.Tag != "" {
url = fmt.Sprintf("%s:%s", obj.Spec.URL, obj.Spec.Reference.Tag)
}
if obj.Spec.Reference.Digest != "" {
url = fmt.Sprintf("%s@%s", obj.Spec.URL, obj.Spec.Reference.Digest)
}
// Determine which artifact revision to pull
url, err := r.getArtifactURL(ctxTimeout, obj)
if err != nil {
e := &serror.Event{Err: err, Reason: sourcev1.OCIOperationFailedReason}
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error())
return sreconcile.ResultEmpty, e
}

// Pull OCI artifact
// Pull artifact from the remote container registry
img, err := crane.Pull(url, r.craneOptions(ctxTimeout)...)
if err != nil {
e := &serror.Event{Err: err, Reason: sourcev1.OCIOperationFailedReason}
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error())
return sreconcile.ResultEmpty, e
}

// Fetch digest
// Determine the artifact SHA256 digest
imgDigest, err := img.Digest()
if err != nil {
e := &serror.Event{Err: err, Reason: sourcev1.OCIOperationFailedReason}
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error())
return sreconcile.ResultEmpty, e
}

// Set revision from digest hex
// Set the internal revision to the remote digest hex
imgDigest.DeepCopyInto(digest)
revision := imgDigest.Hex

// Mark observations about the revision on the object
defer func() {
if !obj.GetArtifact().HasRevision(revision) {
message := fmt.Sprintf("new upstream revision '%s'", revision)
message := fmt.Sprintf("new upstream revision '%s' for '%s'", revision, url)
conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "NewRevision", message)
conditions.MarkReconciling(obj, "NewRevision", message)
}
Expand Down Expand Up @@ -382,6 +351,76 @@ func (r *OCIRepositoryReconciler) reconcileSource(ctx context.Context, obj *sour
return sreconcile.ResultSuccess, nil
}

// getArtifactURL determines which tag or digest should be used and returns the OCI artifact FQN.
func (r *OCIRepositoryReconciler) getArtifactURL(ctx context.Context, obj *sourcev1.OCIRepository) (string, error) {
url := obj.Spec.URL
if obj.Spec.Reference != nil {
if obj.Spec.Reference.Digest != "" {
return fmt.Sprintf("%s@%s", obj.Spec.URL, obj.Spec.Reference.Digest), nil
}

if obj.Spec.Reference.SemVer != "" {
tag, err := r.getTagBySemver(ctx, url, obj.Spec.Reference.SemVer)
if err != nil {
return "", err
}
return fmt.Sprintf("%s:%s", obj.Spec.URL, tag), nil
}

if obj.Spec.Reference.Tag != "latest" {
return fmt.Sprintf("%s:%s", obj.Spec.URL, obj.Spec.Reference.Tag), nil
}
}

return url, nil
}

// getTagBySemver call the remote container registry, fetches all the tags from the repository,
// and returns the latest tag according to the semver expression.
func (r *OCIRepositoryReconciler) getTagBySemver(ctx context.Context, url, exp string) (string, error) {
tags, err := crane.ListTags(url, r.craneOptions(ctx)...)
if err != nil {
return "", err
}

constraint, err := semver.NewConstraint(exp)
if err != nil {
return "", fmt.Errorf("semver '%s' parse error: %w", exp, err)
}

var matchingVersions []*semver.Version
for _, t := range tags {
v, err := version.ParseVersion(t)
if err != nil {
continue
}

if constraint.Check(v) {
matchingVersions = append(matchingVersions, v)
}
}

if len(matchingVersions) == 0 {
return "", fmt.Errorf("no match found for semver: %s", exp)
}

sort.Sort(sort.Reverse(semver.Collection(matchingVersions)))
return matchingVersions[0].Original(), nil
}

// craneOptions sets the timeout and user agent for all operations against remote container registries.
func (r *OCIRepositoryReconciler) craneOptions(ctx context.Context) []crane.Option {
return []crane.Option{
crane.WithContext(ctx),
crane.WithUserAgent("flux/v2"),
crane.WithPlatform(&gcrv1.Platform{
Architecture: "flux",
OS: "flux",
OSVersion: "v2",
}),
}
}

// reconcileStorage ensures the current state of the storage matches the
// desired and previously observed state.
//
Expand Down Expand Up @@ -580,14 +619,34 @@ func (r *OCIRepositoryReconciler) eventLogf(ctx context.Context, obj runtime.Obj
r.Eventf(obj, eventType, reason, msg)
}

func (r *OCIRepositoryReconciler) craneOptions(ctx context.Context) []crane.Option {
return []crane.Option{
crane.WithContext(ctx),
crane.WithUserAgent("flux/v2"),
crane.WithPlatform(&gcrv1.Platform{
Architecture: "flux",
OS: "flux",
OSVersion: "v2",
}),
// notify emits notification related to the reconciliation.
func (r *OCIRepositoryReconciler) notify(ctx context.Context, oldObj, newObj *sourcev1.OCIRepository, digest *gcrv1.Hash, res sreconcile.Result, resErr error) {
// Notify successful reconciliation for new artifact and recovery from any
// failure.
if resErr == nil && res == sreconcile.ResultSuccess && newObj.Status.Artifact != nil {
annotations := map[string]string{
sourcev1.GroupVersion.Group + "/revision": newObj.Status.Artifact.Revision,
sourcev1.GroupVersion.Group + "/checksum": newObj.Status.Artifact.Checksum,
}

var oldChecksum string
if oldObj.GetArtifact() != nil {
oldChecksum = oldObj.GetArtifact().Checksum
}

message := fmt.Sprintf("stored artifact with digest '%s' from '%s'", digest.String(), newObj.Spec.URL)

// Notify on new artifact and failure recovery.
if oldChecksum != newObj.GetArtifact().Checksum {
r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal,
"NewArtifact", message)
ctrl.LoggerFrom(ctx).Info(message)
} else {
if sreconcile.FailureRecovery(oldObj, newObj, ociRepositoryFailConditions) {
r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal,
meta.SucceededReason, message)
ctrl.LoggerFrom(ctx).Info(message)
}
}
}
}
29 changes: 21 additions & 8 deletions controllers/ocirepository_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,25 @@ import (
)

func TestOCIRepository_Reconcile(t *testing.T) {

tests := []struct {
name string
url string
tag string
semver string
digest string
}{
{
name: "public latest",
name: "public tag",
url: "ghcr.io/stefanprodan/manifests/podinfo",
tag: "6.1.6",
digest: "3b6cdcc7adcc9a84d3214ee1c029543789d90b5ae69debe9efa3f66e982875de",
},
{
name: "public semver",
url: "ghcr.io/stefanprodan/manifests/podinfo",
semver: ">= 6.1 <= 6.1.5",
digest: "1d1bf6980fc86f69481bd8c875c531aa23d761ac890ce2594d4df2b39ecd8713",
},
}

for _, tt := range tests {
Expand All @@ -46,14 +52,19 @@ func TestOCIRepository_Reconcile(t *testing.T) {
Namespace: ns.Name,
},
Spec: sourcev1.OCIRepositorySpec{
URL: tt.url,
Interval: metav1.Duration{Duration: 60 * time.Minute},
Reference: &sourcev1.OCIRepositoryRef{
Tag: tt.tag,
},
URL: tt.url,
Interval: metav1.Duration{Duration: 60 * time.Minute},
Reference: &sourcev1.OCIRepositoryRef{},
},
}

if tt.tag != "" {
obj.Spec.Reference.Tag = tt.tag
}
if tt.semver != "" {
obj.Spec.Reference.SemVer = tt.semver
}

g.Expect(testEnv.Create(ctx, obj)).To(Succeed())

key := client.ObjectKey{Name: obj.Name, Namespace: obj.Namespace}
Expand All @@ -79,7 +90,9 @@ func TestOCIRepository_Reconcile(t *testing.T) {
obj.Generation == obj.Status.ObservedGeneration
}, timeout).Should(BeTrue())

// Check if the revision is set to the digest format
t.Log(obj.Spec.Reference)

// Check if the revision matches the expected digest
g.Expect(obj.Status.Artifact.Revision).To(Equal(tt.digest))

// Check if the object status is valid
Expand Down

0 comments on commit 77c8894

Please sign in to comment.