From e7b21eb9fe0d58ccfab56dc08db6dda4b5f065ba Mon Sep 17 00:00:00 2001 From: hccheng72 Date: Wed, 31 Aug 2022 10:46:10 -0700 Subject: [PATCH] refactor: move funtions to shared_state.go --- pkg/controller/attach_detach.go | 14 --- pkg/controller/azvolume.go | 147 ----------------------------- pkg/controller/shared_state.go | 160 ++++++++++++++++++++++++++++++++ 3 files changed, 160 insertions(+), 161 deletions(-) diff --git a/pkg/controller/attach_detach.go b/pkg/controller/attach_detach.go index 3a8cb81e52..048401d7d0 100644 --- a/pkg/controller/attach_detach.go +++ b/pkg/controller/attach_detach.go @@ -29,7 +29,6 @@ import ( storagev1 "k8s.io/api/storage/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/wait" azdiskv1beta2 "sigs.k8s.io/azuredisk-csi-driver/pkg/apis/azuredisk/v1beta2" consts "sigs.k8s.io/azuredisk-csi-driver/pkg/azureconstants" "sigs.k8s.io/azuredisk-csi-driver/pkg/azureutils" @@ -776,16 +775,3 @@ func NewAttachDetachController(mgr manager.Manager, cloudDiskAttacher CloudDiskA c.GetLogger().Info("Controller set-up successful.") return &reconciler, nil } - -// waitForVolumeAttachmentNAme waits for the VolumeAttachment name to be updated in the azVolumeAttachmentVaMap by the volumeattachment controller -func (c *SharedState) waitForVolumeAttachmentName(ctx context.Context, azVolumeAttachment *azdiskv1beta2.AzVolumeAttachment) (string, error) { - var vaName string - err := wait.PollImmediateUntilWithContext(ctx, consts.DefaultPollingRate, func(ctx context.Context) (bool, error) { - val, exists := c.azVolumeAttachmentToVaMap.Load(azVolumeAttachment.Name) - if exists { - vaName = val.(string) - } - return exists, nil - }) - return vaName, err -} diff --git a/pkg/controller/azvolume.go b/pkg/controller/azvolume.go index 8b2dc18248..0247038e03 100644 --- a/pkg/controller/azvolume.go +++ b/pkg/controller/azvolume.go @@ -28,10 +28,7 @@ import ( "google.golang.org/grpc/status" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" - apiErrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - utilfeature "k8s.io/apiserver/pkg/util/feature" - "k8s.io/kubernetes/pkg/features" azdiskv1beta2 "sigs.k8s.io/azuredisk-csi-driver/pkg/apis/azuredisk/v1beta2" "sigs.k8s.io/azuredisk-csi-driver/pkg/azureutils" util "sigs.k8s.io/azuredisk-csi-driver/pkg/util" @@ -661,150 +658,6 @@ func NewAzVolumeController(mgr manager.Manager, volumeProvisioner VolumeProvisio return &reconciler, nil } -func (c *SharedState) createAzVolumeFromPv(ctx context.Context, pv v1.PersistentVolume, annotations map[string]string) error { - var err error - ctx, w := workflow.New(ctx) - defer func() { w.Finish(err) }() - - var desiredAzVolume *azdiskv1beta2.AzVolume - requiredBytes, _ := pv.Spec.Capacity.Storage().AsInt64() - volumeCapability := getVolumeCapabilityFromPv(&pv) - - // create AzVolume CRI for CSI Volume Source - if pv.Spec.CSI != nil && pv.Spec.CSI.Driver == c.driverName { - desiredAzVolume, err = c.createAzVolumeFromCSISource(pv.Spec.CSI) - if err != nil { - return err - } - if azureutils.IsMultiNodePersistentVolume(pv) { - desiredAzVolume.Spec.MaxMountReplicaCount = 0 - } - - // create AzVolume CRI for AzureDisk Volume Source for migration case - } else if utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) && - utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationAzureDisk) && - pv.Spec.AzureDisk != nil { - desiredAzVolume = c.createAzVolumeFromAzureDiskVolumeSource(pv.Spec.AzureDisk) - } - - if desiredAzVolume != nil { - if desiredAzVolume.Labels == nil { - desiredAzVolume.Labels = map[string]string{} - } - desiredAzVolume.Labels[consts.PvNameLabel] = pv.Name - if pv.Spec.ClaimRef != nil { - desiredAzVolume.Labels[consts.PvcNameLabel] = pv.Spec.ClaimRef.Name - desiredAzVolume.Labels[consts.PvcNamespaceLabel] = pv.Spec.ClaimRef.Namespace - } - desiredAzVolume.Spec.CapacityRange = &azdiskv1beta2.CapacityRange{RequiredBytes: requiredBytes} - desiredAzVolume.Spec.VolumeCapability = volumeCapability - desiredAzVolume.Spec.PersistentVolume = pv.Name - desiredAzVolume.Status.Annotations = annotations - - w.AddDetailToLogger(consts.PvNameKey, pv.Name, consts.VolumeNameLabel, desiredAzVolume.Name) - - w.Logger().Info("Creating AzVolume CRI") - if err = c.createAzVolume(ctx, desiredAzVolume); err != nil { - err = status.Errorf(codes.Internal, "failed to create AzVolume (%s) for PV (%s): %v", desiredAzVolume.Name, pv.Name, err) - return err - } - } - return nil -} - -func (c *SharedState) createAzVolumeFromInline(ctx context.Context, inline *v1.AzureDiskVolumeSource) (err error) { - azVolume := c.createAzVolumeFromAzureDiskVolumeSource(inline) - - if err = c.createAzVolume(ctx, azVolume); err != nil { - err = status.Errorf(codes.Internal, "failed to create AzVolume (%s) for inline (%s): %v", azVolume.Name, inline.DiskName, err) - } - return -} - -func (c *SharedState) createAzVolumeFromCSISource(source *v1.CSIPersistentVolumeSource) (*azdiskv1beta2.AzVolume, error) { - diskName, err := azureutils.GetDiskName(source.VolumeHandle) - if err != nil { - return nil, fmt.Errorf("failed to extract diskName from volume handle (%s): %v", source.VolumeHandle, err) - } - - _, maxMountReplicaCount := azureutils.GetMaxSharesAndMaxMountReplicaCount(source.VolumeAttributes, false) - - var volumeParams map[string]string - if source.VolumeAttributes == nil { - volumeParams = make(map[string]string) - } else { - volumeParams = source.VolumeAttributes - } - - azVolumeName := strings.ToLower(diskName) - - azVolume := azdiskv1beta2.AzVolume{ - ObjectMeta: metav1.ObjectMeta{ - Name: azVolumeName, - Finalizers: []string{consts.AzVolumeFinalizer}, - }, - Spec: azdiskv1beta2.AzVolumeSpec{ - MaxMountReplicaCount: maxMountReplicaCount, - Parameters: volumeParams, - VolumeName: diskName, - }, - Status: azdiskv1beta2.AzVolumeStatus{ - Detail: &azdiskv1beta2.AzVolumeStatusDetail{ - VolumeID: source.VolumeHandle, - }, - State: azdiskv1beta2.VolumeCreated, - }, - } - - return &azVolume, nil -} - -func (c *SharedState) createAzVolumeFromAzureDiskVolumeSource(source *v1.AzureDiskVolumeSource) *azdiskv1beta2.AzVolume { - azVolume := azdiskv1beta2.AzVolume{ - ObjectMeta: metav1.ObjectMeta{ - Name: source.DiskName, - Finalizers: []string{consts.AzVolumeFinalizer}, - }, - Spec: azdiskv1beta2.AzVolumeSpec{ - VolumeName: source.DiskName, - VolumeCapability: []azdiskv1beta2.VolumeCapability{}, - }, - Status: azdiskv1beta2.AzVolumeStatus{ - Detail: &azdiskv1beta2.AzVolumeStatusDetail{ - VolumeID: source.DataDiskURI, - }, - State: azdiskv1beta2.VolumeCreated, - Annotations: map[string]string{consts.InlineVolumeAnnotation: source.DataDiskURI}, - }, - } - - return &azVolume -} - -func (c *SharedState) createAzVolume(ctx context.Context, desiredAzVolume *azdiskv1beta2.AzVolume) error { - var err error - var azVolume *azdiskv1beta2.AzVolume - - if azVolume, err = c.azClient.DiskV1beta2().AzVolumes(c.objectNamespace).Get(ctx, desiredAzVolume.Name, metav1.GetOptions{}); err != nil { - if apiErrors.IsNotFound(err) { - if azVolume, err = c.azClient.DiskV1beta2().AzVolumes(c.objectNamespace).Create(ctx, desiredAzVolume, metav1.CreateOptions{}); err != nil { - return err - } - } else { - return err - } - } - - updated := azVolume.DeepCopy() - updated.Status = desiredAzVolume.Status - if _, err := c.azClient.DiskV1beta2().AzVolumes(c.objectNamespace).UpdateStatus(ctx, updated, metav1.UpdateOptions{}); err != nil { - return err - } - // if AzVolume CRI successfully recreated, also recreate the operation queue for the volume - c.createOperationQueue(desiredAzVolume.Name) - return nil -} - func getVolumeCapabilityFromPv(pv *v1.PersistentVolume) []azdiskv1beta2.VolumeCapability { volCaps := []azdiskv1beta2.VolumeCapability{} diff --git a/pkg/controller/shared_state.go b/pkg/controller/shared_state.go index 2177757af0..07bcf05d5f 100644 --- a/pkg/controller/shared_state.go +++ b/pkg/controller/shared_state.go @@ -19,6 +19,7 @@ package controller import ( "container/list" "context" + "fmt" "sort" "strings" "sync" @@ -33,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/selection" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/kubernetes" cache "k8s.io/client-go/tools/cache" @@ -1120,6 +1122,7 @@ func (c *SharedState) manageReplicas(ctx context.Context, volumeName string) err } return nil } + func (c *SharedState) createReplicas(ctx context.Context, remainingReplicas int, volumeName, volumeID string, volumeContext map[string]string) error { var err error ctx, w := workflow.New(ctx) @@ -1226,3 +1229,160 @@ func (c *SharedState) getNodesForReplica(ctx context.Context, volumeName string, return filtered, nil } + +func (c *SharedState) createAzVolumeFromPv(ctx context.Context, pv v1.PersistentVolume, annotations map[string]string) error { + var err error + ctx, w := workflow.New(ctx) + defer func() { w.Finish(err) }() + + var desiredAzVolume *azdiskv1beta2.AzVolume + requiredBytes, _ := pv.Spec.Capacity.Storage().AsInt64() + volumeCapability := getVolumeCapabilityFromPv(&pv) + + // create AzVolume CRI for CSI Volume Source + if pv.Spec.CSI != nil && pv.Spec.CSI.Driver == c.driverName { + desiredAzVolume, err = c.createAzVolumeFromCSISource(pv.Spec.CSI) + if err != nil { + return err + } + if azureutils.IsMultiNodePersistentVolume(pv) { + desiredAzVolume.Spec.MaxMountReplicaCount = 0 + } + + // create AzVolume CRI for AzureDisk Volume Source for migration case + } else if utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) && + utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationAzureDisk) && + pv.Spec.AzureDisk != nil { + desiredAzVolume = c.createAzVolumeFromAzureDiskVolumeSource(pv.Spec.AzureDisk) + } + + if desiredAzVolume != nil { + if desiredAzVolume.Labels == nil { + desiredAzVolume.Labels = map[string]string{} + } + desiredAzVolume.Labels[consts.PvNameLabel] = pv.Name + if pv.Spec.ClaimRef != nil { + desiredAzVolume.Labels[consts.PvcNameLabel] = pv.Spec.ClaimRef.Name + desiredAzVolume.Labels[consts.PvcNamespaceLabel] = pv.Spec.ClaimRef.Namespace + } + desiredAzVolume.Spec.CapacityRange = &azdiskv1beta2.CapacityRange{RequiredBytes: requiredBytes} + desiredAzVolume.Spec.VolumeCapability = volumeCapability + desiredAzVolume.Spec.PersistentVolume = pv.Name + desiredAzVolume.Status.Annotations = annotations + + w.AddDetailToLogger(consts.PvNameKey, pv.Name, consts.VolumeNameLabel, desiredAzVolume.Name) + + w.Logger().Info("Creating AzVolume CRI") + if err = c.createAzVolume(ctx, desiredAzVolume); err != nil { + err = status.Errorf(codes.Internal, "failed to create AzVolume (%s) for PV (%s): %v", desiredAzVolume.Name, pv.Name, err) + return err + } + } + return nil +} + +func (c *SharedState) createAzVolumeFromInline(ctx context.Context, inline *v1.AzureDiskVolumeSource) (err error) { + azVolume := c.createAzVolumeFromAzureDiskVolumeSource(inline) + + if err = c.createAzVolume(ctx, azVolume); err != nil { + err = status.Errorf(codes.Internal, "failed to create AzVolume (%s) for inline (%s): %v", azVolume.Name, inline.DiskName, err) + } + return +} + +func (c *SharedState) createAzVolumeFromCSISource(source *v1.CSIPersistentVolumeSource) (*azdiskv1beta2.AzVolume, error) { + diskName, err := azureutils.GetDiskName(source.VolumeHandle) + if err != nil { + return nil, fmt.Errorf("failed to extract diskName from volume handle (%s): %v", source.VolumeHandle, err) + } + + _, maxMountReplicaCount := azureutils.GetMaxSharesAndMaxMountReplicaCount(source.VolumeAttributes, false) + + var volumeParams map[string]string + if source.VolumeAttributes == nil { + volumeParams = make(map[string]string) + } else { + volumeParams = source.VolumeAttributes + } + + azVolumeName := strings.ToLower(diskName) + + azVolume := azdiskv1beta2.AzVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: azVolumeName, + Finalizers: []string{consts.AzVolumeFinalizer}, + }, + Spec: azdiskv1beta2.AzVolumeSpec{ + MaxMountReplicaCount: maxMountReplicaCount, + Parameters: volumeParams, + VolumeName: diskName, + }, + Status: azdiskv1beta2.AzVolumeStatus{ + Detail: &azdiskv1beta2.AzVolumeStatusDetail{ + VolumeID: source.VolumeHandle, + }, + State: azdiskv1beta2.VolumeCreated, + }, + } + + return &azVolume, nil +} + +func (c *SharedState) createAzVolumeFromAzureDiskVolumeSource(source *v1.AzureDiskVolumeSource) *azdiskv1beta2.AzVolume { + azVolume := azdiskv1beta2.AzVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: source.DiskName, + Finalizers: []string{consts.AzVolumeFinalizer}, + }, + Spec: azdiskv1beta2.AzVolumeSpec{ + VolumeName: source.DiskName, + VolumeCapability: []azdiskv1beta2.VolumeCapability{}, + }, + Status: azdiskv1beta2.AzVolumeStatus{ + Detail: &azdiskv1beta2.AzVolumeStatusDetail{ + VolumeID: source.DataDiskURI, + }, + State: azdiskv1beta2.VolumeCreated, + Annotations: map[string]string{consts.InlineVolumeAnnotation: source.DataDiskURI}, + }, + } + + return &azVolume +} + +func (c *SharedState) createAzVolume(ctx context.Context, desiredAzVolume *azdiskv1beta2.AzVolume) error { + var err error + var azVolume *azdiskv1beta2.AzVolume + + if azVolume, err = c.azClient.DiskV1beta2().AzVolumes(c.objectNamespace).Get(ctx, desiredAzVolume.Name, metav1.GetOptions{}); err != nil { + if apiErrors.IsNotFound(err) { + if azVolume, err = c.azClient.DiskV1beta2().AzVolumes(c.objectNamespace).Create(ctx, desiredAzVolume, metav1.CreateOptions{}); err != nil { + return err + } + } else { + return err + } + } + + updated := azVolume.DeepCopy() + updated.Status = desiredAzVolume.Status + if _, err := c.azClient.DiskV1beta2().AzVolumes(c.objectNamespace).UpdateStatus(ctx, updated, metav1.UpdateOptions{}); err != nil { + return err + } + // if AzVolume CRI successfully recreated, also recreate the operation queue for the volume + c.createOperationQueue(desiredAzVolume.Name) + return nil +} + +// waitForVolumeAttachmentNAme waits for the VolumeAttachment name to be updated in the azVolumeAttachmentVaMap by the volumeattachment controller +func (c *SharedState) waitForVolumeAttachmentName(ctx context.Context, azVolumeAttachment *azdiskv1beta2.AzVolumeAttachment) (string, error) { + var vaName string + err := wait.PollImmediateUntilWithContext(ctx, consts.DefaultPollingRate, func(ctx context.Context) (bool, error) { + val, exists := c.azVolumeAttachmentToVaMap.Load(azVolumeAttachment.Name) + if exists { + vaName = val.(string) + } + return exists, nil + }) + return vaName, err +}