Skip to content

Commit

Permalink
Update precedence for schedule
Browse files Browse the repository at this point in the history
This patch modifies the parsing logic of schedule
found in annotation so that the precedence is in
form: SC > NS > PVC

This applies for both keyrotation and reclaimspace

The schedule present on the PVC annotations will always
be equal to that of the highest precedence. Modifying it
manually will lead to it being overwritten.

Signed-off-by: Niraj Yadav <[email protected]>
  • Loading branch information
black-dragon74 committed Oct 7, 2024
1 parent b0ae317 commit e27bea1
Show file tree
Hide file tree
Showing 5 changed files with 363 additions and 129 deletions.
8 changes: 5 additions & 3 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func main() {
flag.StringVar(&cfg.Namespace, "namespace", cfg.Namespace, "Namespace where the CSIAddons pod is deployed")
flag.BoolVar(&enableAdmissionWebhooks, "enable-admission-webhooks", false, "[DEPRECATED] Enable the admission webhooks")
flag.BoolVar(&showVersion, "version", false, "Print Version details")
flag.StringVar(&cfg.SchedulePrecedence, "schedule-precedence", "pvc-first", "The order of precedence in which schedule of reclaimspace and keyrotation is considered. Possible values are pvc-first and sc-first")
opts := zap.Options{
Development: true,
TimeEncoder: zapcore.ISO8601TimeEncoder,
Expand Down Expand Up @@ -180,9 +181,10 @@ func main() {
os.Exit(1)
}
if err = (&controllers.PersistentVolumeClaimReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
ConnPool: connPool,
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
ConnPool: connPool,
SchedulePrecedence: cfg.SchedulePrecedence,
}).SetupWithManager(mgr, ctrlOptions); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "PersistentVolumeClaim")
os.Exit(1)
Expand Down
232 changes: 170 additions & 62 deletions internal/controller/csiaddons/persistentvolumeclaim_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package controller

import (
"context"
"encoding/json"
"errors"
"fmt"
"reflect"
Expand All @@ -27,6 +28,7 @@ import (

csiaddonsv1alpha1 "github.com/csi-addons/kubernetes-csi-addons/api/csiaddons/v1alpha1"
"github.com/csi-addons/kubernetes-csi-addons/internal/connection"
"github.com/csi-addons/kubernetes-csi-addons/internal/util"

"github.com/go-logr/logr"
"github.com/robfig/cron/v3"
Expand All @@ -52,7 +54,8 @@ type PersistentVolumeClaimReconciler struct {
client.Client
Scheme *runtime.Scheme
// ConnectionPool consists of map of Connection objects.
ConnPool *connection.ConnectionPool
ConnPool *connection.ConnectionPool
SchedulePrecedence string
}

// Operation defines the sub operation to be performed
Expand Down Expand Up @@ -210,33 +213,49 @@ func (r *PersistentVolumeClaimReconciler) checkDriverSupportCapability(
return false, false
}

// determineScheduleAndRequeue determines the schedule using the following steps
// - Check if the schedule is present in the PVC annotations. If yes, use that.
// - Check if the schedule is present in the namespace annotations. If yes,
// use that.
// - Check if the schedule is present in the storageclass annotations. If yes,
// use that.
func (r *PersistentVolumeClaimReconciler) determineScheduleAndRequeue(
func (r *PersistentVolumeClaimReconciler) getScheduleFromSC(
ctx context.Context,
logger *logr.Logger,
pvc *corev1.PersistentVolumeClaim,
driverName string,
annotationKey string,
) (string, error) {
annotations := pvc.GetAnnotations()
schedule, scheduleFound := getScheduleFromAnnotation(annotationKey, logger, annotations)
if scheduleFound {
return schedule, nil
logger *logr.Logger,
annotationKey string) string {
// For static provisioned PVs, StorageClassName is empty.
// Read SC schedule only when not statically provisioned.
if pvc.Spec.StorageClassName != nil && len(*pvc.Spec.StorageClassName) != 0 {
storageClassName := *pvc.Spec.StorageClassName
sc := &storagev1.StorageClass{}
err := r.Client.Get(ctx, types.NamespacedName{Name: storageClassName}, sc)
if err != nil {
if apierrors.IsNotFound(err) {
logger.Error(err, "StorageClass not found", "StorageClass", storageClassName)
return ""
}

logger.Error(err, "Failed to get StorageClass", "StorageClass", storageClassName)
return ""
}
schedule, scheduleFound := getScheduleFromAnnotation(annotationKey, logger, sc.GetAnnotations())
if scheduleFound {
return schedule
}
}

return ""
}

func (r *PersistentVolumeClaimReconciler) getScheduleFromNS(
ctx context.Context,
pvc *corev1.PersistentVolumeClaim,
logger *logr.Logger,
driverName string,
annotationKey string) (string, error) {
// check for namespace schedule annotation.
ns := &corev1.Namespace{}
err := r.Client.Get(ctx, types.NamespacedName{Name: pvc.Namespace}, ns)
if err != nil {
logger.Error(err, "Failed to get Namespace", "Namespace", pvc.Namespace)
return "", err
}
schedule, scheduleFound = getScheduleFromAnnotation(annotationKey, logger, ns.Annotations)
schedule, scheduleFound := getScheduleFromAnnotation(annotationKey, logger, ns.GetAnnotations())

// If the schedule is found, check whether driver supports the
// space reclamation using annotation on namespace and registered driver
Expand Down Expand Up @@ -277,30 +296,76 @@ func (r *PersistentVolumeClaimReconciler) determineScheduleAndRequeue(
}
}

// For static provisioned PVs, StorageClassName is nil or empty.
if pvc.Spec.StorageClassName == nil || len(*pvc.Spec.StorageClassName) == 0 {
logger.Info("StorageClassName is empty")
return "", ErrScheduleNotFound
return "", ErrScheduleNotFound
}

func (r *PersistentVolumeClaimReconciler) getScheduleFromPVC(
pvc *corev1.PersistentVolumeClaim,
logger *logr.Logger,
annotationKey string) string {
// Check for PVC annotation.
schedule, scheduleFound := getScheduleFromAnnotation(annotationKey, logger, pvc.GetAnnotations())
if scheduleFound {
return schedule
}
storageClassName := *pvc.Spec.StorageClassName

// check for storageclass schedule annotation.
sc := &storagev1.StorageClass{}
err = r.Client.Get(ctx, types.NamespacedName{Name: storageClassName}, sc)
if err != nil {
if apierrors.IsNotFound(err) {
logger.Error(err, "StorageClass not found", "StorageClass", storageClassName)
return "", ErrScheduleNotFound
return ""
}

// determineScheduleAndRequeue determines the schedule from annotations.
func (r *PersistentVolumeClaimReconciler) determineScheduleAndRequeue(
ctx context.Context,
logger *logr.Logger,
pvc *corev1.PersistentVolumeClaim,
driverName string,
annotationKey string,
) (string, error) {
var schedule string
var err error
switch r.SchedulePrecedence {
case util.SchedulePVCFirst:
// Check on PVC
if schedule = r.getScheduleFromPVC(pvc, logger, annotationKey); schedule != "" {
return schedule, nil
}

logger.Error(err, "Failed to get StorageClass", "StorageClass", storageClassName)
return "", err
}
schedule, scheduleFound = getScheduleFromAnnotation(annotationKey, logger, sc.Annotations)
if scheduleFound {
return schedule, nil
// Check on NS, might get ErrConnNotFoundRequeueNeeded
// If so, return the error
if schedule, err = r.getScheduleFromNS(ctx, pvc, logger, driverName, annotationKey); schedule != "" {
return schedule, nil
}
if errors.Is(err, ErrConnNotFoundRequeueNeeded) {
return "", err
}

// Check SC
if schedule = r.getScheduleFromSC(ctx, pvc, logger, annotationKey); schedule != "" {
return schedule, nil
}
case util.ScheduleSCFirst:
// Check on SC
if schedule = r.getScheduleFromSC(ctx, pvc, logger, annotationKey); schedule != "" {
return schedule, nil
}

// TODO: Dicuss on whether we should parse NS/PVCs in upstream when using sc-first precedence

Check failure on line 351 in internal/controller/csiaddons/persistentvolumeclaim_controller.go

View workflow job for this annotation

GitHub Actions / codespell

Dicuss ==> Discuss

// Check on NS, might get ErrConnNotFoundRequeueNeeded
// If so, return the error
if schedule, err = r.getScheduleFromNS(ctx, pvc, logger, driverName, annotationKey); schedule != "" {
return schedule, nil
}
if errors.Is(err, ErrConnNotFoundRequeueNeeded) {
return "", err
}

// Check on PVC
if schedule = r.getScheduleFromPVC(pvc, logger, annotationKey); schedule != "" {
return schedule, nil
}
}

// If nothing matched, we did not find schedule
return "", ErrScheduleNotFound
}

Expand Down Expand Up @@ -336,7 +401,7 @@ func (r *PersistentVolumeClaimReconciler) storageClassEventHandler() handler.Eve

var requests []reconcile.Request
for _, pvc := range pvcList.Items {
if annotationValueMissing(obj.GetAnnotations(), pvc.GetAnnotations(), annotationsToWatch) {
if annotationValueMissingOrDiff(obj.GetAnnotations(), pvc.GetAnnotations(), annotationsToWatch) {
requests = append(requests, reconcile.Request{
NamespacedName: types.NamespacedName{
Name: pvc.Name,
Expand Down Expand Up @@ -576,6 +641,46 @@ func generateCronJobName(parentName string) string {
return fmt.Sprintf("%s-%d", parentName, time.Now().Unix())
}

// createPatchBytesForAnnotations creates JSON marshalled patch bytes for annotations.
func createPatchBytesForAnnotations(annotations map[string]string) ([]byte, error) {
patch := map[string]interface{}{
"metadata": map[string]interface{}{
"annotations": annotations,
},
}

patchBytes, err := json.Marshal(patch)
if err != nil {
return nil, err
}

return patchBytes, nil
}

// addAnnotationsToResource adds annotations to the specified resource's metadata.
func (r *PersistentVolumeClaimReconciler) addAnnotationsToResource(
ctx context.Context,
logger *logr.Logger,
annotations map[string]string,
resource client.Object) error {
patch, err := createPatchBytesForAnnotations(annotations)
if err != nil {
logger.Error(err, "Failed to create patch bytes for annotations")

return err
}
logger.Info("Adding annotation", "Annotation", string(patch))

err = r.Client.Patch(ctx, resource, client.RawPatch(types.StrategicMergePatchType, patch))
if err != nil {
logger.Error(err, "Failed to update annotation")

return err
}

return nil
}

// processReclaimSpace reconciles ReclaimSpace based on annotations
func (r *PersistentVolumeClaimReconciler) processReclaimSpace(
ctx context.Context,
Expand Down Expand Up @@ -643,6 +748,14 @@ func (r *PersistentVolumeClaimReconciler) processReclaimSpace(
}
logger.Info("Successfully updated reclaimSpaceCronJob")

// Update schedule on the pvc
err = r.addAnnotationsToResource(ctx, logger, map[string]string{
rsCronJobScheduleTimeAnnotation: schedule,
}, pvc)
if err != nil {
return ctrl.Result{}, err
}

return ctrl.Result{}, nil
}

Expand All @@ -651,17 +764,11 @@ func (r *PersistentVolumeClaimReconciler) processReclaimSpace(
// add cronjob name and schedule in annotations.
// adding annotation is required for the case when pvc does not have
// have schedule annotation but namespace has.
patch := []byte(fmt.Sprintf(`{"metadata":{"annotations":{%q:%q,%q:%q}}}`,
rsCronJobNameAnnotation,
rsCronJobName,
rsCronJobScheduleTimeAnnotation,
schedule,
))
logger.Info("Adding annotation", "Annotation", string(patch))
err = r.Client.Patch(ctx, pvc, client.RawPatch(types.StrategicMergePatchType, patch))
err = r.addAnnotationsToResource(ctx, logger, map[string]string{
rsCronJobNameAnnotation: rsCronJobName,
rsCronJobScheduleTimeAnnotation: schedule,
}, pvc)
if err != nil {
logger.Error(err, "Failed to update annotation")

return ctrl.Result{}, err
}

Expand Down Expand Up @@ -770,24 +877,25 @@ func (r *PersistentVolumeClaimReconciler) processKeyRotation(
logger.Error(err, "failed to update encryptionkeyrotationcronjob")
return err // ctr.Result
}

logger.Info("successfully updated encryptionkeyrotationcronjob")

// update the schedule on the pvc
err = r.addAnnotationsToResource(ctx, logger, map[string]string{
krcJobScheduleTimeAnnotation: sched,
}, pvc)
if err != nil {
return err
}
return nil
}

// Add the annotation to the pvc, this will help us optimize reconciles
krcJobName := generateCronJobName(req.Name)
patch := []byte(fmt.Sprintf(`{"metadata":{"annotations":{%q:%q,%q:%q}}}`,
krcJobNameAnnotation,
krcJobName,
krcJobScheduleTimeAnnotation,
sched,
))
logger.Info("Adding keyrotation annotation to the pvc", "annotation", string(patch))
err = r.Client.Patch(ctx, pvc, client.RawPatch(types.StrategicMergePatchType, patch))
err = r.addAnnotationsToResource(ctx, logger, map[string]string{
krcJobNameAnnotation: krcJobName,
krcJobScheduleTimeAnnotation: sched,
}, pvc)
if err != nil {
logger.Error(err, "Failed to set annotation for keyrotation on the pvc")

return err
}

Expand All @@ -812,12 +920,12 @@ func (r *PersistentVolumeClaimReconciler) processKeyRotation(
return nil
}

// AnnotationValueMissing checks if any of the specified keys are missing
// from the PVC annotations when they are present in the StorageClass annotations.
func annotationValueMissing(scAnnotations, pvcAnnotations map[string]string, keys []string) bool {
// AnnotationValueMissingOrDiff checks if any of the specified keys are missing
// or differ from the PVC annotations when they are present in the StorageClass annotations.
func annotationValueMissingOrDiff(scAnnotations, pvcAnnotations map[string]string, keys []string) bool {
for _, key := range keys {
if _, scHasAnnotation := scAnnotations[key]; scHasAnnotation {
if _, pvcHasAnnotation := pvcAnnotations[key]; !pvcHasAnnotation {
if scValue, scHasAnnotation := scAnnotations[key]; scHasAnnotation {
if pvcValue, pvcHasAnnotation := pvcAnnotations[key]; !pvcHasAnnotation || scValue != pvcValue {
return true
}
}
Expand Down
Loading

0 comments on commit e27bea1

Please sign in to comment.