Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add FluxInstance artifact reconciler #127

Merged
merged 14 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ metadata:
namespace: flux-system
annotations:
fluxcd.controlplane.io/reconcileEvery: "1h"
fluxcd.controlplane.io/reconcileArtifactEvery: "10m"
fluxcd.controlplane.io/reconcileTimeout: "5m"
spec:
distribution:
Expand Down
36 changes: 27 additions & 9 deletions api/v1/fluxinstance_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@ const (
)

var (
Finalizer = fmt.Sprintf("%s/finalizer", GroupVersion.Group)
ReconcileAnnotation = fmt.Sprintf("%s/reconcile", GroupVersion.Group)
ReconcileEveryAnnotation = fmt.Sprintf("%s/reconcileEvery", GroupVersion.Group)
ReconcileTimeoutAnnotation = fmt.Sprintf("%s/reconcileTimeout", GroupVersion.Group)
PruneAnnotation = fmt.Sprintf("%s/prune", GroupVersion.Group)
RevisionAnnotation = fmt.Sprintf("%s/revision", GroupVersion.Group)
Finalizer = fmt.Sprintf("%s/finalizer", GroupVersion.Group)
ReconcileAnnotation = fmt.Sprintf("%s/reconcile", GroupVersion.Group)
ReconcileEveryAnnotation = fmt.Sprintf("%s/reconcileEvery", GroupVersion.Group)
ReconcileArtifactEveryAnnotation = fmt.Sprintf("%s/reconcileArtifactEvery", GroupVersion.Group)
ReconcileTimeoutAnnotation = fmt.Sprintf("%s/reconcileTimeout", GroupVersion.Group)
PruneAnnotation = fmt.Sprintf("%s/prune", GroupVersion.Group)
RevisionAnnotation = fmt.Sprintf("%s/revision", GroupVersion.Group)
)

// FluxInstanceSpec defines the desired state of FluxInstance
Expand Down Expand Up @@ -366,12 +367,29 @@ func (in *FluxInstance) IsDisabled() bool {
// GetInterval returns the interval at which the object should be reconciled.
// If no interval is set, the default is 60 minutes.
func (in *FluxInstance) GetInterval() time.Duration {
val, ok := in.GetAnnotations()[ReconcileAnnotation]
if ok && strings.ToLower(val) == DisabledValue {
if in.IsDisabled() {
return 0
}
defaultInterval := 60 * time.Minute
val, ok = in.GetAnnotations()[ReconcileEveryAnnotation]
val, ok := in.GetAnnotations()[ReconcileEveryAnnotation]
if !ok {
return defaultInterval
}
interval, err := time.ParseDuration(val)
if err != nil {
return defaultInterval
}
return interval
}

// GetArtifactInterval returns the interval at which the distribution artifact should be reconciled.
// If no interval is set, the default is 10 minutes.
func (in *FluxInstance) GetArtifactInterval() time.Duration {
if in.IsDisabled() {
return 0
}
defaultInterval := 10 * time.Minute
val, ok := in.GetAnnotations()[ReconcileArtifactEveryAnnotation]
if !ok {
return defaultInterval
}
Expand Down
12 changes: 12 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,18 @@ func main() {
os.Exit(1)
}

if err = (&controller.FluxInstanceArtifactReconciler{
Client: mgr.GetClient(),
StatusManager: controllerName,
EventRecorder: mgr.GetEventRecorderFor(controllerName),
}).SetupWithManager(mgr,
controller.FluxInstanceArtifactReconcilerOptions{
RateLimiter: runtimeCtrl.GetRateLimiter(rateLimiterOptions),
}); err != nil {
setupLog.Error(err, "unable to create controller", "controller", fluxcdv1.FluxInstanceKind+"Artifact")
os.Exit(1)
}

if err = (&controller.FluxReportReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Expand Down
2 changes: 2 additions & 0 deletions docs/api/v1/fluxinstance.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ metadata:
annotations:
fluxcd.controlplane.io/reconcile: "enabled"
fluxcd.controlplane.io/reconcileEvery: "1h"
fluxcd.controlplane.io/reconcileArtifactEvery: "10m"
fluxcd.controlplane.io/reconcileTimeout: "3m"
spec:
distribution:
Expand Down Expand Up @@ -432,6 +433,7 @@ The reconciliation behaviour can be configured using the following annotations:

- `fluxcd.controlplane.io/reconcile`: Enable or disable the reconciliation loop. Default is `enabled`, set to `disabled` to pause the reconciliation.
- `fluxcd.controlplane.io/reconcileEvery`: Set the reconciliation interval. Default is `1h`.
- `fluxcd.controlplane.io/reconcileArtifactEvery`: Set the artifact reconciliation interval. Default is `10m`.
- `fluxcd.controlplane.io/reconcileTimeout`: Set the reconciliation timeout. Default is `5m`.

### Sync configuration
Expand Down
21 changes: 21 additions & 0 deletions internal/builder/head.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright 2024 Stefan Prodan.
// SPDX-License-Identifier: AGPL-3.0

package builder

import (
"context"
"fmt"
"strings"

"github.com/google/go-containerregistry/pkg/crane"
)

// HeadArtifact looks up an artifact from an OCI repository and returns the digest of the artifact.
func HeadArtifact(ctx context.Context, ociURL string) (string, error) {
desc, err := crane.Head(strings.TrimPrefix(ociURL, "oci://"), crane.WithContext(ctx))
if err != nil {
return "", fmt.Errorf("fetching descriptor for artifact %s failed: %w", ociURL, err)
}
return desc.Digest.String(), nil
}
107 changes: 107 additions & 0 deletions internal/controller/fluxinstance_artifact_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright 2024 Stefan Prodan.
// SPDX-License-Identifier: AGPL-3.0

package controller

import (
"context"
"fmt"
"time"

"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/conditions"
"github.com/fluxcd/pkg/runtime/patch"
corev1 "k8s.io/api/core/v1"
kuberecorder "k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

fluxcdv1 "github.com/controlplaneio-fluxcd/flux-operator/api/v1"
"github.com/controlplaneio-fluxcd/flux-operator/internal/builder"
)

// FluxInstanceArtifactReconciler reconciles the distribution artifact of a FluxInstance object
type FluxInstanceArtifactReconciler struct {
client.Client
kuberecorder.EventRecorder

StatusManager string
}

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
func (r *FluxInstanceArtifactReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, retErr error) {
obj := &fluxcdv1.FluxInstance{}
if err := r.Get(ctx, req.NamespacedName, obj); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}

// Skip reconciliation if the object is under deletion.
if !obj.ObjectMeta.DeletionTimestamp.IsZero() {
return ctrl.Result{}, nil
}

// Skip reconciliation if the object has the reconcile annotation set to 'disabled'.
if obj.IsDisabled() {
return ctrl.Result{}, nil
}

// Skip reconciliation if the object does not have a last artifact revision to avoid race condition.
if obj.Status.LastArtifactRevision == "" {
return requeueArtifactAfter(obj), nil
}

// Skip reconciliation if the object is not ready.
if !conditions.IsReady(obj) {
return requeueArtifactAfter(obj), nil
}

// Reconcile the object.
patcher := patch.NewSerialPatcher(obj, r.Client)
return r.reconcile(ctx, obj, patcher)
}

func (r *FluxInstanceArtifactReconciler) reconcile(ctx context.Context,
obj *fluxcdv1.FluxInstance,
patcher *patch.SerialPatcher) (ctrl.Result, error) {

log := ctrl.LoggerFrom(ctx)

// Fetch the latest digest of the distribution manifests.
artifactURL := obj.Spec.Distribution.Artifact
artifactDigest, err := builder.HeadArtifact(ctx, artifactURL)
if err != nil {
msg := fmt.Sprintf("fetch failed: %s", err.Error())
r.Event(obj, corev1.EventTypeWarning, meta.ArtifactFailedReason, msg)
return ctrl.Result{}, err
}
log.V(1).Info("fetched latest manifests", "url", artifactURL, "digest", artifactDigest)

// Skip reconciliation if the artifact has not changed.
if artifactDigest == obj.Status.LastArtifactRevision {
return requeueArtifactAfter(obj), nil
}

// The digest has changed, request a reconciliation.
log.Info("artifact revision changed, requesting a reconciliation",
"old", obj.Status.LastArtifactRevision, "new", artifactDigest)
if obj.Annotations == nil {
obj.Annotations = make(map[string]string, 1)
}
obj.Annotations[meta.ReconcileRequestAnnotation] = time.Now().Format(time.RFC3339Nano)
if err := patcher.Patch(ctx, obj, patch.WithFieldOwner(r.StatusManager)); err != nil {
return ctrl.Result{}, err
}

return requeueArtifactAfter(obj), nil
}

// requeueArtifactAfter returns a ctrl.Result with the requeue time set to the
// interval specified in the object's annotation for artifact reconciliation.
func requeueArtifactAfter(obj *fluxcdv1.FluxInstance) ctrl.Result {
result := ctrl.Result{}
if d := obj.GetArtifactInterval(); d > 0 {
result.RequeueAfter = d
}
return result
}
Loading
Loading