Skip to content

Commit

Permalink
smb restore from snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
umagnus committed Apr 16, 2024
1 parent 1b5b1de commit 074ed1a
Show file tree
Hide file tree
Showing 13 changed files with 399 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ spec:
volumeMounts:
- mountPath: /csi
name: socket-dir
- mountPath: /root/.azcopy
name: azcopy-dir
- mountPath: /etc/kubernetes/
name: azure-cred
{{- if eq .Values.linux.distro "fedora" }}
Expand All @@ -209,6 +211,8 @@ spec:
volumes:
- name: socket-dir
emptyDir: {}
- name: azcopy-dir
emptyDir: {}
- name: azure-cred
hostPath:
path: /etc/kubernetes/
Expand Down
4 changes: 4 additions & 0 deletions deploy/csi-azurefile-controller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ spec:
volumeMounts:
- mountPath: /csi
name: socket-dir
- mountPath: /root/.azcopy
name: azcopy-dir
- mountPath: /etc/kubernetes/
name: azure-cred
resources:
Expand All @@ -155,6 +157,8 @@ spec:
volumes:
- name: socket-dir
emptyDir: {}
- name: azcopy-dir
emptyDir: {}
- name: azure-cred
hostPath:
path: /etc/kubernetes/
Expand Down
16 changes: 16 additions & 0 deletions deploy/example/snapshot/pvc-azurefile-snapshot-restored.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: pvc-azurefile-snapshot-restored
spec:
accessModes:
- ReadWriteMany
storageClassName: azurefile-csi
resources:
requests:
storage: 100Gi
dataSource:
name: azurefile-volume-snapshot
kind: VolumeSnapshot
apiGroup: snapshot.storage.k8s.io
61 changes: 5 additions & 56 deletions pkg/azurefile/azurefile.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import (
"errors"
"fmt"
"net/url"
"os"
"os/exec"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -197,7 +195,10 @@ var (

retriableErrors = []string{accountNotProvisioned, tooManyRequests, shareBeingDeleted, clientThrottled}

defaultAzcopyCopyOptions = []string{"--recursive", "--check-length=false"}
// azcopyCloneVolumeOptions used in volume cloning and set --check-length to false because volume data may be in changing state, copy volume is not same as current source volume
azcopyCloneVolumeOptions = []string{"--recursive", "--check-length=false"}
// azcopySnapshotRestoreOptions used in smb snapshot restore and set --check-length to true because snapshot data is changeless
azcopySnapshotRestoreOptions = []string{"--recursive", "--check-length=true"}
)

// Driver implements all interfaces of CSI drivers
Expand Down Expand Up @@ -262,9 +263,6 @@ type Driver struct {

kubeconfig string
endpoint string

// azcopy use sas token by default
azcopyUseSasToken bool
}

// NewDriver Creates a NewCSIDriver object. Assumes vendor version is equal to driver version &
Expand Down Expand Up @@ -302,7 +300,6 @@ func NewDriver(options *DriverOptions) *Driver {
driver.azcopy = &fileutil.Azcopy{}
driver.kubeconfig = options.KubeConfig
driver.endpoint = options.Endpoint
driver.azcopyUseSasToken = options.AzcopyUseSasToken

var err error
getter := func(key string) (interface{}, error) { return nil, nil }
Expand Down Expand Up @@ -1012,55 +1009,7 @@ func (d *Driver) copyFileShare(ctx context.Context, req *csi.CreateVolumeRequest
srcPath := fmt.Sprintf("https://%s.file.%s/%s%s", accountName, storageEndpointSuffix, srcFileShareName, accountSASToken)
dstPath := fmt.Sprintf("https://%s.file.%s/%s%s", accountName, storageEndpointSuffix, dstFileShareName, accountSASToken)

jobState, percent, err := d.azcopy.GetAzcopyJob(dstFileShareName, authAzcopyEnv)
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
switch jobState {
case fileutil.AzcopyJobError, fileutil.AzcopyJobCompleted:
return err
case fileutil.AzcopyJobRunning:
return fmt.Errorf("wait for the existing AzCopy job to complete, current copy percentage is %s%%", percent)
case fileutil.AzcopyJobNotFound:
klog.V(2).Infof("copy fileshare %s to %s", srcFileShareName, dstFileShareName)
execFuncWithAuth := func() error {
cmd := exec.Command("azcopy", "copy", srcPath, dstPath)
cmd.Args = append(cmd.Args, defaultAzcopyCopyOptions...)
if len(authAzcopyEnv) > 0 {
cmd.Env = append(os.Environ(), authAzcopyEnv...)
}
if out, err := cmd.CombinedOutput(); err != nil {
return fmt.Errorf("exec error: %v, output: %v", err, string(out))
}
return nil
}
timeoutFunc := func() error {
_, percent, _ := d.azcopy.GetAzcopyJob(dstFileShareName, authAzcopyEnv)
return fmt.Errorf("timeout waiting for copy blob container %s to %s complete, current copy percent: %s%%", srcFileShareName, dstFileShareName, percent)
}
copyErr := fileutil.WaitUntilTimeout(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, execFuncWithAuth, timeoutFunc)
if accountSASToken == "" && copyErr != nil && strings.Contains(copyErr.Error(), authorizationPermissionMismatch) {
klog.Warningf("azcopy list failed with AuthorizationPermissionMismatch error, should assign \"Storage File Data SMB Share Elevated Contributor\" role to controller identity, fall back to use sas token, original error: %v", copyErr)
var sasToken string
if sasToken, _, err = d.getAzcopyAuth(ctx, accountName, "", storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace, true); err != nil {
return err
}
execFuncWithSasToken := func() error {
cmd := exec.Command("azcopy", "copy", srcPath+sasToken, dstPath+sasToken)
cmd.Args = append(cmd.Args, defaultAzcopyCopyOptions...)
if out, err := cmd.CombinedOutput(); err != nil {
return fmt.Errorf("exec error: %v, output: %v", err, string(out))
}
return nil
}
copyErr = fileutil.WaitUntilTimeout(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, execFuncWithSasToken, timeoutFunc)
}
if copyErr != nil {
klog.Warningf("CopyFileShare(%s, %s, %s) failed with error: %v", resourceGroupName, accountName, dstFileShareName, copyErr)
} else {
klog.V(2).Infof("copied fileshare %s to %s successfully", srcFileShareName, dstFileShareName)
}
return copyErr
}
return err
return d.copyFileShareByAzcopy(ctx, srcFileShareName, dstFileShareName, srcPath, dstPath, "", accountName, accountName, resourceGroupName, accountSASToken, authAzcopyEnv, secretName, secretNamespace, secrets, accountOptions, storageEndpointSuffix)
}

// GetTotalAccountQuota returns the total quota in GB of all file shares in the storage account and the number of file shares
Expand Down
4 changes: 1 addition & 3 deletions pkg/azurefile/azurefile_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ type DriverOptions struct {
WaitForAzCopyTimeoutMinutes int
KubeConfig string
Endpoint string
AzcopyUseSasToken bool
}

func (o *DriverOptions) AddFlags() *flag.FlagSet {
Expand Down Expand Up @@ -81,10 +80,9 @@ func (o *DriverOptions) AddFlags() *flag.FlagSet {
fs.IntVar(&o.VolStatsCacheExpireInMinutes, "vol-stats-cache-expire-in-minutes", 10, "The cache expire time in minutes for volume stats cache")
fs.BoolVar(&o.PrintVolumeStatsCallLogs, "print-volume-stats-call-logs", false, "Whether to print volume statfs call logs with log level 2")
fs.IntVar(&o.SasTokenExpirationMinutes, "sas-token-expiration-minutes", 1440, "sas token expiration minutes during volume cloning and snapshot restore")
fs.IntVar(&o.WaitForAzCopyTimeoutMinutes, "wait-for-azcopy-timeout-minutes", 18, "timeout in minutes for waiting for azcopy to finish")
fs.IntVar(&o.WaitForAzCopyTimeoutMinutes, "wait-for-azcopy-timeout-minutes", 5, "timeout in minutes for waiting for azcopy to finish")
fs.StringVar(&o.KubeConfig, "kubeconfig", "", "Absolute path to the kubeconfig file. Required only when running out of cluster.")
fs.StringVar(&o.Endpoint, "endpoint", "unix://tmp/csi.sock", "CSI endpoint")
fs.BoolVar(&o.AzcopyUseSasToken, "azcopy-use-sas-token", true, "Whether SAS token should be used in azcopy based on volume clone and snapshot restore")

return fs
}
151 changes: 138 additions & 13 deletions pkg/azurefile/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"context"
"fmt"
"net/url"
"os"
"os/exec"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -53,13 +55,14 @@ const (
snapshotTimeFormat = "2006-01-02T15:04:05.0000000Z07:00"
snapshotsExpand = "snapshots"

azcopyAutoLoginType = "AZCOPY_AUTO_LOGIN_TYPE"
azcopySPAApplicationID = "AZCOPY_SPA_APPLICATION_ID"
azcopySPAClientSecret = "AZCOPY_SPA_CLIENT_SECRET"
azcopyTenantID = "AZCOPY_TENANT_ID"
azcopyMSIClientID = "AZCOPY_MSI_CLIENT_ID"
MSI = "MSI"
SPN = "SPN"
azcopyAutoLoginType = "AZCOPY_AUTO_LOGIN_TYPE"
azcopySPAApplicationID = "AZCOPY_SPA_APPLICATION_ID"
azcopySPAClientSecret = "AZCOPY_SPA_CLIENT_SECRET"
azcopyTenantID = "AZCOPY_TENANT_ID"
azcopyMSIClientID = "AZCOPY_MSI_CLIENT_ID"
MSI = "MSI"
SPN = "SPN"

authorizationPermissionMismatch = "AuthorizationPermissionMismatch"
)

Expand Down Expand Up @@ -577,11 +580,11 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
return nil, status.Errorf(codes.Internal, "failed to create file share(%s) on account(%s) type(%s) subsID(%s) rg(%s) location(%s) size(%d), error: %v", validFileShareName, account, sku, subsID, resourceGroup, location, fileShareSize, err)
}
if req.GetVolumeContentSource() != nil {
accountSASToken, authAzcopyEnv, err := d.getAzcopyAuth(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secret, secretName, secretNamespace, d.azcopyUseSasToken)
accountSASToken, authAzcopyEnv, err := d.getAzcopyAuth(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secret, secretName, secretNamespace, false)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
}
if err := d.copyVolume(ctx, req, accountSASToken, authAzcopyEnv, secretName, secretNamespace, secret, shareOptions, accountOptions, storageEndpointSuffix); err != nil {
if err := d.copyVolume(ctx, req, accountName, accountSASToken, authAzcopyEnv, secretName, secretNamespace, secret, shareOptions, accountOptions, storageEndpointSuffix); err != nil {
return nil, err
}
// storeAccountKey is not needed here since copy volume is only using SAS token
Expand Down Expand Up @@ -732,11 +735,11 @@ func (d *Driver) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest)
}

// copyVolume copy an azure file
func (d *Driver) copyVolume(ctx context.Context, req *csi.CreateVolumeRequest, accountSASToken string, authAzcopyEnv []string, secretName, secretNamespace string, secrets map[string]string, shareOptions *fileclient.ShareOptions, accountOptions *azure.AccountOptions, storageEndpointSuffix string) error {
func (d *Driver) copyVolume(ctx context.Context, req *csi.CreateVolumeRequest, accountName, accountSASToken string, authAzcopyEnv []string, secretName, secretNamespace string, secrets map[string]string, shareOptions *fileclient.ShareOptions, accountOptions *azure.AccountOptions, storageEndpointSuffix string) error {
vs := req.VolumeContentSource
switch vs.Type.(type) {
case *csi.VolumeContentSource_Snapshot:
return status.Errorf(codes.InvalidArgument, "copy volume from volumeSnapshot is not supported")
return d.restoreSnapshot(ctx, req, accountName, accountSASToken, authAzcopyEnv, secretName, secretNamespace, secrets, shareOptions, accountOptions, storageEndpointSuffix)
case *csi.VolumeContentSource_Volume:
return d.copyFileShare(ctx, req, accountSASToken, authAzcopyEnv, secretName, secretNamespace, secrets, shareOptions, accountOptions, storageEndpointSuffix)
default:
Expand Down Expand Up @@ -986,6 +989,129 @@ func (d *Driver) ListSnapshots(_ context.Context, _ *csi.ListSnapshotsRequest) (
return nil, status.Error(codes.Unimplemented, "")
}

// restoreSnapshot restores from a snapshot
func (d *Driver) restoreSnapshot(ctx context.Context, req *csi.CreateVolumeRequest, dstAccountName, dstAccountSasToken string, authAzcopyEnv []string, secretName, secretNamespace string, secrets map[string]string, shareOptions *fileclient.ShareOptions, accountOptions *azure.AccountOptions, storageEndpointSuffix string) error {
if shareOptions.Protocol == storage.EnabledProtocolsNFS {
return fmt.Errorf("protocol nfs is not supported for snapshot restore")
}
var sourceSnapshotID string
if req.GetVolumeContentSource() != nil && req.GetVolumeContentSource().GetSnapshot() != nil {
sourceSnapshotID = req.GetVolumeContentSource().GetSnapshot().GetSnapshotId()
}
resourceGroupName, srcAccountName, srcFileShareName, _, _, _, err := GetFileShareInfo(sourceSnapshotID) //nolint:dogsled
if err != nil {
return status.Error(codes.NotFound, err.Error())
}
snapshot, err := getSnapshot(sourceSnapshotID)
if err != nil {
return status.Error(codes.NotFound, err.Error())
}
dstFileShareName := shareOptions.Name
if srcFileShareName == "" || dstFileShareName == "" {
return fmt.Errorf("srcFileShareName(%s) or dstFileShareName(%s) is empty", srcFileShareName, dstFileShareName)
}
var srcAccountSasToken string
srcAccountSasToken = dstAccountSasToken
if srcAccountName != dstAccountName && dstAccountSasToken != "" {
srcAccountOptions := &azure.AccountOptions{
Name: srcAccountName,
ResourceGroup: accountOptions.ResourceGroup,
SubscriptionID: accountOptions.SubscriptionID,
GetLatestAccountKey: accountOptions.GetLatestAccountKey,
}
if srcAccountSasToken, _, err = d.getAzcopyAuth(ctx, srcAccountName, "", storageEndpointSuffix, srcAccountOptions, nil, "", secretNamespace, true); err != nil {
return err
}
}

srcPath := fmt.Sprintf("https://%s.file.%s/%s%s", srcAccountName, storageEndpointSuffix, srcFileShareName, srcAccountSasToken)
dstPath := fmt.Sprintf("https://%s.file.%s/%s%s", dstAccountName, storageEndpointSuffix, dstFileShareName, dstAccountSasToken)

srcFileShareSnapshotName := fmt.Sprintf("%s(snapshot: %s)", srcFileShareName, snapshot)
return d.copyFileShareByAzcopy(ctx, srcFileShareSnapshotName, dstFileShareName, srcPath, dstPath, snapshot, srcAccountName, dstAccountName, resourceGroupName, srcAccountSasToken, authAzcopyEnv, secretName, secretNamespace, secrets, accountOptions, storageEndpointSuffix)
}

func (d *Driver) copyFileShareByAzcopy(ctx context.Context, srcFileShareName, dstFileShareName, srcPath, dstPath, snapshot, srcAccountName, dstAccountName, resourceGroupName, accountSASToken string, authAzcopyEnv []string, secretName, secretNamespace string, secrets map[string]string, accountOptions *azure.AccountOptions, storageEndpointSuffix string) error {
azcopyCopyOptions := azcopyCloneVolumeOptions
srcPathAuth := srcPath
srcPathSASSnapshot := ""
if snapshot != "" {
azcopyCopyOptions = azcopySnapshotRestoreOptions
if accountSASToken == "" {
srcPathAuth = fmt.Sprintf("%s?sharesnapshot=%s", srcPath, snapshot)
} else {
srcPathAuth = fmt.Sprintf("%s&sharesnapshot=%s", srcPath, snapshot)
}
srcPathSASSnapshot = fmt.Sprintf("&sharesnapshot=%s", snapshot)
}

jobState, percent, err := d.azcopy.GetAzcopyJob(dstFileShareName, authAzcopyEnv)
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
switch jobState {
case volumehelper.AzcopyJobError, volumehelper.AzcopyJobCompleted:
return err
case volumehelper.AzcopyJobRunning:
return fmt.Errorf("wait for the existing AzCopy job to complete, current copy percentage is %s%%", percent)
case volumehelper.AzcopyJobNotFound:
klog.V(2).Infof("copy fileshare %s to %s", srcFileShareName, dstFileShareName)
execFuncWithAuth := func() error {
if out, err := d.execAzcopyCopy(srcPathAuth, dstPath, azcopyCopyOptions, authAzcopyEnv); err != nil {
return fmt.Errorf("exec error: %v, output: %v", err, string(out))
}
return nil
}
timeoutFunc := func() error {
_, percent, _ := d.azcopy.GetAzcopyJob(dstFileShareName, authAzcopyEnv)
return fmt.Errorf("timeout waiting for copy fileshare %s to %s complete, current copy percent: %s%%", srcFileShareName, dstFileShareName, percent)
}
copyErr := volumehelper.WaitUntilTimeout(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, execFuncWithAuth, timeoutFunc)
if accountSASToken == "" && copyErr != nil && strings.Contains(copyErr.Error(), authorizationPermissionMismatch) {
klog.Warningf("azcopy list failed with AuthorizationPermissionMismatch error, should assign \"Storage File Data Privileged Contributor\" role to controller identity, fall back to use sas token, original error: %v", copyErr)
var srcSasToken, dstSasToken string
srcAccountOptions := &azure.AccountOptions{
Name: srcAccountName,
ResourceGroup: accountOptions.ResourceGroup,
SubscriptionID: accountOptions.SubscriptionID,
GetLatestAccountKey: accountOptions.GetLatestAccountKey,
}
if srcSasToken, _, err = d.getAzcopyAuth(ctx, srcAccountName, "", storageEndpointSuffix, srcAccountOptions, nil, "", secretNamespace, true); err != nil {
return err
}
if srcAccountName == dstAccountName {
dstSasToken = srcSasToken
} else {
if dstSasToken, _, err = d.getAzcopyAuth(ctx, dstAccountName, "", storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace, true); err != nil {
return err
}
}
execFuncWithSasToken := func() error {
if out, err := d.execAzcopyCopy(srcPath+srcSasToken+srcPathSASSnapshot, dstPath+dstSasToken, azcopyCopyOptions, []string{}); err != nil {
return fmt.Errorf("exec error: %v, output: %v", err, string(out))
}
return nil
}
copyErr = volumehelper.WaitUntilTimeout(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, execFuncWithSasToken, timeoutFunc)
}
if copyErr != nil {
klog.Warningf("CopyFileShare(%s, %s, %s) failed with error: %v", resourceGroupName, srcAccountName, dstFileShareName, copyErr)
} else {
klog.V(2).Infof("copied fileshare %s to %s successfully", srcFileShareName, dstFileShareName)
}
return copyErr
}
return err
}

// execAzcopyCopy exec azcopy copy command
func (d *Driver) execAzcopyCopy(srcPath, dstPath string, azcopyCopyOptions, authAzcopyEnv []string) ([]byte, error) {
cmd := exec.Command("azcopy", "copy", srcPath, dstPath)
cmd.Args = append(cmd.Args, azcopyCopyOptions...)
if len(authAzcopyEnv) > 0 {
cmd.Env = append(os.Environ(), authAzcopyEnv...)
}
return cmd.CombinedOutput()
}

// ControllerExpandVolume controller expand volume
func (d *Driver) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
volumeID := req.GetVolumeId()
Expand Down Expand Up @@ -1227,11 +1353,10 @@ func (d *Driver) authorizeAzcopyWithIdentity() ([]string, error) {
// 1. secrets is not empty
// 2. driver is not using managed identity and service principal
// 3. azcopy returns AuthorizationPermissionMismatch error when using service principal or managed identity
// 4. parameter useSasToken is true
func (d *Driver) getAzcopyAuth(ctx context.Context, accountName, accountKey, storageEndpointSuffix string, accountOptions *azure.AccountOptions, secrets map[string]string, secretName, secretNamespace string, useSasToken bool) (string, []string, error) {
var authAzcopyEnv []string
var err error
if !useSasToken && len(secrets) == 0 && len(secretName) == 0 {
if len(secrets) == 0 && len(secretName) == 0 {
// search in cache first
if cache, err := d.azcopySasTokenCache.Get(accountName, azcache.CacheReadTypeDefault); err == nil && cache != nil {
klog.V(2).Infof("use sas token for account(%s) since this account is found in azcopySasTokenCache", accountName)
Expand Down
Loading

0 comments on commit 074ed1a

Please sign in to comment.