Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[V2] refactor: move funtions to shared_state.go #1495

Merged
merged 1 commit into from
Sep 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 0 additions & 14 deletions pkg/controller/attach_detach.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
azdiskv1beta2 "sigs.k8s.io/azuredisk-csi-driver/pkg/apis/azuredisk/v1beta2"
consts "sigs.k8s.io/azuredisk-csi-driver/pkg/azureconstants"
"sigs.k8s.io/azuredisk-csi-driver/pkg/azureutils"
Expand Down Expand Up @@ -776,16 +775,3 @@ func NewAttachDetachController(mgr manager.Manager, cloudDiskAttacher CloudDiskA
c.GetLogger().Info("Controller set-up successful.")
return &reconciler, nil
}

// waitForVolumeAttachmentNAme waits for the VolumeAttachment name to be updated in the azVolumeAttachmentVaMap by the volumeattachment controller
func (c *SharedState) waitForVolumeAttachmentName(ctx context.Context, azVolumeAttachment *azdiskv1beta2.AzVolumeAttachment) (string, error) {
var vaName string
err := wait.PollImmediateUntilWithContext(ctx, consts.DefaultPollingRate, func(ctx context.Context) (bool, error) {
val, exists := c.azVolumeAttachmentToVaMap.Load(azVolumeAttachment.Name)
if exists {
vaName = val.(string)
}
return exists, nil
})
return vaName, err
}
147 changes: 0 additions & 147 deletions pkg/controller/azvolume.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,7 @@ import (
"google.golang.org/grpc/status"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
apiErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/features"
azdiskv1beta2 "sigs.k8s.io/azuredisk-csi-driver/pkg/apis/azuredisk/v1beta2"
"sigs.k8s.io/azuredisk-csi-driver/pkg/azureutils"
util "sigs.k8s.io/azuredisk-csi-driver/pkg/util"
Expand Down Expand Up @@ -661,150 +658,6 @@ func NewAzVolumeController(mgr manager.Manager, volumeProvisioner VolumeProvisio
return &reconciler, nil
}

func (c *SharedState) createAzVolumeFromPv(ctx context.Context, pv v1.PersistentVolume, annotations map[string]string) error {
var err error
ctx, w := workflow.New(ctx)
defer func() { w.Finish(err) }()

var desiredAzVolume *azdiskv1beta2.AzVolume
requiredBytes, _ := pv.Spec.Capacity.Storage().AsInt64()
volumeCapability := getVolumeCapabilityFromPv(&pv)

// create AzVolume CRI for CSI Volume Source
if pv.Spec.CSI != nil && pv.Spec.CSI.Driver == c.driverName {
desiredAzVolume, err = c.createAzVolumeFromCSISource(pv.Spec.CSI)
if err != nil {
return err
}
if azureutils.IsMultiNodePersistentVolume(pv) {
desiredAzVolume.Spec.MaxMountReplicaCount = 0
}

// create AzVolume CRI for AzureDisk Volume Source for migration case
} else if utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) &&
utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationAzureDisk) &&
pv.Spec.AzureDisk != nil {
desiredAzVolume = c.createAzVolumeFromAzureDiskVolumeSource(pv.Spec.AzureDisk)
}

if desiredAzVolume != nil {
if desiredAzVolume.Labels == nil {
desiredAzVolume.Labels = map[string]string{}
}
desiredAzVolume.Labels[consts.PvNameLabel] = pv.Name
if pv.Spec.ClaimRef != nil {
desiredAzVolume.Labels[consts.PvcNameLabel] = pv.Spec.ClaimRef.Name
desiredAzVolume.Labels[consts.PvcNamespaceLabel] = pv.Spec.ClaimRef.Namespace
}
desiredAzVolume.Spec.CapacityRange = &azdiskv1beta2.CapacityRange{RequiredBytes: requiredBytes}
desiredAzVolume.Spec.VolumeCapability = volumeCapability
desiredAzVolume.Spec.PersistentVolume = pv.Name
desiredAzVolume.Status.Annotations = annotations

w.AddDetailToLogger(consts.PvNameKey, pv.Name, consts.VolumeNameLabel, desiredAzVolume.Name)

w.Logger().Info("Creating AzVolume CRI")
if err = c.createAzVolume(ctx, desiredAzVolume); err != nil {
err = status.Errorf(codes.Internal, "failed to create AzVolume (%s) for PV (%s): %v", desiredAzVolume.Name, pv.Name, err)
return err
}
}
return nil
}

func (c *SharedState) createAzVolumeFromInline(ctx context.Context, inline *v1.AzureDiskVolumeSource) (err error) {
azVolume := c.createAzVolumeFromAzureDiskVolumeSource(inline)

if err = c.createAzVolume(ctx, azVolume); err != nil {
err = status.Errorf(codes.Internal, "failed to create AzVolume (%s) for inline (%s): %v", azVolume.Name, inline.DiskName, err)
}
return
}

func (c *SharedState) createAzVolumeFromCSISource(source *v1.CSIPersistentVolumeSource) (*azdiskv1beta2.AzVolume, error) {
diskName, err := azureutils.GetDiskName(source.VolumeHandle)
if err != nil {
return nil, fmt.Errorf("failed to extract diskName from volume handle (%s): %v", source.VolumeHandle, err)
}

_, maxMountReplicaCount := azureutils.GetMaxSharesAndMaxMountReplicaCount(source.VolumeAttributes, false)

var volumeParams map[string]string
if source.VolumeAttributes == nil {
volumeParams = make(map[string]string)
} else {
volumeParams = source.VolumeAttributes
}

azVolumeName := strings.ToLower(diskName)

azVolume := azdiskv1beta2.AzVolume{
ObjectMeta: metav1.ObjectMeta{
Name: azVolumeName,
Finalizers: []string{consts.AzVolumeFinalizer},
},
Spec: azdiskv1beta2.AzVolumeSpec{
MaxMountReplicaCount: maxMountReplicaCount,
Parameters: volumeParams,
VolumeName: diskName,
},
Status: azdiskv1beta2.AzVolumeStatus{
Detail: &azdiskv1beta2.AzVolumeStatusDetail{
VolumeID: source.VolumeHandle,
},
State: azdiskv1beta2.VolumeCreated,
},
}

return &azVolume, nil
}

func (c *SharedState) createAzVolumeFromAzureDiskVolumeSource(source *v1.AzureDiskVolumeSource) *azdiskv1beta2.AzVolume {
azVolume := azdiskv1beta2.AzVolume{
ObjectMeta: metav1.ObjectMeta{
Name: source.DiskName,
Finalizers: []string{consts.AzVolumeFinalizer},
},
Spec: azdiskv1beta2.AzVolumeSpec{
VolumeName: source.DiskName,
VolumeCapability: []azdiskv1beta2.VolumeCapability{},
},
Status: azdiskv1beta2.AzVolumeStatus{
Detail: &azdiskv1beta2.AzVolumeStatusDetail{
VolumeID: source.DataDiskURI,
},
State: azdiskv1beta2.VolumeCreated,
Annotations: map[string]string{consts.InlineVolumeAnnotation: source.DataDiskURI},
},
}

return &azVolume
}

func (c *SharedState) createAzVolume(ctx context.Context, desiredAzVolume *azdiskv1beta2.AzVolume) error {
var err error
var azVolume *azdiskv1beta2.AzVolume

if azVolume, err = c.azClient.DiskV1beta2().AzVolumes(c.objectNamespace).Get(ctx, desiredAzVolume.Name, metav1.GetOptions{}); err != nil {
if apiErrors.IsNotFound(err) {
if azVolume, err = c.azClient.DiskV1beta2().AzVolumes(c.objectNamespace).Create(ctx, desiredAzVolume, metav1.CreateOptions{}); err != nil {
return err
}
} else {
return err
}
}

updated := azVolume.DeepCopy()
updated.Status = desiredAzVolume.Status
if _, err := c.azClient.DiskV1beta2().AzVolumes(c.objectNamespace).UpdateStatus(ctx, updated, metav1.UpdateOptions{}); err != nil {
return err
}
// if AzVolume CRI successfully recreated, also recreate the operation queue for the volume
c.createOperationQueue(desiredAzVolume.Name)
return nil
}

func getVolumeCapabilityFromPv(pv *v1.PersistentVolume) []azdiskv1beta2.VolumeCapability {
volCaps := []azdiskv1beta2.VolumeCapability{}

Expand Down
160 changes: 160 additions & 0 deletions pkg/controller/shared_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package controller
import (
"container/list"
"context"
"fmt"
"sort"
"strings"
"sync"
Expand All @@ -33,6 +34,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes"
cache "k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -1120,6 +1122,7 @@ func (c *SharedState) manageReplicas(ctx context.Context, volumeName string) err
}
return nil
}

func (c *SharedState) createReplicas(ctx context.Context, remainingReplicas int, volumeName, volumeID string, volumeContext map[string]string) error {
var err error
ctx, w := workflow.New(ctx)
Expand Down Expand Up @@ -1226,3 +1229,160 @@ func (c *SharedState) getNodesForReplica(ctx context.Context, volumeName string,

return filtered, nil
}

func (c *SharedState) createAzVolumeFromPv(ctx context.Context, pv v1.PersistentVolume, annotations map[string]string) error {
var err error
ctx, w := workflow.New(ctx)
defer func() { w.Finish(err) }()

var desiredAzVolume *azdiskv1beta2.AzVolume
requiredBytes, _ := pv.Spec.Capacity.Storage().AsInt64()
volumeCapability := getVolumeCapabilityFromPv(&pv)

// create AzVolume CRI for CSI Volume Source
if pv.Spec.CSI != nil && pv.Spec.CSI.Driver == c.driverName {
desiredAzVolume, err = c.createAzVolumeFromCSISource(pv.Spec.CSI)
if err != nil {
return err
}
if azureutils.IsMultiNodePersistentVolume(pv) {
desiredAzVolume.Spec.MaxMountReplicaCount = 0
}

// create AzVolume CRI for AzureDisk Volume Source for migration case
} else if utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) &&
utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationAzureDisk) &&
pv.Spec.AzureDisk != nil {
desiredAzVolume = c.createAzVolumeFromAzureDiskVolumeSource(pv.Spec.AzureDisk)
}

if desiredAzVolume != nil {
if desiredAzVolume.Labels == nil {
desiredAzVolume.Labels = map[string]string{}
}
desiredAzVolume.Labels[consts.PvNameLabel] = pv.Name
if pv.Spec.ClaimRef != nil {
desiredAzVolume.Labels[consts.PvcNameLabel] = pv.Spec.ClaimRef.Name
desiredAzVolume.Labels[consts.PvcNamespaceLabel] = pv.Spec.ClaimRef.Namespace
}
desiredAzVolume.Spec.CapacityRange = &azdiskv1beta2.CapacityRange{RequiredBytes: requiredBytes}
desiredAzVolume.Spec.VolumeCapability = volumeCapability
desiredAzVolume.Spec.PersistentVolume = pv.Name
desiredAzVolume.Status.Annotations = annotations

w.AddDetailToLogger(consts.PvNameKey, pv.Name, consts.VolumeNameLabel, desiredAzVolume.Name)

w.Logger().Info("Creating AzVolume CRI")
if err = c.createAzVolume(ctx, desiredAzVolume); err != nil {
err = status.Errorf(codes.Internal, "failed to create AzVolume (%s) for PV (%s): %v", desiredAzVolume.Name, pv.Name, err)
return err
}
}
return nil
}

func (c *SharedState) createAzVolumeFromInline(ctx context.Context, inline *v1.AzureDiskVolumeSource) (err error) {
azVolume := c.createAzVolumeFromAzureDiskVolumeSource(inline)

if err = c.createAzVolume(ctx, azVolume); err != nil {
err = status.Errorf(codes.Internal, "failed to create AzVolume (%s) for inline (%s): %v", azVolume.Name, inline.DiskName, err)
}
return
}

func (c *SharedState) createAzVolumeFromCSISource(source *v1.CSIPersistentVolumeSource) (*azdiskv1beta2.AzVolume, error) {
diskName, err := azureutils.GetDiskName(source.VolumeHandle)
if err != nil {
return nil, fmt.Errorf("failed to extract diskName from volume handle (%s): %v", source.VolumeHandle, err)
}

_, maxMountReplicaCount := azureutils.GetMaxSharesAndMaxMountReplicaCount(source.VolumeAttributes, false)

var volumeParams map[string]string
if source.VolumeAttributes == nil {
volumeParams = make(map[string]string)
} else {
volumeParams = source.VolumeAttributes
}

azVolumeName := strings.ToLower(diskName)

azVolume := azdiskv1beta2.AzVolume{
ObjectMeta: metav1.ObjectMeta{
Name: azVolumeName,
Finalizers: []string{consts.AzVolumeFinalizer},
},
Spec: azdiskv1beta2.AzVolumeSpec{
MaxMountReplicaCount: maxMountReplicaCount,
Parameters: volumeParams,
VolumeName: diskName,
},
Status: azdiskv1beta2.AzVolumeStatus{
Detail: &azdiskv1beta2.AzVolumeStatusDetail{
VolumeID: source.VolumeHandle,
},
State: azdiskv1beta2.VolumeCreated,
},
}

return &azVolume, nil
}

func (c *SharedState) createAzVolumeFromAzureDiskVolumeSource(source *v1.AzureDiskVolumeSource) *azdiskv1beta2.AzVolume {
azVolume := azdiskv1beta2.AzVolume{
ObjectMeta: metav1.ObjectMeta{
Name: source.DiskName,
Finalizers: []string{consts.AzVolumeFinalizer},
},
Spec: azdiskv1beta2.AzVolumeSpec{
VolumeName: source.DiskName,
VolumeCapability: []azdiskv1beta2.VolumeCapability{},
},
Status: azdiskv1beta2.AzVolumeStatus{
Detail: &azdiskv1beta2.AzVolumeStatusDetail{
VolumeID: source.DataDiskURI,
},
State: azdiskv1beta2.VolumeCreated,
Annotations: map[string]string{consts.InlineVolumeAnnotation: source.DataDiskURI},
},
}

return &azVolume
}

func (c *SharedState) createAzVolume(ctx context.Context, desiredAzVolume *azdiskv1beta2.AzVolume) error {
var err error
var azVolume *azdiskv1beta2.AzVolume

if azVolume, err = c.azClient.DiskV1beta2().AzVolumes(c.objectNamespace).Get(ctx, desiredAzVolume.Name, metav1.GetOptions{}); err != nil {
if apiErrors.IsNotFound(err) {
if azVolume, err = c.azClient.DiskV1beta2().AzVolumes(c.objectNamespace).Create(ctx, desiredAzVolume, metav1.CreateOptions{}); err != nil {
return err
}
} else {
return err
}
}

updated := azVolume.DeepCopy()
updated.Status = desiredAzVolume.Status
if _, err := c.azClient.DiskV1beta2().AzVolumes(c.objectNamespace).UpdateStatus(ctx, updated, metav1.UpdateOptions{}); err != nil {
return err
}
// if AzVolume CRI successfully recreated, also recreate the operation queue for the volume
c.createOperationQueue(desiredAzVolume.Name)
return nil
}

// waitForVolumeAttachmentNAme waits for the VolumeAttachment name to be updated in the azVolumeAttachmentVaMap by the volumeattachment controller
func (c *SharedState) waitForVolumeAttachmentName(ctx context.Context, azVolumeAttachment *azdiskv1beta2.AzVolumeAttachment) (string, error) {
var vaName string
err := wait.PollImmediateUntilWithContext(ctx, consts.DefaultPollingRate, func(ctx context.Context) (bool, error) {
val, exists := c.azVolumeAttachmentToVaMap.Load(azVolumeAttachment.Name)
if exists {
vaName = val.(string)
}
return exists, nil
})
return vaName, err
}