Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add FluxInstance artifact reconciler #127

Merged
merged 14 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ metadata:
namespace: flux-system
annotations:
fluxcd.controlplane.io/reconcileEvery: "1h"
fluxcd.controlplane.io/reconcileArtifactEvery: "10m"
fluxcd.controlplane.io/reconcileTimeout: "5m"
spec:
distribution:
Expand Down
36 changes: 27 additions & 9 deletions api/v1/fluxinstance_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@ const (
)

var (
Finalizer = fmt.Sprintf("%s/finalizer", GroupVersion.Group)
ReconcileAnnotation = fmt.Sprintf("%s/reconcile", GroupVersion.Group)
ReconcileEveryAnnotation = fmt.Sprintf("%s/reconcileEvery", GroupVersion.Group)
ReconcileTimeoutAnnotation = fmt.Sprintf("%s/reconcileTimeout", GroupVersion.Group)
PruneAnnotation = fmt.Sprintf("%s/prune", GroupVersion.Group)
RevisionAnnotation = fmt.Sprintf("%s/revision", GroupVersion.Group)
Finalizer = fmt.Sprintf("%s/finalizer", GroupVersion.Group)
ReconcileAnnotation = fmt.Sprintf("%s/reconcile", GroupVersion.Group)
ReconcileEveryAnnotation = fmt.Sprintf("%s/reconcileEvery", GroupVersion.Group)
ReconcileArtifactEveryAnnotation = fmt.Sprintf("%s/reconcileArtifactEvery", GroupVersion.Group)
ReconcileTimeoutAnnotation = fmt.Sprintf("%s/reconcileTimeout", GroupVersion.Group)
PruneAnnotation = fmt.Sprintf("%s/prune", GroupVersion.Group)
RevisionAnnotation = fmt.Sprintf("%s/revision", GroupVersion.Group)
)

// FluxInstanceSpec defines the desired state of FluxInstance
Expand Down Expand Up @@ -366,12 +367,29 @@ func (in *FluxInstance) IsDisabled() bool {
// GetInterval returns the interval at which the object should be reconciled.
// If no interval is set, the default is 60 minutes.
func (in *FluxInstance) GetInterval() time.Duration {
val, ok := in.GetAnnotations()[ReconcileAnnotation]
if ok && strings.ToLower(val) == DisabledValue {
if in.IsDisabled() {
return 0
}
defaultInterval := 60 * time.Minute
val, ok = in.GetAnnotations()[ReconcileEveryAnnotation]
val, ok := in.GetAnnotations()[ReconcileEveryAnnotation]
if !ok {
return defaultInterval
}
interval, err := time.ParseDuration(val)
if err != nil {
return defaultInterval
}
return interval
}

// GetArtifactInterval returns the interval at which the distribution artifact should be reconciled.
// If no interval is set, the default is 10 minutes.
func (in *FluxInstance) GetArtifactInterval() time.Duration {
if in.IsDisabled() {
return 0
}
defaultInterval := 10 * time.Minute
val, ok := in.GetAnnotations()[ReconcileArtifactEveryAnnotation]
if !ok {
return defaultInterval
}
Expand Down
12 changes: 12 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,18 @@ func main() {
os.Exit(1)
}

if err = (&controller.FluxInstanceArtifactReconciler{
Client: mgr.GetClient(),
StatusManager: controllerName,
EventRecorder: mgr.GetEventRecorderFor(controllerName),
}).SetupWithManager(mgr,
controller.FluxInstanceArtifactReconcilerOptions{
RateLimiter: runtimeCtrl.GetRateLimiter(rateLimiterOptions),
}); err != nil {
setupLog.Error(err, "unable to create controller", "controller", fluxcdv1.FluxInstanceKind+"Artifact")
os.Exit(1)
}

if err = (&controller.FluxReportReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ metadata:
"namespace": "flux-system",
"annotations": {
"fluxcd.controlplane.io/reconcileEvery": "1h",
"fluxcd.controlplane.io/reconcileArtifactEvery": "10m",
"fluxcd.controlplane.io/reconcileTimeout": "5m"
}
},
Expand Down
2 changes: 2 additions & 0 deletions docs/api/v1/fluxinstance.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ metadata:
annotations:
fluxcd.controlplane.io/reconcile: "enabled"
fluxcd.controlplane.io/reconcileEvery: "1h"
fluxcd.controlplane.io/reconcileArtifactEvery: "10m"
fluxcd.controlplane.io/reconcileTimeout: "3m"
spec:
distribution:
Expand Down Expand Up @@ -432,6 +433,7 @@ The reconciliation behaviour can be configured using the following annotations:

- `fluxcd.controlplane.io/reconcile`: Enable or disable the reconciliation loop. Default is `enabled`, set to `disabled` to pause the reconciliation.
- `fluxcd.controlplane.io/reconcileEvery`: Set the reconciliation interval. Default is `1h`.
- `fluxcd.controlplane.io/reconcileArtifactEvery`: Set the artifact reconciliation interval. Default is `10m`.
- `fluxcd.controlplane.io/reconcileTimeout`: Set the reconciliation timeout. Default is `5m`.

### Sync configuration
Expand Down
21 changes: 21 additions & 0 deletions internal/builder/head.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright 2024 Stefan Prodan.
// SPDX-License-Identifier: AGPL-3.0

package builder

import (
"context"
"fmt"
"strings"

"github.com/google/go-containerregistry/pkg/crane"
)

// HeadArtifact looks up an artifact from an OCI repository and returns the digest of the artifact.
func HeadArtifact(ctx context.Context, ociURL string) (string, error) {
desc, err := crane.Head(strings.TrimPrefix(ociURL, "oci://"), crane.WithContext(ctx))
if err != nil {
return "", fmt.Errorf("fetching descriptor for artifact %s failed: %w", ociURL, err)
}
return desc.Digest.String(), nil
}
101 changes: 101 additions & 0 deletions internal/controller/fluxinstance_artifact_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2024 Stefan Prodan.
// SPDX-License-Identifier: AGPL-3.0

package controller

import (
"context"
"fmt"
"time"

"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/patch"
corev1 "k8s.io/api/core/v1"
kuberecorder "k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

fluxcdv1 "github.com/controlplaneio-fluxcd/flux-operator/api/v1"
"github.com/controlplaneio-fluxcd/flux-operator/internal/builder"
)

// FluxInstanceArtifactReconciler reconciles the distribution artifact of a FluxInstance object
type FluxInstanceArtifactReconciler struct {
client.Client
kuberecorder.EventRecorder

StatusManager string
}

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
func (r *FluxInstanceArtifactReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, retErr error) {
obj := &fluxcdv1.FluxInstance{}
if err := r.Get(ctx, req.NamespacedName, obj); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}

// Skip reconciliation if the object is under deletion.
if !obj.ObjectMeta.DeletionTimestamp.IsZero() {
return ctrl.Result{}, nil
}

// Skip reconciliation if the object has the reconcile annotation set to 'disabled'.
if obj.IsDisabled() {
return ctrl.Result{}, nil
}

// Skip reconciliation if the object does not have a distribution artifact specified.
if obj.Spec.Distribution.Artifact == "" {
return ctrl.Result{}, nil
}

// Reconcile the object.
patcher := patch.NewSerialPatcher(obj, r.Client)
return r.reconcile(ctx, obj, patcher)
}

func (r *FluxInstanceArtifactReconciler) reconcile(ctx context.Context,
obj *fluxcdv1.FluxInstance,
patcher *patch.SerialPatcher) (ctrl.Result, error) {

log := ctrl.LoggerFrom(ctx)

// Fetch the latest digest of the distribution manifests.
artifactURL := obj.Spec.Distribution.Artifact
artifactDigest, err := builder.HeadArtifact(ctx, artifactURL)
if err != nil {
msg := fmt.Sprintf("fetch failed: %s", err.Error())
r.Event(obj, corev1.EventTypeWarning, meta.ArtifactFailedReason, msg)
return ctrl.Result{}, err
}
log.Info("fetched latest manifests", "url", artifactURL, "digest", artifactDigest)

// Skip reconciliation if the artifact has not changed.
if artifactDigest == obj.Status.LastArtifactRevision {
return requeueArtifactAfter(obj), nil
}

// The digest has changed, request a reconciliation.
log.Info("artifact revision changed, requesting a reconciliation",
"old", obj.Status.LastArtifactRevision, "new", artifactDigest)
if obj.Annotations == nil {
obj.Annotations = make(map[string]string, 1)
}
obj.Annotations[meta.ReconcileRequestAnnotation] = time.Now().Format(time.RFC3339Nano)
if err := patcher.Patch(ctx, obj, patch.WithFieldOwner(r.StatusManager)); err != nil {
return ctrl.Result{}, err
}

return requeueArtifactAfter(obj), nil
}

// requeueArtifactAfter returns a ctrl.Result with the requeue time set to the
// interval specified in the object's annotation for artifact reconciliation.
func requeueArtifactAfter(obj *fluxcdv1.FluxInstance) ctrl.Result {
result := ctrl.Result{}
if d := obj.GetArtifactInterval(); d > 0 {
result.RequeueAfter = d
}
return result
}
163 changes: 163 additions & 0 deletions internal/controller/fluxinstance_artifact_controller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
// Copyright 2024 Stefan Prodan.
// SPDX-License-Identifier: AGPL-3.0

package controller

import (
"context"
"errors"
"strings"
"testing"
"time"

"github.com/fluxcd/pkg/apis/meta"
. "github.com/onsi/gomega"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

fluxcdv1 "github.com/controlplaneio-fluxcd/flux-operator/api/v1"
"github.com/controlplaneio-fluxcd/flux-operator/internal/builder"
)

func TestFluxInstanceArtifactReconciler(t *testing.T) {
const cpLatestManifestsURL = "oci://ghcr.io/controlplaneio-fluxcd/flux-operator-manifests:latest"

g := NewWithT(t)

latestArtifactRevision, err := builder.HeadArtifact(context.Background(), cpLatestManifestsURL)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(latestArtifactRevision).To(HavePrefix("sha256:"))
g.Expect(strings.TrimPrefix(latestArtifactRevision, "sha256:")).To(HaveLen(64))

for _, tt := range []struct {
name string
delete bool
annotations map[string]string
manifestsURL string
lastArtifactRevision string
result ctrl.Result
err error
shouldRequestReconciliation bool
}{
{
name: "requests reconciliation when digest is different",
manifestsURL: cpLatestManifestsURL,
lastArtifactRevision: "",
result: ctrl.Result{RequeueAfter: 10 * time.Minute},
shouldRequestReconciliation: true,
},
{
name: "does not request reconciliation when up-to-date",
manifestsURL: cpLatestManifestsURL,
lastArtifactRevision: latestArtifactRevision,
result: ctrl.Result{RequeueAfter: 10 * time.Minute},
shouldRequestReconciliation: false,
},
{
name: "uses interval from annotation",
annotations: map[string]string{"fluxcd.controlplane.io/reconcileArtifactEvery": "2m"},
manifestsURL: cpLatestManifestsURL,
lastArtifactRevision: latestArtifactRevision,
result: ctrl.Result{RequeueAfter: 2 * time.Minute},
shouldRequestReconciliation: false,
},
{
name: "does not request reconciliation when on deletion",
delete: true,
manifestsURL: cpLatestManifestsURL,
lastArtifactRevision: "",
result: ctrl.Result{},
shouldRequestReconciliation: false,
},
{
name: "does not request reconciliation when disabled",
annotations: map[string]string{"fluxcd.controlplane.io/reconcile": "disabled"},
manifestsURL: cpLatestManifestsURL,
lastArtifactRevision: "",
result: ctrl.Result{},
shouldRequestReconciliation: false,
},
{
name: "does not request reconciliation when artifact is not specified",
manifestsURL: "",
result: ctrl.Result{},
shouldRequestReconciliation: false,
},
{
name: "does not request reconciliation on artifact error",
manifestsURL: "oci://not.found/artifact",
lastArtifactRevision: "",
result: ctrl.Result{},
err: errors.New("no such host"),
shouldRequestReconciliation: false,
},
} {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
reconciler := getFluxInstanceArtifactReconciler()
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

ns, err := testEnv.CreateNamespace(ctx, "test")
g.Expect(err).ToNot(HaveOccurred())

obj := &fluxcdv1.FluxInstance{
ObjectMeta: metav1.ObjectMeta{
Name: ns.Name,
Namespace: ns.Name,
Annotations: tt.annotations,
},
Spec: getDefaultFluxSpec(),
}
obj.Spec.Distribution.Artifact = tt.manifestsURL

err = testEnv.Create(ctx, obj)
g.Expect(err).ToNot(HaveOccurred())

if tt.lastArtifactRevision != "" {
obj.Status.LastArtifactRevision = tt.lastArtifactRevision
err := testEnv.Status().Update(ctx, obj)
g.Expect(err).ToNot(HaveOccurred())
}

if tt.delete {
obj.Finalizers = append(obj.Finalizers, "test")
err := testEnv.Update(ctx, obj)
g.Expect(err).ToNot(HaveOccurred())
err = testEnv.Delete(ctx, obj)
g.Expect(err).ToNot(HaveOccurred())
}

r, err := reconciler.Reconcile(ctx, reconcile.Request{
NamespacedName: client.ObjectKeyFromObject(obj),
})
if tt.err != nil {
g.Expect(err).To(HaveOccurred())
g.Expect(err.Error()).To(ContainSubstring(tt.err.Error()))
} else {
g.Expect(err).ToNot(HaveOccurred())
}
g.Expect(r).To(Equal(tt.result))

err = testEnv.Get(ctx, client.ObjectKeyFromObject(obj), obj)
g.Expect(err).ToNot(HaveOccurred())

annotations := obj.GetAnnotations()
if annotations == nil {
annotations = make(map[string]string)
}
reconcileRequestAnnotation := annotations[meta.ReconcileRequestAnnotation]

if tt.shouldRequestReconciliation {
g.Expect(reconcileRequestAnnotation).ToNot(BeEmpty())
requestedAt, err := time.Parse(time.RFC3339Nano, reconcileRequestAnnotation)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(requestedAt).To(BeTemporally("~", time.Now(), time.Second))
} else {
g.Expect(reconcileRequestAnnotation).To(BeEmpty())
}
})
}
}
Loading
Loading