Skip to content

Commit

Permalink
Cross Namespace Volume Cloning
Browse files Browse the repository at this point in the history
  • Loading branch information
emmahardison authored Jan 10, 2025
1 parent a9f98f3 commit 42d5277
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 10 deletions.
2 changes: 2 additions & 0 deletions frontend/csi/controller_helpers/kubernetes/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ const (
AnnInternalSnapshotName = annPrefix + "/internalSnapshotName"
AnnMirrorRelationship = annPrefix + "/mirrorRelationship"
AnnVolumeShareFromPVC = annPrefix + "/shareFromPVC"
AnnVolumeCloneToNS = annPrefix + "/cloneToNamespace"
AnnVolumeCloneFromNS = annPrefix + "/cloneFromNamespace"
AnnVolumeShareToNS = annPrefix + "/shareToNamespace"
AnnReadOnlyClone = annPrefix + "/readOnlyClone"
AnnLUKSEncryption = annPrefix + "/luksEncryption" // import only
Expand Down
61 changes: 51 additions & 10 deletions frontend/csi/controller_helpers/kubernetes/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,13 +306,37 @@ func (h *helper) getSnapshotCloneSourceInfo(
if sourceSnapshotName == "" {
return "", "", fmt.Errorf("annotation 'cloneFromSnapshot' is empty")
}
namespace := clonePVC.Namespace
cloneFromNamespace := getAnnotation(annotations, AnnVolumeCloneFromNS)
if cloneFromNamespace != "" {
// Source snapshot is in a different namespace
namespace = cloneFromNamespace
}

// Get the VolumeSnapshot
snapshot, err := h.getVolumeSnapshot(ctx, sourceSnapshotName, clonePVC.Namespace)
snapshot, err := h.getVolumeSnapshot(ctx, sourceSnapshotName, namespace)
if err != nil {
return "", "", err
}

// If cloning from another namespace, ensure the source PVC has the required annotation
if cloneFromNamespace != "" {
snapSourcePVC, err := h.waitForCachedPVCByName(ctx, *snapshot.Spec.Source.PersistentVolumeClaimName, namespace, PreSyncCacheWaitPeriod)
if err != nil {
Logc(ctx).WithFields(LogFields{
"sourcePVCName": snapSourcePVC.Name,
"namespace": namespace,
}).Errorf("Clone source PVC not found in local cache: %v", err)
return "", "", err
}
sourceAnnotations := processPVCAnnotations(snapSourcePVC, "")
sourceCloneToNamespaces := getAnnotation(sourceAnnotations, AnnVolumeCloneToNS)
// Ensure the source PVC has been explicitly allowed to clone to the subordinate PVC namespace
if !h.matchNamespaceToAnnotation(clonePVC.Namespace, sourceCloneToNamespaces) {
return "", "", fmt.Errorf("cloning to namespace %s is not allowed, it is not listed in cloneToNamespace annotation", clonePVC.Namespace)
}

}
// If the clone from PVC annotation is also set, ensure it matches the snapshot
sourcePVCName := getAnnotation(annotations, AnnCloneFromPVC)
if sourcePVCName != "" {
Expand Down Expand Up @@ -371,16 +395,33 @@ func (h *helper) getCloneSourceInfo(ctx context.Context, clonePVC *v1.Persistent
if sourcePVCName == "" {
return "", fmt.Errorf("annotation 'cloneFromPVC' is empty")
}
namespace := clonePVC.Namespace
cloneFromNamespace := getAnnotation(annotations, AnnVolumeCloneFromNS)

// Check that the source PVC is in the same namespace.
// NOTE: For VolumeContentSource this check is performed by CSI
sourcePVC, err := h.waitForCachedPVCByName(ctx, sourcePVCName, clonePVC.Namespace, PreSyncCacheWaitPeriod)
// Determine what namespace to look for the source PVC in
if cloneFromNamespace != "" {
// Source PVC is in a different namespace
namespace = cloneFromNamespace
}

// Retrieve the source PVC from the cache
sourcePVC, err := h.waitForCachedPVCByName(ctx, sourcePVCName, namespace, PreSyncCacheWaitPeriod)
if err != nil {
Logc(ctx).WithFields(LogFields{
"sourcePVCName": sourcePVCName,
"namespace": clonePVC.Namespace,
"namespace": namespace,
}).Errorf("Clone source PVC not found in local cache: %v", err)
return "", fmt.Errorf("cloning from a PVC requires both PVCs be in the same namespace: %v", err)
return "", fmt.Errorf("source PVC not found in namespace: %v", err)
}

// If cloning from another namespace, ensure the source PVC has the required annotation
if cloneFromNamespace != "" {
sourceAnnotations := processPVCAnnotations(sourcePVC, "")
sourceCloneToNamespaces := getAnnotation(sourceAnnotations, AnnVolumeCloneToNS)
// Ensure the source PVC has been explicitly allowed to clone to the subordinate PVC namespace
if !h.matchNamespaceToAnnotation(clonePVC.Namespace, sourceCloneToNamespaces) {
return "", fmt.Errorf("cloning to namespace %s is not allowed, it is not listed in cloneToNamespace annotation", clonePVC.Namespace)
}
}

// Check that both source and clone PVCs have the same storage class
Expand Down Expand Up @@ -481,10 +522,10 @@ func (h *helper) validateSubordinateVolumeConfig(
return nil
}

func (h *helper) matchNamespaceToAnnotation(namespace, shareToAnnotation string) bool {
shareToNamespaces := strings.Split(shareToAnnotation, ",")
for _, shareToNamespace := range shareToNamespaces {
if shareToNamespace == namespace || shareToNamespace == "*" {
func (h *helper) matchNamespaceToAnnotation(namespace, annotation string) bool {
availableToNamespaces := strings.Split(annotation, ",")
for _, availableToNamespace := range availableToNamespaces {
if availableToNamespace == namespace || availableToNamespace == "*" {
return true
}
}
Expand Down

0 comments on commit 42d5277

Please sign in to comment.