diff --git a/pkg/cloud/ecsinterface.go b/pkg/cloud/ecsinterface.go index 633ed29e7..985e503f3 100644 --- a/pkg/cloud/ecsinterface.go +++ b/pkg/cloud/ecsinterface.go @@ -3,6 +3,7 @@ package cloud import "github.com/aliyun/alibaba-cloud-sdk-go/services/ecs" type ECSInterface interface { + CreateDisk(request *ecs.CreateDiskRequest) (response *ecs.CreateDiskResponse, err error) DescribeAvailableResource(request *ecs.DescribeAvailableResourceRequest) (response *ecs.DescribeAvailableResourceResponse, err error) DeleteDisk(request *ecs.DeleteDiskRequest) (response *ecs.DeleteDiskResponse, err error) DescribeRegions(request *ecs.DescribeRegionsRequest) (response *ecs.DescribeRegionsResponse, err error) diff --git a/pkg/cloud/ecsmock.go b/pkg/cloud/ecsmock.go index 6d5ec8bfe..e84c10a47 100644 --- a/pkg/cloud/ecsmock.go +++ b/pkg/cloud/ecsmock.go @@ -34,6 +34,21 @@ func (m *MockECSInterface) EXPECT() *MockECSInterfaceMockRecorder { return m.recorder } +// CreateDisk mocks base method. +func (m *MockECSInterface) CreateDisk(request *ecs.CreateDiskRequest) (*ecs.CreateDiskResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateDisk", request) + ret0, _ := ret[0].(*ecs.CreateDiskResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CreateDisk indicates an expected call of CreateDisk. +func (mr *MockECSInterfaceMockRecorder) CreateDisk(request interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateDisk", reflect.TypeOf((*MockECSInterface)(nil).CreateDisk), request) +} + // DeleteDisk mocks base method. func (m *MockECSInterface) DeleteDisk(request *ecs.DeleteDiskRequest) (*ecs.DeleteDiskResponse, error) { m.ctrl.T.Helper() diff --git a/pkg/disk/category.go b/pkg/disk/category.go new file mode 100644 index 000000000..3314a373a --- /dev/null +++ b/pkg/disk/category.go @@ -0,0 +1,86 @@ +package disk + +type Category string +type PerformanceLevel string + +const ( + DiskCommon Category = "cloud" + DiskEfficiency Category = "cloud_efficiency" + DiskSSD Category = "cloud_ssd" + DiskESSD Category = "cloud_essd" + DiskESSDAuto Category = "cloud_auto" + DiskESSDEntry Category = "cloud_essd_entry" + + DiskPPerf Category = "cloud_pperf" + DiskSPerf Category = "cloud_sperf" + DiskSharedSSD Category = "san_ssd" + DiskSharedEfficiency Category = "san_efficiency" + + PERFORMANCE_LEVEL0 PerformanceLevel = "PL0" + PERFORMANCE_LEVEL1 PerformanceLevel = "PL1" + PERFORMANCE_LEVEL2 PerformanceLevel = "PL2" + PERFORMANCE_LEVEL3 PerformanceLevel = "PL3" +) + +type SizeRange struct { + Min int64 + Max int64 +} + +type PerformanceLevelDesc struct { + Size SizeRange +} + +type CategoryDesc struct { + Size SizeRange + PerformanceLevel map[PerformanceLevel]PerformanceLevelDesc + InstantAccessSnapshot bool + ProvisionedIops bool + Bursting bool +} + +var AllCategories = map[Category]CategoryDesc{ + DiskCommon: {}, + DiskEfficiency: { + Size: SizeRange{Min: 20, Max: 65536}, + }, + DiskSSD: { + Size: SizeRange{Min: 20, Max: 65536}, + }, + DiskESSD: { + Size: SizeRange{Min: 20, Max: 65536}, + PerformanceLevel: map[PerformanceLevel]PerformanceLevelDesc{ + PERFORMANCE_LEVEL0: {Size: SizeRange{Min: 1, Max: 65536}}, + PERFORMANCE_LEVEL1: {Size: SizeRange{Min: 20, Max: 65536}}, + PERFORMANCE_LEVEL2: {Size: SizeRange{Min: 461, Max: 65536}}, + PERFORMANCE_LEVEL3: {Size: SizeRange{Min: 1261, Max: 65536}}, + }, + InstantAccessSnapshot: true, + }, + DiskESSDAuto: { + Size: SizeRange{Min: 1, Max: 65536}, + InstantAccessSnapshot: true, + ProvisionedIops: true, + Bursting: true, + }, + DiskESSDEntry: { + Size: SizeRange{Min: 10, Max: 65536}, + }, + + // Deprecated shared disk + DiskSharedSSD: {}, + DiskSharedEfficiency: {}, + + // Only available in private cloud + DiskPPerf: {}, + DiskSPerf: {}, +} + +func GetSizeRange(category Category, pl PerformanceLevel) SizeRange { + desc := AllCategories[category] + limit := desc.Size + if plDesc, ok := desc.PerformanceLevel[pl]; ok { + limit = plDesc.Size + } + return limit +} diff --git a/pkg/disk/cloud.go b/pkg/disk/cloud.go index 5ec752393..350c65160 100644 --- a/pkg/disk/cloud.go +++ b/pkg/disk/cloud.go @@ -18,8 +18,10 @@ package disk import ( "context" + "encoding/base64" "errors" "fmt" + "hash/fnv" "os" "regexp" "slices" @@ -40,6 +42,7 @@ import ( "google.golang.org/grpc/status" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" ) @@ -680,6 +683,27 @@ func findDiskByID(diskID string, ecsClient *ecs.Client) (*ecs.Disk, error) { return &disks[0], err } +func findDiskByName(name string, ecsClient cloud.ECSInterface) (*ecs.Disk, error) { + describeDisksRequest := ecs.CreateDescribeDisksRequest() + describeDisksRequest.RegionId = GlobalConfigVar.Region + tags := []ecs.DescribeDisksTag{{Key: common.VolumeNameTag, Value: name}} + describeDisksRequest.Tag = &tags + + diskResponse, err := ecsClient.DescribeDisks(describeDisksRequest) + if err != nil { + return nil, err + } + disks := diskResponse.Disks.Disk + + if len(disks) == 0 { + return nil, nil + } + if len(disks) > 1 { + return nil, fmt.Errorf("find more than one disk with name %s", name) + } + return &disks[0], err +} + func findSnapshotByName(name string) (*ecs.DescribeSnapshotsResponse, int, error) { describeSnapShotRequest := ecs.CreateDescribeSnapshotsRequest() describeSnapShotRequest.RegionId = GlobalConfigVar.Region @@ -814,211 +838,285 @@ func requestAndDeleteSnapshot(snapshotID string) (*ecs.DeleteSnapshotResponse, e return response, nil } +func checkExistingDisk(existingDisk *ecs.Disk, diskVol *diskVolumeArgs) (createAttempt, error) { + if int64(existingDisk.Size) != diskVol.RequestGB { + return createAttempt{}, fmt.Errorf("%dGiB != requested %dGiB", existingDisk.Size, diskVol.RequestGB) + } + if existingDisk.Encrypted != diskVol.Encrypted { + return createAttempt{}, fmt.Errorf("encrypted: %t != requested %t", existingDisk.Encrypted, diskVol.Encrypted) + } + if diskVol.Encrypted { + if existingDisk.KMSKeyId != diskVol.KMSKeyID { + return createAttempt{}, fmt.Errorf("KMSKeyId: %s != requested %s", existingDisk.KMSKeyId, diskVol.KMSKeyID) + } + } + if diskVol.MultiAttach != (existingDisk.MultiAttach == "Enabled") { + return createAttempt{}, fmt.Errorf("multiAttach: %s != requested %t", existingDisk.MultiAttach, diskVol.MultiAttach) + } + if diskVol.StorageClusterID != existingDisk.StorageClusterId { + return createAttempt{}, fmt.Errorf("storageClusterId: %s != requested %s", existingDisk.StorageClusterId, diskVol.StorageClusterID) + } + if diskVol.ResourceGroupID != existingDisk.ResourceGroupId { + return createAttempt{}, fmt.Errorf("resourceGroupId: %s != requested %s", existingDisk.ResourceGroupId, diskVol.ResourceGroupID) + } + + attempt := createAttempt{} + for _, category := range diskVol.Type { + if existingDisk.Category != string(category) { + continue + } + attempt.Category = category + pl := PerformanceLevel(existingDisk.PerformanceLevel) + attempt.PerformanceLevel = pl + + if len(diskVol.PerformanceLevel) > 0 && len(pl) > 0 { + if !slices.Contains(diskVol.PerformanceLevel, pl) { + return createAttempt{}, fmt.Errorf("performanceLevel: %s not in requested %v", pl, diskVol.PerformanceLevel) + } + } + break + } + if attempt.Category == "" { + return createAttempt{}, fmt.Errorf("category: %s not in requested %v", existingDisk.Category, diskVol.Type) + } + cateDesc := AllCategories[Category(existingDisk.Category)] + + if diskVol.ZoneID != existingDisk.ZoneId { + return createAttempt{}, fmt.Errorf("zoneId: %s != requested %s", existingDisk.ZoneId, diskVol.ZoneID) + } + if cateDesc.ProvisionedIops && existingDisk.ProvisionedIops != diskVol.ProvisionedIops { + return createAttempt{}, fmt.Errorf("provisionedIops: %d != requested %d", existingDisk.ProvisionedIops, diskVol.ProvisionedIops) + } + if cateDesc.Bursting && diskVol.BurstingEnabled != existingDisk.BurstingEnabled { + return createAttempt{}, fmt.Errorf("burstingEnabled: %t != requested %t", existingDisk.BurstingEnabled, diskVol.BurstingEnabled) + } + + existingTags := make(map[string]string, len(existingDisk.Tags.Tag)) + for _, tag := range existingDisk.Tags.Tag { + existingTags[tag.Key] = tag.Value + } + for k, v := range diskVol.DiskTags { + if existingTags[k] != v { + return createAttempt{}, fmt.Errorf("tag %s: %s != requested %s", k, existingTags[k], v) + } + } + + return attempt, nil +} + +func clientToken(name string) string { + b := []byte(name) + if len(b) <= 62 && !slices.ContainsFunc(b, func(r byte) bool { return r > 0x7f }) { + return "n:" + name + } + // CSI name supports Unicode characters at maximum length 128, ECS ClientToken only support 64 ASCII characters, + // So use hash of name as the token. + hash := fnv.New128a() + hash.Write(b) + return "h:" + base64.RawStdEncoding.EncodeToString(hash.Sum(nil)) +} + // Docs say Chinese characters are supported, but the exactly range is not clear. // So we just assume they are not supported. var vaildDiskNameRegexp = regexp.MustCompile(`^[a-zA-Z][a-zA-Z0-9:_-]{1,127}$`) -func createDisk(diskName, snapshotID string, requestGB int, diskVol *diskVolumeArgs, tenantUID string) (string, string, string, error) { +func createDisk(ecsClient cloud.ECSInterface, diskName, snapshotID string, diskVol *diskVolumeArgs, supportedTypes sets.Set[Category]) (string, createAttempt, error) { // 需要配置external-provisioner启动参数--extra-create-metadata=true,然后ACK的external-provisioner才会将PVC的Annotations传过来 - ecsClient, err := getEcsClientByID("", tenantUID) - if err != nil { - return "", "", "", status.Error(codes.Internal, err.Error()) - } - - createDiskRequest := ecs.CreateCreateDiskRequest() + createDiskRequest := buildCreateDiskRequest(diskVol) if vaildDiskNameRegexp.MatchString(diskName) { createDiskRequest.DiskName = diskName } - createDiskRequest.Size = requests.NewInteger(requestGB) - createDiskRequest.RegionId = diskVol.RegionID - createDiskRequest.ZoneId = diskVol.ZoneID - createDiskRequest.Encrypted = requests.NewBoolean(diskVol.Encrypted) - if len(diskVol.ARN) > 0 { - createDiskRequest.Arn = &diskVol.ARN - } - createDiskRequest.ResourceGroupId = diskVol.ResourceGroupID + createDiskRequest.ClientToken = clientToken(diskName) + *createDiskRequest.Tag = append(*createDiskRequest.Tag, ecs.CreateDiskTag{Key: common.VolumeNameTag, Value: diskName}) + if snapshotID != "" { createDiskRequest.SnapshotId = snapshotID } - diskTags := getDefaultDiskTags(diskVol) - diskTags = append(diskTags, ecs.CreateDiskTag{Key: common.VolumeNameTag, Value: diskName}) - createDiskRequest.Tag = &diskTags - if diskVol.MultiAttach { - createDiskRequest.MultiAttach = "Enabled" - } - - if diskVol.Encrypted == true && diskVol.KMSKeyID != "" { - createDiskRequest.KMSKeyId = diskVol.KMSKeyID - } - if diskVol.StorageClusterID != "" { - createDiskRequest.StorageClusterId = diskVol.StorageClusterID - } - diskTypes, diskPLs, err := getDiskType(diskVol) - log.Infof("createDisk: diskName: %s, valid disktype: %v, valid diskpls: %v", diskName, diskTypes, diskPLs) - if err != nil { - return "", "", "", err - } - for _, dType := range diskTypes { - createDiskRequest.ClientToken = fmt.Sprintf("token:%s/%s/%s/%s", diskName, dType, diskVol.RegionID, diskVol.ZoneID) - createDiskRequest.DiskCategory = dType - if dType == DiskESSD { - newReq := generateNewRequest(createDiskRequest) - // when perforamceLevel is not setting, diskPLs is empty. - for _, diskPL := range diskPLs { - log.Infof("createDisk: start to create disk by diskName: %s, valid disktype: %v, pl: %s", diskName, dType, diskPL) - - newReq.ClientToken = fmt.Sprintf("token:%s/%s/%s/%s/%s", diskName, dType, diskVol.RegionID, diskVol.ZoneID, diskPL) - newReq.PerformanceLevel = diskPL - returned, diskId, rerr := request(newReq, ecsClient) - if returned { - if diskId != "" && rerr == nil { - return dType, diskId, diskPL, nil - } - if rerr != nil { - return "", "", "", rerr - } - } - err = rerr - } - if len(diskPLs) != 0 { + messages := []string{} + for _, attempt := range generateCreateAttempts(diskVol) { + if len(supportedTypes) > 0 { + if !supportedTypes.Has(attempt.Category) { + messages = append(messages, fmt.Sprintf("%s: not supported by node %s", attempt, diskVol.NodeSelected)) continue } } - createDiskRequest.PerformanceLevel = "" - if dType == DiskESSDAuto { - newReq := generateNewRequest(createDiskRequest) - createDiskRequest.ClientToken = fmt.Sprintf("token:%s/%s/%s/%s/%d/%t", diskName, dType, diskVol.RegionID, diskVol.ZoneID, diskVol.ProvisionedIops, diskVol.BurstingEnabled) - if diskVol.ProvisionedIops != -1 { - newReq.ProvisionedIops = requests.NewInteger(diskVol.ProvisionedIops) + limit := GetSizeRange(attempt.Category, attempt.PerformanceLevel) + if limit.Min > 0 && diskVol.RequestGB < limit.Min { + messages = append(messages, fmt.Sprintf("%s: requested size %dGiB is less than minimum %dGiB", attempt, diskVol.RequestGB, limit.Min)) + continue + } + retry: + diskID, final, err := createDiskAttempt(createDiskRequest, attempt, ecsClient) + if err != nil { + if final { + return "", attempt, err } - newReq.BurstingEnabled = requests.NewBoolean(diskVol.BurstingEnabled) - returned, diskId, rerr := request(newReq, ecsClient) - if returned { - if diskId != "" && rerr == nil { - return dType, diskId, "", nil + if errors.Is(err, ErrParameterMismatch) { + if createDiskRequest.ClientToken == "" { + // protect us from infinite loop + return "", attempt, fmt.Errorf("unexpected parameter mismatch") + } + existingDisk, err := findDiskByName(diskName, ecsClient) + if err != nil { + return "", attempt, fmt.Errorf("parameter mismatch detected, but fetch existing node failed: %w", err) } - if rerr != nil { - return "", "", "", rerr + if existingDisk == nil { + // No existing disk, retry without client token + createDiskRequest.ClientToken = "" + goto retry } + // Check if the existing disk matches the request + attempt, err := checkExistingDisk(existingDisk, diskVol) + if err != nil { + return "", attempt, fmt.Errorf("%w: %w", ErrParameterMismatch, err) + } + return existingDisk.DiskId, attempt, nil } - err = rerr + messages = append(messages, fmt.Sprintf("%s: %v", attempt, err)) continue } - returned, diskId, rerr := request(createDiskRequest, ecsClient) - if returned { - if diskId != "" && rerr == nil { - return dType, diskId, "", nil - } - if rerr != nil { - return "", "", "", rerr - } - } - err = rerr + return diskID, attempt, nil } - return "", "", "", status.Errorf(codes.Internal, "createDisk: err: %v, the zone:[%s] is not support specific disk type, please change the request disktype: %s or disk pl: %s", err, diskVol.ZoneID, diskTypes, diskPLs) + return "", createAttempt{}, status.Errorf(codes.Internal, "all attempts failed: %s", strings.Join(messages, "; ")) } -// reuse rpcrequest in ecs sdk is forbidden, because parameters can't be reassigned with empty string.(ecs sdk bug) -func generateNewRequest(oldReq *ecs.CreateDiskRequest) *ecs.CreateDiskRequest { - createDiskRequest := ecs.CreateCreateDiskRequest() - - createDiskRequest.DiskCategory = oldReq.DiskCategory - createDiskRequest.DiskName = oldReq.DiskName - createDiskRequest.Size = oldReq.Size - createDiskRequest.RegionId = oldReq.RegionId - createDiskRequest.ZoneId = oldReq.ZoneId - createDiskRequest.Encrypted = oldReq.Encrypted - createDiskRequest.Arn = oldReq.Arn - createDiskRequest.ResourceGroupId = oldReq.ResourceGroupId - createDiskRequest.SnapshotId = oldReq.SnapshotId - createDiskRequest.Tag = oldReq.Tag - createDiskRequest.MultiAttach = oldReq.MultiAttach - createDiskRequest.KMSKeyId = oldReq.KMSKeyId - createDiskRequest.StorageClusterId = oldReq.StorageClusterId - return createDiskRequest +func buildCreateDiskRequest(diskVol *diskVolumeArgs) *ecs.CreateDiskRequest { + req := ecs.CreateCreateDiskRequest() + + req.Size = requests.NewInteger64(diskVol.RequestGB) + req.RegionId = diskVol.RegionID + req.ZoneId = diskVol.ZoneID + req.Encrypted = requests.NewBoolean(diskVol.Encrypted) + if len(diskVol.ARN) > 0 { + req.Arn = &diskVol.ARN + } + req.ResourceGroupId = diskVol.ResourceGroupID + diskTags := getDefaultDiskTags(diskVol) + req.Tag = &diskTags + + if diskVol.MultiAttach { + req.MultiAttach = "Enabled" + } + + if diskVol.Encrypted && diskVol.KMSKeyID != "" { + req.KMSKeyId = diskVol.KMSKeyID + } + req.StorageClusterId = diskVol.StorageClusterID + if diskVol.ProvisionedIops > 0 { + req.ProvisionedIops = requests.NewInteger64(diskVol.ProvisionedIops) + } + req.BurstingEnabled = requests.NewBoolean(diskVol.BurstingEnabled) + return req } -func request(createDiskRequest *ecs.CreateDiskRequest, ecsClient *ecs.Client) (returned bool, diskId string, err error) { - cata := strings.Trim(fmt.Sprintf("%s.%s", createDiskRequest.DiskCategory, createDiskRequest.PerformanceLevel), ".") - log.Infof("request: Create Disk for volume: %s with cata: %s", createDiskRequest.DiskName, cata) - if minCap, ok := DiskCapacityMapping[cata]; ok { - if rValue, err := createDiskRequest.Size.GetValue(); err == nil { - if rValue < minCap { - return false, "", fmt.Errorf("request: to request %s type disk you needs at least %dGB size which the provided size %dGB does not meet the needs, please resize the size up.", cata, minCap, rValue) - } - } +func finalizeCreateDiskRequest(template *ecs.CreateDiskRequest, attempt createAttempt) *ecs.CreateDiskRequest { + req := *template + req.DiskCategory = string(attempt.Category) + req.PerformanceLevel = string(attempt.PerformanceLevel) + + cateDesc := AllCategories[attempt.Category] + if !cateDesc.Bursting { + req.BurstingEnabled = "" } + if !cateDesc.ProvisionedIops { + req.ProvisionedIops = "" + } + + tmp := ecs.CreateCreateDiskRequest() + req.RpcRequest = tmp.RpcRequest + // We want to keep each request independent. But once sent, the req.QueryParams is modified by the SDK. + // to avoid modifying template, create a new RpcRequest + // for private cloud if ascmContext := os.Getenv("X-ACSPROXY-ASCM-CONTEXT"); ascmContext != "" { - createDiskRequest.GetHeaders()["x-acsproxy-ascm-context"] = ascmContext + req.GetHeaders()["x-acsproxy-ascm-context"] = ascmContext } - log.Infof("request: request content: %++v", *createDiskRequest) - volumeRes, err := ecsClient.CreateDisk(createDiskRequest) + return &req +} + +var ErrParameterMismatch = errors.New("parameter mismatch") + +func createDiskAttempt(req *ecs.CreateDiskRequest, attempt createAttempt, ecsClient cloud.ECSInterface) (diskId string, final bool, err error) { + req = finalizeCreateDiskRequest(req, attempt) + log.Infof("request: request content: %++v", req) + + volumeRes, err := ecsClient.CreateDisk(req) if err == nil { log.Infof("request: diskId: %s, reqId: %s", volumeRes.DiskId, volumeRes.RequestId) - return true, volumeRes.DiskId, nil + return volumeRes.DiskId, true, nil } var aliErr *alicloudErr.ServerError if errors.As(err, &aliErr) { + log.Infof("request: Create Disk for volume %s failed: %v", req.DiskName, err) if strings.HasPrefix(aliErr.ErrorCode(), DiskNotAvailable) || strings.Contains(aliErr.Message(), DiskNotAvailableVer2) { - log.Infof("request: Create Disk for volume %s with diskCatalog: %s is not supported in zone: %s", createDiskRequest.DiskName, createDiskRequest.DiskCategory, createDiskRequest.ZoneId) - return false, "", err + return "", false, fmt.Errorf("not supported in zone: %s", req.ZoneId) } else if aliErr.ErrorCode() == DiskSizeNotAvailable1 || aliErr.ErrorCode() == DiskSizeNotAvailable2 { // although we have checked the size above, but these limits are subject to change, so we may still encounter this error - log.Infof("request: Create Disk for volume %s with diskCatalog: %s has invalid disk size: %s", createDiskRequest.DiskName, createDiskRequest.DiskCategory, createDiskRequest.Size) - return false, "", err - } else if aliErr.ErrorCode() == DiskPerformanceLevelNotMatch && createDiskRequest.DiskCategory == DiskESSD { - log.Infof("request: Create Disk for volume %s with diskCatalog: %s , pl: %s has invalid disk size: %s", createDiskRequest.DiskName, createDiskRequest.DiskCategory, createDiskRequest.PerformanceLevel, createDiskRequest.Size) - return false, "", err - } else if aliErr.ErrorCode() == DiskInvalidPL && createDiskRequest.DiskCategory == DiskESSD { + return "", false, fmt.Errorf("invalid disk size: %s", req.Size) + } else if aliErr.ErrorCode() == DiskPerformanceLevelNotMatch && attempt.Category == DiskESSD { + return "", false, fmt.Errorf("invalid disk size: %s", req.Size) + } else if aliErr.ErrorCode() == DiskInvalidPL && attempt.Category == DiskESSD { // observed in cn-north-2-gov-1 region, PL0 is not supported - log.Infof("request: Create Disk for volume %s with diskCatalog: %s , pl: %s unsupported", createDiskRequest.DiskName, createDiskRequest.DiskCategory, createDiskRequest.PerformanceLevel) - return false, "", err - } else if aliErr.ErrorCode() == DiskIopsLimitExceeded && createDiskRequest.DiskCategory == DiskESSDAuto { - log.Infof("request: Create Disk for volume %s with diskCatalog: %s , provisioned iops %s has exceeded limit", createDiskRequest.DiskName, createDiskRequest.DiskCategory, createDiskRequest.ProvisionedIops) - return false, "", err + return "", false, fmt.Errorf("performance level %s unsupported", req.PerformanceLevel) + } else if aliErr.ErrorCode() == DiskIopsLimitExceeded && attempt.Category == DiskESSDAuto { + return "", false, fmt.Errorf("provisioned iops %s has exceeded limit", req.ProvisionedIops) + } else if aliErr.ErrorCode() == IdempotentParameterMismatch { + return "", false, ErrParameterMismatch } } - log.Errorf("request: create disk for volume %s with type: %s err: %v", createDiskRequest.DiskName, createDiskRequest.DiskCategory, err) + log.Errorf("request: create disk for volume %s with type: %s err: %v", req.DiskName, attempt, err) newErrMsg := utils.FindSuggestionByErrorMessage(err.Error(), utils.DiskProvision) - return true, "", fmt.Errorf("%s: %w", newErrMsg, err) + return "", true, fmt.Errorf("%s: %w", newErrMsg, err) } -func getDiskType(diskVol *diskVolumeArgs) ([]string, []string, error) { - nodeSupportDiskType := []string{} - if diskVol.NodeSelected != "" { - client := GlobalConfigVar.ClientSet - nodeInfo, err := client.CoreV1().Nodes().Get(context.Background(), diskVol.NodeSelected, metav1.GetOptions{}) - if err != nil { - log.Infof("getDiskType: failed to get node labels: %v", err) - if apierrors.IsNotFound(err) { - return nil, nil, status.Errorf(codes.ResourceExhausted, "CreateVolume:: get node info by name: %s failed with err: %v, start rescheduling", diskVol.NodeSelected, err) - } - return nil, nil, status.Errorf(codes.InvalidArgument, "CreateVolume:: get node info by name: %s failed with err: %v", diskVol.NodeSelected, err) - } - re := regexp.MustCompile(`node.csi.alibabacloud.com/disktype.(.*)`) - for key := range nodeInfo.Labels { - if result := re.FindStringSubmatch(key); len(result) != 0 { - nodeSupportDiskType = append(nodeSupportDiskType, result[1]) +type createAttempt struct { + Category Category + PerformanceLevel PerformanceLevel +} + +func (a createAttempt) String() string { + if a.PerformanceLevel == "" { + return string(a.Category) + } + return fmt.Sprintf("%s.%s", a.Category, a.PerformanceLevel) +} + +func generateCreateAttempts(diskVol *diskVolumeArgs) (a []createAttempt) { + for _, c := range diskVol.Type { + desc := AllCategories[c] + if len(desc.PerformanceLevel) == 0 || len(diskVol.PerformanceLevel) == 0 { + a = append(a, createAttempt{Category: c}) + } else { + for _, pl := range diskVol.PerformanceLevel { + if _, ok := desc.PerformanceLevel[pl]; ok { + a = append(a, createAttempt{Category: c, PerformanceLevel: pl}) + } } } - log.Infof("CreateVolume:: node support disk types: %v, nodeSelected: %v", nodeSupportDiskType, diskVol.NodeSelected) } + return +} - provisionDiskTypes := []string{} - allTypes := deleteEmpty(diskVol.Type) - if len(nodeSupportDiskType) != 0 { - provisionDiskTypes = intersect(nodeSupportDiskType, allTypes) - if len(provisionDiskTypes) == 0 { - log.Errorf("CreateVolume:: node(%s) support type: [%v] is incompatible with provision disk type: [%s]", diskVol.NodeSelected, nodeSupportDiskType, allTypes) - return nil, nil, status.Errorf(codes.ResourceExhausted, "CreateVolume:: node support type: [%v] is incompatible with provision disk type: [%s]", nodeSupportDiskType, allTypes) +func getSupportedDiskTypes(node string) (sets.Set[Category], error) { + types := sets.New[Category]() + client := GlobalConfigVar.ClientSet + nodeInfo, err := client.CoreV1().Nodes().Get(context.Background(), node, metav1.GetOptions{}) + if err != nil { + log.Infof("getDiskType: failed to get node labels: %v", err) + if apierrors.IsNotFound(err) { + return nil, status.Errorf(codes.ResourceExhausted, "CreateVolume:: get node info by name: %s failed with err: %v, start rescheduling", node, err) + } + return nil, status.Errorf(codes.InvalidArgument, "CreateVolume:: get node info by name: %s failed with err: %v", node, err) + } + re := regexp.MustCompile(`node.csi.alibabacloud.com/disktype.(.*)`) + for key := range nodeInfo.Labels { + if result := re.FindStringSubmatch(key); len(result) != 0 { + types.Insert(Category(result[1])) } - } else { - provisionDiskTypes = allTypes } - provisionPerformanceLevel := diskVol.PerformanceLevel - return provisionDiskTypes, provisionPerformanceLevel, nil + log.Infof("CreateVolume:: node support disk types: %v, nodeSelected: %v", types, node) + return types, nil } func getDefaultDiskTags(diskVol *diskVolumeArgs) []ecs.CreateDiskTag { diff --git a/pkg/disk/cloud_test.go b/pkg/disk/cloud_test.go index ab187b1f0..70f55d6fa 100644 --- a/pkg/disk/cloud_test.go +++ b/pkg/disk/cloud_test.go @@ -6,10 +6,12 @@ import ( "testing" alicloudErr "github.com/aliyun/alibaba-cloud-sdk-go/sdk/errors" + "github.com/aliyun/alibaba-cloud-sdk-go/sdk/requests" "github.com/aliyun/alibaba-cloud-sdk-go/services/ecs" gomock "github.com/golang/mock/gomock" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/cloud" "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/util/sets" ) var ( @@ -182,3 +184,304 @@ func TestListSnapshotsInvalidToken(t *testing.T) { _, _, err := listSnapshots(client, "test-disk", "my-cluster", "invalid-token", 0) assert.Error(t, err) } + +func TestClientToken(t *testing.T) { + // we should keep the token stable across versions + assert.Equal(t, "n:disk-dcd6fdde-8c1e-45eb-8ec7-786a8b2e0b61", clientToken("disk-dcd6fdde-8c1e-45eb-8ec7-786a8b2e0b61")) + // only ASCII characters are allowed + assert.Equal(t, "h:LGH7vCPQbR31I47I1eCW7g", clientToken("disk-磁盘名称-1")) + + // the length should be <= 64 + assert.Equal(t, "n:01234567890123456789012345678901234567890123456789012345678901", + clientToken("01234567890123456789012345678901234567890123456789012345678901")) + assert.Equal(t, "h:NDeYXVDChDCom5xYgHLVQA", + clientToken("012345678901234567890123456789012345678901234567890123456789012")) +} + +func BenchmarkClientToken(b *testing.B) { + for i := 0; i < b.N; i++ { + clientToken("disk-dcd6fdde-8c1e-45eb-8ec7-786a8b2e0b61") + } +} + +func TestBuildCreateDiskRequest(t *testing.T) { + args := &diskVolumeArgs{ + ZoneID: "cn-hangzhou", + } + req := buildCreateDiskRequest(args) + assert.Equal(t, "cn-hangzhou", req.ZoneId) + + req2 := finalizeCreateDiskRequest(req, createAttempt{ + Category: DiskESSD, + PerformanceLevel: PERFORMANCE_LEVEL0, + }) + assert.Equal(t, "cloud_essd", req2.DiskCategory) + assert.Equal(t, "PL0", req2.PerformanceLevel) + // fields is copied + assert.Equal(t, "cn-hangzhou", req2.ZoneId) + + // send req2 should not affect req + requests.InitParams(req2) + assert.Greater(t, len(req2.QueryParams), len(req.QueryParams)) +} + +func TestGenerateAttempts(t *testing.T) { + cases := []struct { + name string + args *diskVolumeArgs + attempts []createAttempt + }{ + { + name: "no PL", + args: &diskVolumeArgs{ + Type: []Category{DiskESSD, DiskESSDAuto}, + }, + attempts: []createAttempt{ + {Category: DiskESSD}, + {Category: DiskESSDAuto}, + }, + }, { + name: "with PL", + args: &diskVolumeArgs{ + Type: []Category{DiskESSDEntry, DiskESSD, DiskESSDAuto}, + PerformanceLevel: []PerformanceLevel{PERFORMANCE_LEVEL0, PERFORMANCE_LEVEL1}, + }, + attempts: []createAttempt{ + {Category: DiskESSDEntry}, + {Category: DiskESSD, PerformanceLevel: PERFORMANCE_LEVEL0}, + {Category: DiskESSD, PerformanceLevel: PERFORMANCE_LEVEL1}, + {Category: DiskESSDAuto}, + }, + }, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + attempts := generateCreateAttempts(c.args) + assert.Equal(t, c.attempts, attempts) + }) + } +} + +func TestCheckExistingDisk(t *testing.T) { + disk := &ecs.Disk{ + Size: 20, + Category: "cloud_essd", + PerformanceLevel: "PL0", + Tags: ecs.TagsInDescribeDisks{ + Tag: []ecs.Tag{ + {Key: "k1", Value: "v1"}, + }, + }, + } + cases := []struct { + name string + args *diskVolumeArgs + match bool + }{ + { + name: "match", + args: &diskVolumeArgs{RequestGB: 20, Type: []Category{DiskESSD, DiskESSDAuto}, PerformanceLevel: []PerformanceLevel{PERFORMANCE_LEVEL0}}, + match: true, + }, { + name: "mismatch category", + args: &diskVolumeArgs{RequestGB: 20, Type: []Category{DiskESSDAuto}, PerformanceLevel: []PerformanceLevel{PERFORMANCE_LEVEL0}}, + }, { + name: "mismatch PL", + args: &diskVolumeArgs{RequestGB: 20, Type: []Category{DiskESSD}, PerformanceLevel: []PerformanceLevel{PERFORMANCE_LEVEL1}}, + }, { + name: "mismatch MultiAttach", + args: &diskVolumeArgs{ + RequestGB: 20, Type: []Category{DiskESSD}, PerformanceLevel: []PerformanceLevel{PERFORMANCE_LEVEL0}, + MultiAttach: true, + }, + }, { + name: "mismatch tag key", + args: &diskVolumeArgs{ + RequestGB: 20, Type: []Category{DiskESSD}, PerformanceLevel: []PerformanceLevel{PERFORMANCE_LEVEL0}, + DiskTags: map[string]string{"k2": "v1"}, + }, + }, { + name: "mismatch tag value", + args: &diskVolumeArgs{ + RequestGB: 20, Type: []Category{DiskESSD}, PerformanceLevel: []PerformanceLevel{PERFORMANCE_LEVEL0}, + DiskTags: map[string]string{"k1": "v2"}, + }, + }, { + name: "match no PL requested", + args: &diskVolumeArgs{RequestGB: 20, Type: []Category{DiskESSD}}, + match: true, + }, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + createAttempt, err := checkExistingDisk(disk, c.args) + assert.Equal(t, c.match, err == nil) + if c.match { + assert.Equal(t, disk.Category, string(createAttempt.Category)) + assert.Equal(t, disk.PerformanceLevel, string(createAttempt.PerformanceLevel)) + } + }) + } +} + +// Cases that only hit the server at most once +func TestCreateDisk_Basic(t *testing.T) { + cases := []struct { + name string + supports sets.Set[Category] + args *diskVolumeArgs + err bool + serverFail bool + }{ + { + name: "success", + supports: sets.New(DiskESSD), + args: &diskVolumeArgs{Type: []Category{DiskESSD}, RequestGB: 20}, + }, { + name: "success - fallback", + supports: sets.New(DiskESSD), + args: &diskVolumeArgs{Type: []Category{DiskSSD, DiskESSD}, RequestGB: 20}, + }, { + name: "success - empty supports", + args: &diskVolumeArgs{Type: []Category{DiskESSD}, RequestGB: 20}, + }, { + name: "unsupported", + supports: sets.New(DiskSSD), + args: &diskVolumeArgs{Type: []Category{DiskESSD}, RequestGB: 20}, + err: true, + }, { + name: "too small", + args: &diskVolumeArgs{Type: []Category{DiskSSD}, RequestGB: 1}, + err: true, + }, { + name: "server fail", + args: &diskVolumeArgs{Type: []Category{DiskESSD}, RequestGB: 20}, + err: true, + serverFail: true, + }, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + client := cloud.NewMockECSInterface(ctrl) + + if !c.err { + client.EXPECT().CreateDisk(gomock.Any()).Return(&ecs.CreateDiskResponse{ + DiskId: "d-123", + }, nil) + } + if c.serverFail { + client.EXPECT().CreateDisk(gomock.Any()).Return(nil, alicloudErr.NewServerError(400, `{"Code": "AnyOtherErrors"}`, "")) + } + + diskID, attempt, err := createDisk(client, "disk-name", "", c.args, c.supports) + if c.err { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, "d-123", diskID) + assert.Equal(t, DiskESSD, attempt.Category) + assert.Empty(t, attempt.PerformanceLevel) + } + }) + } +} + +func TestCreateDisk_ServerFailFallback(t *testing.T) { + ctrl := gomock.NewController(t) + client := cloud.NewMockECSInterface(ctrl) + + client.EXPECT().CreateDisk(gomock.Any()).Return(nil, alicloudErr.NewServerError(400, `{"Code": "InvalidDataDiskSize.ValueNotSupported"}`, "")) + client.EXPECT().CreateDisk(gomock.Any()).Return(&ecs.CreateDiskResponse{ + DiskId: "d-123", + }, nil) + + args := &diskVolumeArgs{Type: []Category{DiskESSD, DiskESSDAuto}, RequestGB: 20} + diskID, attempt, err := createDisk(client, "disk-name", "", args, nil) + assert.NoError(t, err) + assert.Equal(t, "d-123", diskID) + assert.Equal(t, DiskESSDAuto, attempt.Category) + assert.Empty(t, attempt.PerformanceLevel) +} + +func TestCreateDisk_ParameterMismatch(t *testing.T) { + cases := []struct { + name string + existing []ecs.Disk + err bool + }{ + { + name: "retry", + }, { + name: "reuse", + existing: []ecs.Disk{{ + DiskId: "d-124", + Category: "cloud_auto", + Size: 20, + }}, + }, { + name: "mismatch", + existing: []ecs.Disk{{ + DiskId: "d-124", + Category: "cloud_essd_entry", + Size: 20, + }}, + err: true, + }, { + name: "multiple existing", + existing: []ecs.Disk{{}, {}}, + err: true, + }, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + client := cloud.NewMockECSInterface(ctrl) + + r1 := client.EXPECT().CreateDisk(gomock.Any()).Return(nil, alicloudErr.NewServerError(400, `{"Code": "IdempotentParameterMismatch"}`, "")) + r2 := client.EXPECT().DescribeDisks(gomock.Any()).Return(&ecs.DescribeDisksResponse{ + Disks: ecs.DisksInDescribeDisks{ + Disk: c.existing, + }, + }, nil).After(r1) + if c.existing == nil { + client.EXPECT().CreateDisk(gomock.Any()).Return(&ecs.CreateDiskResponse{ + DiskId: "d-123", + }, nil).After(r2) + } + + args := &diskVolumeArgs{Type: []Category{DiskESSD, DiskESSDAuto}, RequestGB: 20} + diskID, attempt, err := createDisk(client, "disk-name", "", args, nil) + if c.err { + assert.Error(t, err) + } else { + assert.NoError(t, err) + if c.existing == nil { + assert.Equal(t, "d-123", diskID) + assert.Equal(t, DiskESSD, attempt.Category) + assert.Empty(t, attempt.PerformanceLevel) + } else { + d := c.existing[0] + assert.Equal(t, d.DiskId, diskID) + assert.Equal(t, Category(d.Category), attempt.Category) + } + } + }) + } +} + +func TestCreateDisk_NoInfinitLoop(t *testing.T) { + ctrl := gomock.NewController(t) + client := cloud.NewMockECSInterface(ctrl) + + client.EXPECT().CreateDisk(gomock.Any()).Return(nil, alicloudErr.NewServerError(400, `{"Code": "IdempotentParameterMismatch"}`, "")).Times(2) + client.EXPECT().DescribeDisks(gomock.Any()).Return(&ecs.DescribeDisksResponse{ + Disks: ecs.DisksInDescribeDisks{ + Disk: []ecs.Disk{}, + }, + }, nil) + + args := &diskVolumeArgs{Type: []Category{DiskESSD}, RequestGB: 20} + _, _, err := createDisk(client, "disk-name", "", args, nil) + assert.Error(t, err) +} diff --git a/pkg/disk/constants.go b/pkg/disk/constants.go index 6c6194f24..6391459b7 100644 --- a/pkg/disk/constants.go +++ b/pkg/disk/constants.go @@ -1,9 +1,5 @@ package disk -import ( - "fmt" -) - const ( // ESSD_PERFORMANCE_LEVEL is storage class @@ -99,26 +95,6 @@ const ( // DiskHighAvail tag DiskHighAvail = "available" - // DiskCommon common disk type - DiskCommon = "cloud" - // DiskEfficiency efficiency disk type - DiskEfficiency = "cloud_efficiency" - // DiskSSD ssd disk type - DiskSSD = "cloud_ssd" - // DiskESSD essd disk type - DiskESSD = "cloud_essd" - // DiskESSDAuto essd autopl disk type - DiskESSDAuto = "cloud_auto" - // DiskESSDEntry essd entry disk type - DiskESSDEntry = "cloud_essd_entry" - // DiskHighPerformance - DiskPPerf = "cloud_pperf" - // DiskStandPerformance - DiskSPerf = "cloud_sperf" - // DiskSharedSSD shared sdd disk type - DiskSharedSSD = "san_ssd" - // DiskSharedEfficiency shared efficiency disk type - DiskSharedEfficiency = "san_efficiency" // MBSIZE tag MBSIZE = 1024 * 1024 // GBSIZE tag @@ -164,30 +140,6 @@ const ( EXT3_FSTYPE = "ext3" XFS_FSTYPE = "xfs" - DISK_PERFORMANCE_LEVEL0 = "PL0" - DISK_PERFORMANCE_LEVEL1 = "PL1" - DISK_PERFORMANCE_LEVEL2 = "PL2" - DISK_PERFORMANCE_LEVEL3 = "PL3" - SNAPSHOT_MAX_RETENTION_DAYS = 65536 SNAPSHOT_MIN_RETENTION_DAYS = 1 - - DISK_CLOUD_EFFICIENT_MIN_CAPACITY = 20 - DISK_CLOUD_SSD_MIN_CAPACITY = 20 - DISK_CLOUD_ESSD_PL0_MIN_CAPACITY = 1 - DISK_CLOUD_ESSD_PL1_MIN_CAPACITY = 20 - DISK_CLOUD_ESSD_PL2_MIN_CAPACITY = 461 - DISK_CLOUD_ESSD_PL3_MIN_CAPACITY = 1261 - DISK_CLOUD_ESSD_AUTO_PL_MIN_CAPACITY = 1 ) - -var DiskCapacityMapping = map[string]int{ - DiskEfficiency: DISK_CLOUD_EFFICIENT_MIN_CAPACITY, - DiskSSD: DISK_CLOUD_SSD_MIN_CAPACITY, - fmt.Sprintf("%s.%s", DiskESSD, DISK_PERFORMANCE_LEVEL0): DISK_CLOUD_ESSD_PL0_MIN_CAPACITY, - fmt.Sprintf("%s.%s", DiskESSD, DISK_PERFORMANCE_LEVEL1): DISK_CLOUD_ESSD_PL1_MIN_CAPACITY, - fmt.Sprintf("%s.%s", DiskESSD, DISK_PERFORMANCE_LEVEL2): DISK_CLOUD_ESSD_PL2_MIN_CAPACITY, - fmt.Sprintf("%s.%s", DiskESSD, DISK_PERFORMANCE_LEVEL3): DISK_CLOUD_ESSD_PL3_MIN_CAPACITY, - DiskESSDAuto: DISK_CLOUD_ESSD_AUTO_PL_MIN_CAPACITY, - DiskESSD: DISK_CLOUD_ESSD_PL1_MIN_CAPACITY, -} diff --git a/pkg/disk/controllerserver.go b/pkg/disk/controllerserver.go index 1963de596..11278eb24 100644 --- a/pkg/disk/controllerserver.go +++ b/pkg/disk/controllerserver.go @@ -48,6 +48,7 @@ import ( crd "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/yaml" "k8s.io/client-go/tools/record" ) @@ -60,24 +61,24 @@ type controllerServer struct { // Alicloud disk parameters type diskVolumeArgs struct { - Type []string - RegionID string - ZoneID string - FsType string - ReadOnly bool - MultiAttach bool - Encrypted bool - KMSKeyID string - PerformanceLevel []string - ResourceGroupID string - StorageClusterID string - DiskTags map[string]string - NodeSelected string - DelAutoSnap string - ARN []ecs.CreateDiskArn - VolumeSizeAutoAvailable bool - ProvisionedIops int - BurstingEnabled bool + Type []Category + RegionID string + ZoneID string + FsType string + ReadOnly bool + MultiAttach bool + Encrypted bool + KMSKeyID string + PerformanceLevel []PerformanceLevel + ResourceGroupID string + StorageClusterID string + DiskTags map[string]string + NodeSelected string + DelAutoSnap string + ARN []ecs.CreateDiskArn + ProvisionedIops int64 + BurstingEnabled bool + RequestGB int64 } var veasp = struct { @@ -123,16 +124,9 @@ func NewControllerServer(d *csicommon.CSIDriver, client *crd.Clientset) csi.Cont // the map of req.Name and csi.Snapshot var createdSnapshotMap = map[string]*csi.Snapshot{} -// the map of req.Name and csi.Volume -var createdVolumeMap = map[string]*csi.Volume{} - // the map of multizone and index var storageClassZonePos = map[string]int{} -// the map of diskId and pvName -// diskId and pvName is not same under csi plugin -var diskIDPVMap = map[string]string{} - // SnapshotRequestMap snapshot request limit var SnapshotRequestMap = map[string]int64{} @@ -148,10 +142,6 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol log.Errorf("CreateVolume: driver not support Create volume: %v", err) return nil, err } - if value, ok := createdVolumeMap[req.Name]; ok { - log.Infof("CreateVolume: volume already be created pvName: %s, VolumeId: %s, volumeContext: %v", req.Name, value.VolumeId, value.VolumeContext) - return &csi.CreateVolumeResponse{Volume: value}, nil - } snapshotID := "" volumeSource := req.GetVolumeContentSource() @@ -191,26 +181,29 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol return &csi.CreateVolumeResponse{Volume: csiVolume}, nil } - if req.GetCapacityRange() == nil { - log.Errorf("CreateVolume: error Capacity from input: %s", req.Name) - return nil, status.Errorf(codes.InvalidArgument, "CreateVolume: error Capacity from input: %v", req.Name) + sharedDisk := len(diskVol.Type) == 1 && (diskVol.Type[0] == DiskSharedEfficiency || diskVol.Type[0] == DiskSharedSSD) + + var supportedTypes sets.Set[Category] + if diskVol.NodeSelected != "" { + supportedTypes, err = getSupportedDiskTypes(diskVol.NodeSelected) + if err != nil { + return nil, err + } } - volSizeBytes := int64(req.GetCapacityRange().GetRequiredBytes()) - requestGB := int((volSizeBytes + 1024*1024*1024 - 1) / (1024 * 1024 * 1024)) - if diskVol.VolumeSizeAutoAvailable && requestGB < MinimumDiskSizeInGB { - log.Infof("CreateVolume: volume size was less than allowed limit. Setting request Size to %vGB. volumeSizeAutoAvailable is set.", MinimumDiskSizeInGB) - requestGB = MinimumDiskSizeInGB - volSizeBytes = MinimumDiskSizeInBytes + + ecsClient, err := getEcsClientByID("", req.Parameters[TenantUserUID]) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) } - sharedDisk := len(diskVol.Type) == 1 && (diskVol.Type[0] == DiskSharedEfficiency || diskVol.Type[0] == DiskSharedSSD) - diskType, diskID, diskPL, err := createDisk(req.GetName(), snapshotID, requestGB, diskVol, req.Parameters[TenantUserUID]) + diskID, attempt, err := createDisk(ecsClient, req.GetName(), snapshotID, diskVol, supportedTypes) if err != nil { + if errors.Is(err, ErrParameterMismatch) { + return nil, status.Errorf(codes.AlreadyExists, "volume %s already created but %v", req.Name, err) + } var aliErr *alicloudErr.ServerError if errors.As(err, &aliErr) { switch aliErr.ErrorCode() { - case IdempotentParameterMismatch: - return nil, status.Errorf(codes.AlreadyExists, "volume %s already created with different parameters", req.Name) case SnapshotNotFound: return nil, status.Errorf(codes.NotFound, "snapshot %s not found", snapshotID) } @@ -225,12 +218,9 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol if sharedDisk { volumeContext[SharedEnable] = "enable" } - if diskType != "" { - volumeContext["type"] = diskType - } - log.Infof("CreateVolume: volume: %s created diskpl: %s", req.GetName(), diskPL) - if diskPL != "" { - volumeContext[ESSD_PERFORMANCE_LEVEL] = diskPL + volumeContext["type"] = string(attempt.Category) + if attempt.PerformanceLevel != "" { + volumeContext[ESSD_PERFORMANCE_LEVEL] = string(attempt.PerformanceLevel) } if tenantUserUID := req.Parameters[TenantUserUID]; tenantUserUID != "" { @@ -238,7 +228,7 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol } volumeContext = updateVolumeContext(volumeContext) - log.Infof("CreateVolume: Successfully created Disk %s: id[%s], zone[%s], disktype[%s], size[%d], snapshotID[%s]", req.GetName(), diskID, diskVol.ZoneID, diskType, requestGB, snapshotID) + log.Infof("CreateVolume: Successfully created Disk %s: id[%s], zone[%s], disktype[%s], snapshotID[%s]", req.GetName(), diskID, diskVol.ZoneID, attempt, snapshotID) // Set VolumeContentSource var src *csi.VolumeContentSource @@ -252,10 +242,8 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol } } - tmpVol := volumeCreate(diskType, diskID, volSizeBytes, volumeContext, diskVol.ZoneID, src) + tmpVol := volumeCreate(attempt, diskID, utils.Gi2Bytes(int64(diskVol.RequestGB)), volumeContext, diskVol.ZoneID, src) - diskIDPVMap[diskID] = req.Name - createdVolumeMap[req.Name] = tmpVol return &csi.CreateVolumeResponse{Volume: tmpVol}, nil } @@ -280,11 +268,6 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol log.Error(errMsg) return nil, status.Error(codes.Internal, errMsg) } else if disk == nil { - // TODO Optimize concurrent access problems - if value, ok := diskIDPVMap[req.VolumeId]; ok { - delete(createdVolumeMap, value) - delete(diskIDPVMap, req.VolumeId) - } log.Infof("DeleteVolume: disk(%s) already deleted", req.VolumeId) return &csi.DeleteVolumeResponse{}, nil } @@ -330,10 +313,6 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol return nil, status.Errorf(codes.Internal, errMsg) } - if value, ok := diskIDPVMap[req.VolumeId]; ok { - delete(createdVolumeMap, value) - delete(diskIDPVMap, req.VolumeId) - } delVolumeSnap.Delete(req.GetVolumeId()) return &csi.DeleteVolumeResponse{}, nil } @@ -661,7 +640,7 @@ func (cs *controllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS // } // if disk type is not essd and IA set disable - if params.InstantAccess && disks[0].Category != DiskESSD && disks[0].Category != DiskESSDAuto { + if params.InstantAccess && !AllCategories[Category(disks[0].Category)].InstantAccessSnapshot { log.Warnf("CreateSnapshot: Snapshot(%s) set as not IA type, because disk Category %s", req.Name, disks[0].Category) params.InstantAccess = false } @@ -707,8 +686,8 @@ func snapshotBeforeDelete(volumeID string, ecsClient *ecs.Client) error { if err != nil { return err } - if disk.Category != DiskESSD && disk.Category != DiskESSDAuto { - log.Infof("snapshotBeforeDelete: only supports essd type which current disk.Catagory is: %s", disk.Category) + if !AllCategories[Category(disk.Category)].InstantAccessSnapshot { + log.Infof("snapshotBeforeDelete: Instant Access snapshot required, but current disk.Catagory is: %s", disk.Category) return nil } @@ -1036,7 +1015,7 @@ func updateVolumeExpandAutoSnapshotID(pvc *v1.PersistentVolumeClaim, snapshotID, // autoSnapshot is used in volume expanding of ESSD, // returns if the volume expanding should be continued func (cs *controllerServer) autoSnapshot(ctx context.Context, disk *ecs.Disk) (bool, *csi.Snapshot, error) { - if disk.Category != DiskESSD && disk.Category != DiskESSDAuto { + if !AllCategories[Category(disk.Category)].InstantAccessSnapshot { return true, nil, nil } pv, pvc, err := getPvPvcFromDiskId(disk.DiskId) diff --git a/pkg/disk/utils.go b/pkg/disk/utils.go index cfb53e5fc..d62f65643 100644 --- a/pkg/disk/utils.go +++ b/pkg/disk/utils.go @@ -61,13 +61,6 @@ import ( var ( // KubernetesAlicloudIdentity is the system identity for ecs client request KubernetesAlicloudIdentity = fmt.Sprintf("Kubernetes.Alicloud/CsiProvision.Disk-%s", version.VERSION) - - // All available disk types - AvailableDiskTypes = sets.NewString(DiskCommon, DiskESSD, DiskEfficiency, DiskSSD, DiskSharedSSD, DiskSharedEfficiency, DiskPPerf, DiskSPerf, DiskESSDAuto, DiskESSDEntry) - // CustomDiskTypes ... - CustomDiskTypes = sets.NewString(DiskESSD, DiskSSD, DiskEfficiency, DiskPPerf, DiskSPerf, DiskESSDAuto, DiskESSDEntry) - // Performance Level for ESSD - CustomDiskPerfermance = sets.NewString(DISK_PERFORMANCE_LEVEL0, DISK_PERFORMANCE_LEVEL1, DISK_PERFORMANCE_LEVEL2, DISK_PERFORMANCE_LEVEL3) ) const DISK_TAG_PREFIX = "diskTags/" @@ -731,18 +724,6 @@ func getDiskVolumeOptions(req *csi.CreateVolumeRequest) (*diskVolumeArgs, error) return nil, fmt.Errorf("performanceLevel is necessary when storageClusterID: '%s' specified", diskVolArgs.StorageClusterID) } } - // volumeSizeAutoAvailable - value, ok = volOptions["volumeSizeAutoAvailable"] - if !ok { - diskVolArgs.VolumeSizeAutoAvailable = false - } else { - value = strings.ToLower(value) - if value == "yes" || value == "true" || value == "1" { - diskVolArgs.VolumeSizeAutoAvailable = true - } else { - diskVolArgs.VolumeSizeAutoAvailable = false - } - } // volumeExpandAutoSnapshot, default closed if value, ok = volOptions[VOLUME_EXPAND_AUTO_SNAPSHOT_OP_KEY]; ok { @@ -761,10 +742,9 @@ func getDiskVolumeOptions(req *csi.CreateVolumeRequest) (*diskVolumeArgs, error) } } - diskVolArgs.ProvisionedIops = -1 value, ok = volOptions[PROVISIONED_IOPS_KEY] if ok { - iValue, err := strconv.Atoi(value) + iValue, err := strconv.ParseInt(value, 10, 64) if err != nil || iValue < 0 { return nil, fmt.Errorf("getDiskVolumeOptions: parameters provisionedIops[%s] is illegal", value) } @@ -780,29 +760,36 @@ func getDiskVolumeOptions(req *csi.CreateVolumeRequest) (*diskVolumeArgs, error) } } + if req.GetCapacityRange() == nil { + return nil, fmt.Errorf("capacity range is required") + } + volSizeBytes := int64(req.GetCapacityRange().GetRequiredBytes()) + requestGB := (volSizeBytes + 1024*1024*1024 - 1) / (1024 * 1024 * 1024) + if requestGB < MinimumDiskSizeInGB { + switch strings.ToLower(volOptions["volumeSizeAutoAvailable"]) { + case "yes", "true", "1": + log.Infof("CreateVolume: volume size was less than allowed limit. Setting request Size to %vGB. volumeSizeAutoAvailable is set.", MinimumDiskSizeInGB) + requestGB = MinimumDiskSizeInGB + volSizeBytes = MinimumDiskSizeInBytes + } + } + diskVolArgs.RequestGB = requestGB + return diskVolArgs, nil } -func validateDiskType(opts map[string]string) (diskType []string, err error) { +func validateDiskType(opts map[string]string) (diskType []Category, err error) { if value, ok := opts["type"]; !ok || (ok && value == DiskHighAvail) { - diskType = []string{DiskSSD, DiskEfficiency} + diskType = []Category{DiskSSD, DiskEfficiency} return } - if strings.Contains(opts["type"], ",") { - orderedList := []string{} - for _, cusType := range strings.Split(opts["type"], ",") { - if _, ok := CustomDiskTypes[cusType]; ok { - orderedList = append(orderedList, cusType) - } else { - return diskType, fmt.Errorf("Illegal required parameter type: " + cusType) - } + for _, cusType := range strings.Split(opts["type"], ",") { + c := Category(cusType) + if _, ok := AllCategories[c]; ok { + diskType = append(diskType, c) + } else { + return nil, fmt.Errorf("Illegal required parameter type: " + cusType) } - diskType = orderedList - return - } - t := opts["type"] - if AvailableDiskTypes.Has(t) { - diskType = []string{t} } if len(diskType) == 0 { return diskType, fmt.Errorf("Illegal required parameter type: " + opts["type"]) @@ -810,15 +797,15 @@ func validateDiskType(opts map[string]string) (diskType []string, err error) { return } -func validateDiskPerformanceLevel(opts map[string]string) (performanceLevel []string, err error) { +func validateDiskPerformanceLevel(opts map[string]string) (performanceLevel []PerformanceLevel, err error) { pl, ok := opts[ESSD_PERFORMANCE_LEVEL] if !ok || pl == "" { return } log.Infof("validateDiskPerformanceLevel: pl: %v", pl) - performanceLevel = strings.Split(pl, ",") - for _, cusPer := range performanceLevel { - if _, ok := CustomDiskPerfermance[cusPer]; !ok { + allPLs := AllCategories[DiskESSD].PerformanceLevel + for _, cusPer := range strings.Split(pl, ",") { + if _, ok := allPLs[PerformanceLevel(cusPer)]; !ok { return nil, fmt.Errorf("illegal performance level type: %s", cusPer) } } @@ -944,16 +931,6 @@ func isPathAvailiable(path string) error { return nil } -func deleteEmpty(s []string) []string { - var r []string - for _, str := range s { - if str != "" { - r = append(r, str) - } - } - return r -} - func getBlockDeviceCapacity(devicePath string) int64 { file, err := os.Open(devicePath) @@ -1034,21 +1011,6 @@ func patchForNode(node *v1.Node, maxVolumesNum int, diskTypes []string) []byte { return patch } -func intersect(slice1, slice2 []string) []string { - m := make(map[string]int) - nn := make([]string, 0) - for _, v := range slice1 { - m[v]++ - } - for _, v := range slice2 { - times, _ := m[v] - if times == 1 { - nn = append(nn, v) - } - } - return nn -} - func getEcsClientByID(volumeID, uid string) (ecsClient *ecs.Client, err error) { // feature gate not enable; if !GlobalConfigVar.DiskMultiTenantEnable { @@ -1126,7 +1088,7 @@ func createRoleClient(uid string) (cli *ecs.Client, err error) { return cli, nil } -func volumeCreate(diskType, diskID string, volSizeBytes int64, volumeContext map[string]string, zoneID string, contextSource *csi.VolumeContentSource) *csi.Volume { +func volumeCreate(attempt createAttempt, diskID string, volSizeBytes int64, volumeContext map[string]string, zoneID string, contextSource *csi.VolumeContentSource) *csi.Volume { accessibleTopology := []*csi.Topology{ { Segments: map[string]string{ @@ -1141,24 +1103,19 @@ func volumeCreate(diskType, diskID string, volSizeBytes int64, volumeContext map }, }) } - if diskType != "" { + if attempt.Category != "" { // Add PV Label - diskTypePL := diskType - if diskType == DiskESSD { - if pl, ok := volumeContext[ESSD_PERFORMANCE_LEVEL]; ok && pl != "" { - diskTypePL = fmt.Sprintf("%s.%s", DiskESSD, pl) - // TODO delete performanceLevel key - // delete(volumeContext, "performanceLevel") - } else { - diskTypePL = fmt.Sprintf("%s.%s", DiskESSD, "PL1") - } + if attempt.Category == DiskESSD && attempt.PerformanceLevel == "" { + attempt.PerformanceLevel = "PL1" } - volumeContext[labelAppendPrefix+labelVolumeType] = diskTypePL + // TODO delete performanceLevel key + // delete(volumeContext, "performanceLevel") + volumeContext[labelAppendPrefix+labelVolumeType] = attempt.String() // TODO delete type key // delete(volumeContext, "type") // Add PV NodeAffinity - labelKey := fmt.Sprintf(nodeStorageLabel, diskType) + labelKey := fmt.Sprintf(nodeStorageLabel, attempt.Category) expressions := []v1.NodeSelectorRequirement{{ Key: labelKey, Operator: v1.NodeSelectorOpIn, @@ -1228,7 +1185,8 @@ func staticVolumeCreate(req *csi.CreateVolumeRequest, snapshotID string) (*csi.V } } - return volumeCreate(disk.Category, diskID, volSizeBytes, volumeContext, disk.ZoneId, src), nil + attempt := createAttempt{Category(disk.Category), PerformanceLevel(disk.PerformanceLevel)} + return volumeCreate(attempt, diskID, volSizeBytes, volumeContext, disk.ZoneId, src), nil } // updateVolumeContext remove unnecessary volume context diff --git a/pkg/disk/utils_test.go b/pkg/disk/utils_test.go index 260de6aea..42df61c29 100644 --- a/pkg/disk/utils_test.go +++ b/pkg/disk/utils_test.go @@ -55,12 +55,12 @@ func TestIsFileExisting(t *testing.T) { func TestValidateDiskType(t *testing.T) { examples := []struct { opts map[string]string - result []string + result []Category }{ - {result: []string{"cloud_ssd", "cloud_efficiency"}}, + {result: []Category{"cloud_ssd", "cloud_efficiency"}}, {opts: map[string]string{"type": "a,b,c"}, result: nil}, - {opts: map[string]string{"type": "available"}, result: []string{"cloud_ssd", "cloud_efficiency"}}, - {opts: map[string]string{"type": "cloud_ssd,cloud_essd"}, result: []string{"cloud_ssd", "cloud_essd"}}, + {opts: map[string]string{"type": "available"}, result: []Category{"cloud_ssd", "cloud_efficiency"}}, + {opts: map[string]string{"type": "cloud_ssd,cloud_essd"}, result: []Category{"cloud_ssd", "cloud_essd"}}, } for _, example := range examples { actualResult, _ := validateDiskType(example.opts) @@ -631,6 +631,9 @@ func TestParseTagsInvalid(t *testing.T) { func TestGetDiskVolumeOptions(t *testing.T) { req := &csi.CreateVolumeRequest{ + CapacityRange: &csi.CapacityRange{ + RequiredBytes: 20*GBSIZE - 100, + }, Parameters: map[string]string{ "zoneId": "cn-beijing-i", "diskTags/a": "b", @@ -640,4 +643,5 @@ func TestGetDiskVolumeOptions(t *testing.T) { assert.NoError(t, err) assert.Equal(t, "cn-beijing-i", opts.ZoneID) assert.Equal(t, map[string]string{"a": "b"}, opts.DiskTags) + assert.Equal(t, int64(20), opts.RequestGB) }