Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not call ModifyVolume if the volume is already in the desired state #1741

Merged
merged 1 commit into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 82 additions & 55 deletions pkg/cloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,18 +479,14 @@ func (c *cloud) ResizeOrModifyDisk(ctx context.Context, volumeID string, newSize
}

newSizeGiB := util.RoundUpGiB(newSizeBytes)
var oldSizeGiB int64
if newSizeBytes != 0 {
volumeSize, err := c.validateVolumeSize(ctx, volumeID, newSizeGiB)
if err != nil || volumeSize != 0 {
return volumeSize, err
}
needsModification, volumeSize, err := c.validateModifyVolume(ctx, volumeID, newSizeGiB, options)
if err != nil || !needsModification {
return volumeSize, err
}

req := &ec2.ModifyVolumeInput{
VolumeId: aws.String(volumeID),
}
// Only set req.size for resizing volume
if newSizeBytes != 0 {
req.Size = aws.Int64(newSizeGiB)
}
Expand All @@ -500,7 +496,7 @@ func (c *cloud) ResizeOrModifyDisk(ctx context.Context, volumeID string, newSize
if options.VolumeType != "" {
req.VolumeType = aws.String(options.VolumeType)
}
if options.VolumeType == VolumeTypeGP3 {
if options.Throughput != 0 {
torredil marked this conversation as resolved.
Show resolved Hide resolved
req.Throughput = aws.Int64(int64(options.Throughput))
}

Expand All @@ -509,26 +505,17 @@ func (c *cloud) ResizeOrModifyDisk(ctx context.Context, volumeID string, newSize
return 0, fmt.Errorf("unable to modify AWS volume %q: %w", volumeID, err)
}

mod := response.VolumeModification
state := aws.StringValue(mod.ModificationState)

if volumeModificationDone(state) {
if newSizeBytes != 0 {
return c.checkDesiredSize(ctx, volumeID, newSizeGiB)
} else {
return 0, nil
}
}

err = c.waitForVolumeSize(ctx, volumeID)
if newSizeBytes != 0 {
// If the volume modification isn't immediately completed, wait for it to finish
state := aws.StringValue(response.VolumeModification.ModificationState)
if !volumeModificationDone(state) {
err = c.waitForVolumeModification(ctx, volumeID)
if err != nil {
return oldSizeGiB, err
return 0, err
}
return c.checkDesiredSize(ctx, volumeID, newSizeGiB)
} else {
return 0, c.waitForVolumeSize(ctx, volumeID)
}

// Perform one final check on the volume
return c.checkDesiredState(ctx, volumeID, newSizeGiB, options)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dumb Q: Is the extra trip to EC2 API worth it? What is being checked here that wasn't checked earlier?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you look at the checkDesiredState function, there is a comment above it explaining the potential eventual consistency concerns the previous maintainers implemented it for.

Even if it really isn't necessary, I don't think this PR is the appropriate place to be tearing out such a check.

}

func (c *cloud) DeleteDisk(ctx context.Context, volumeID string) (bool, error) {
Expand Down Expand Up @@ -1170,7 +1157,7 @@ func isAWSErrorIdempotentParameterMismatch(err error) bool {
// Checks for desired size on volume by also verifying volume size by describing volume.
// This is to get around potential eventual consistency problems with describing volume modifications
// objects and ensuring that we read two different objects to verify volume state.
func (c *cloud) checkDesiredSize(ctx context.Context, volumeID string, newSizeGiB int64) (int64, error) {
func (c *cloud) checkDesiredState(ctx context.Context, volumeID string, desiredSizeGiB int64, options *ModifyDiskOptions) (int64, error) {
request := &ec2.DescribeVolumesInput{
VolumeIds: []*string{
aws.String(volumeID),
Expand All @@ -1182,15 +1169,25 @@ func (c *cloud) checkDesiredSize(ctx context.Context, volumeID string, newSizeGi
}

// AWS resizes in chunks of GiB (not GB)
oldSizeGiB := aws.Int64Value(volume.Size)
if oldSizeGiB >= newSizeGiB {
return oldSizeGiB, nil
realSizeGiB := aws.Int64Value(volume.Size)

// Check if there is a mismatch between the requested modification and the current volume
// If there is, the volume is still modifying and we should not return a success
if realSizeGiB < desiredSizeGiB {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: This chunk was a bit hard to read on first glance, could you add a comment or refactor this into a helper function?

Something Like "modifyVolumeIdempotencyCheck"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't that already what this function is? I don't think making a function that will never be reused is a good idea, I'll go with your second suggestion and add a comment unless there are other objections.

return realSizeGiB, fmt.Errorf("volume %q is still being expanded to %d size", volumeID, desiredSizeGiB)
} else if options.IOPS != 0 && (volume.Iops == nil || *volume.Iops != int64(options.IOPS)) {
return realSizeGiB, fmt.Errorf("volume %q is still being modified to iops %d", volumeID, options.IOPS)
} else if options.VolumeType != "" && !strings.EqualFold(*volume.VolumeType, options.VolumeType) {
return realSizeGiB, fmt.Errorf("volume %q is still being modified to type %q", volumeID, options.VolumeType)
} else if options.Throughput != 0 && (volume.Throughput == nil || *volume.Throughput != int64(options.Throughput)) {
return realSizeGiB, fmt.Errorf("volume %q is still being modified to throughput %d", volumeID, options.Throughput)
}
return oldSizeGiB, fmt.Errorf("volume %q is still being expanded to %d size", volumeID, newSizeGiB)

return realSizeGiB, nil
}

// waitForVolumeSize waits for a volume modification to finish.
func (c *cloud) waitForVolumeSize(ctx context.Context, volumeID string) error {
// waitForVolumeModification waits for a volume modification to finish.
func (c *cloud) waitForVolumeModification(ctx context.Context, volumeID string) error {
backoff := wait.Backoff{
Duration: volumeModificationDuration,
Factor: volumeModificationWaitFactor,
Expand All @@ -1199,7 +1196,10 @@ func (c *cloud) waitForVolumeSize(ctx context.Context, volumeID string) error {

waitErr := wait.ExponentialBackoff(backoff, func() (bool, error) {
m, err := c.getLatestVolumeModification(ctx, volumeID)
if err != nil {
// Consider volumes that have never been modified as done
if err != nil && errors.Is(err, VolumeNotBeingModified) {
return true, nil
} else if err != nil {
return false, err
}

Expand Down Expand Up @@ -1271,49 +1271,76 @@ func (c *cloud) AvailabilityZones(ctx context.Context) (map[string]struct{}, err
return zones, nil
}

func (c *cloud) validateVolumeSize(ctx context.Context, volumeID string, newSizeGiB int64) (int64, error) {
func needsVolumeModification(volume *ec2.Volume, newSizeGiB int64, options *ModifyDiskOptions) bool {
oldSizeGiB := aws.Int64Value(volume.Size)
needsModification := false

if oldSizeGiB < newSizeGiB {
needsModification = true
}
if options.IOPS != 0 && (volume.Iops == nil || *volume.Iops != int64(options.IOPS)) {
needsModification = true
}
if options.VolumeType != "" && !strings.EqualFold(*volume.VolumeType, options.VolumeType) {
needsModification = true
}
if options.Throughput != 0 && (volume.Throughput == nil || *volume.Throughput != int64(options.Throughput)) {
needsModification = true
}

return needsModification
}

func (c *cloud) validateModifyVolume(ctx context.Context, volumeID string, newSizeGiB int64, options *ModifyDiskOptions) (bool, int64, error) {
request := &ec2.DescribeVolumesInput{
VolumeIds: []*string{
aws.String(volumeID),
},
}
volume, err := c.getVolume(ctx, request)
if err != nil {
return 0, err
return true, 0, err
}

oldSizeGiB := aws.Int64Value(volume.Size)

latestMod, modFetchError := c.getLatestVolumeModification(ctx, volumeID)
latestMod, err := c.getLatestVolumeModification(ctx, volumeID)
if err != nil && !errors.Is(err, VolumeNotBeingModified) {
return true, oldSizeGiB, fmt.Errorf("error fetching volume modifications for %q: %w", volumeID, err)
}

if latestMod != nil && modFetchError == nil {
// latestMod can be nil if the volume has never been modified
if latestMod != nil {
state := aws.StringValue(latestMod.ModificationState)
if state == ec2.VolumeModificationStateModifying {
err = c.waitForVolumeSize(ctx, volumeID)
// If volume is already modifying, detour to waiting for it to modify
klog.V(5).InfoS("[Debug] Watching ongoing modification", "volumeID", volumeID)
err = c.waitForVolumeModification(ctx, volumeID)
if err != nil {
return oldSizeGiB, err
return true, oldSizeGiB, err
}
return c.checkDesiredSize(ctx, volumeID, newSizeGiB)
returnGiB, returnErr := c.checkDesiredState(ctx, volumeID, newSizeGiB, options)
return false, returnGiB, returnErr
} else if state == ec2.VolumeModificationStateOptimizing {
return true, 0, fmt.Errorf("volume %q in OPTIMIZING state, cannot currently modify", volumeID)
}
}

// if there was an error fetching volume modifications and it was anything other than VolumeNotBeingModified error
// that means we have an API problem.
if modFetchError != nil && !errors.Is(modFetchError, VolumeNotBeingModified) {
return oldSizeGiB, fmt.Errorf("error fetching volume modifications for %q: %w", volumeID, modFetchError)
}

// Even if existing volume size is greater than user requested size, we should ensure that there are no pending
// volume modifications objects or volume has completed previously issued modification request.
if oldSizeGiB >= newSizeGiB {
klog.V(5).InfoS("[Debug] Volume", "volumeID", volumeID, "oldSizeGiB", oldSizeGiB, "newSizeGiB", newSizeGiB)
err = c.waitForVolumeSize(ctx, volumeID)
if err != nil && !errors.Is(err, VolumeNotBeingModified) {
return oldSizeGiB, err
// At this point, we know we are starting a new volume modification
// If we're asked to modify a volume to its current state, ignore the request and immediately return a success
ConnorJC3 marked this conversation as resolved.
Show resolved Hide resolved
if !needsVolumeModification(volume, newSizeGiB, options) {
klog.V(5).InfoS("[Debug] Skipping modification for volume due to matching stats", "volumeID", volumeID)
// Wait for any existing modifications to prevent race conditions where DescribeVolume(s) returns the new
// state before the volume is actually finished modifying
err = c.waitForVolumeModification(ctx, volumeID)
ConnorJC3 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return true, oldSizeGiB, err
}
return oldSizeGiB, nil
returnGiB, returnErr := c.checkDesiredState(ctx, volumeID, newSizeGiB, options)
return false, returnGiB, returnErr
}
return 0, nil

return true, 0, nil
}

func volumeModificationDone(state string) bool {
Expand Down
91 changes: 74 additions & 17 deletions pkg/cloud/cloud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1341,6 +1341,7 @@ func TestResizeOrModifyDisk(t *testing.T) {
reqSizeGiB int64
modifyDiskOptions *ModifyDiskOptions
expErr error
shouldCallDescribe bool
}{
{
name: "success: normal resize",
Expand All @@ -1357,9 +1358,10 @@ func TestResizeOrModifyDisk(t *testing.T) {
ModificationState: aws.String(ec2.VolumeModificationStateCompleted),
},
},
reqSizeGiB: 2,
modifyDiskOptions: &ModifyDiskOptions{},
expErr: nil,
reqSizeGiB: 2,
modifyDiskOptions: &ModifyDiskOptions{},
expErr: nil,
shouldCallDescribe: true,
},
{
name: "success: normal modifying state",
Expand All @@ -1385,9 +1387,10 @@ func TestResizeOrModifyDisk(t *testing.T) {
},
},
},
reqSizeGiB: 2,
modifyDiskOptions: &ModifyDiskOptions{},
expErr: nil,
reqSizeGiB: 2,
modifyDiskOptions: &ModifyDiskOptions{},
expErr: nil,
shouldCallDescribe: true,
},
{
name: "success: with previous expansion",
Expand All @@ -1406,13 +1409,18 @@ func TestResizeOrModifyDisk(t *testing.T) {
},
},
},
reqSizeGiB: 2,
modifyDiskOptions: &ModifyDiskOptions{},
expErr: nil,
reqSizeGiB: 2,
modifyDiskOptions: &ModifyDiskOptions{},
expErr: nil,
shouldCallDescribe: true,
},
{
name: "success: modify IOPS, throughput and volume type",
volumeID: "vol-test",
existingVolume: &ec2.Volume{
VolumeId: aws.String("vol-test"),
VolumeType: aws.String("gp2"),
},
modifyDiskOptions: &ModifyDiskOptions{
VolumeType: "GP3",
IOPS: 3000,
Expand All @@ -1427,7 +1435,8 @@ func TestResizeOrModifyDisk(t *testing.T) {
ModificationState: aws.String(ec2.VolumeModificationStateCompleted),
},
},
expErr: nil,
expErr: nil,
shouldCallDescribe: true,
},
{
name: "success: modify size, IOPS, throughput and volume type",
Expand All @@ -1436,6 +1445,8 @@ func TestResizeOrModifyDisk(t *testing.T) {
VolumeId: aws.String("vol-test"),
Size: aws.Int64(1),
AvailabilityZone: aws.String(defaultZone),
VolumeType: aws.String("gp2"),
Iops: aws.Int64(2000),
},
modifyDiskOptions: &ModifyDiskOptions{
VolumeType: "GP3",
Expand All @@ -1453,7 +1464,8 @@ func TestResizeOrModifyDisk(t *testing.T) {
ModificationState: aws.String(ec2.VolumeModificationStateCompleted),
},
},
expErr: nil,
expErr: nil,
shouldCallDescribe: true,
},
{
name: "fail: volume doesn't exist",
Expand Down Expand Up @@ -1489,9 +1501,44 @@ func TestResizeOrModifyDisk(t *testing.T) {
VolumeType: "GP2",
IOPS: 3000,
},
existingVolume: &ec2.Volume{
VolumeId: aws.String("vol-test"),
AvailabilityZone: aws.String(defaultZone),
VolumeType: aws.String("gp2"),
},
modifiedVolumeError: awserr.New("InvalidParameterCombination", "The parameter iops is not supported for gp2 volumes", nil),
expErr: awserr.New("InvalidParameterCombination", "The parameter iops is not supported for gp2 volumes", nil),
},
{
name: "success: does not call ModifyVolume when no modification required",
volumeID: "vol-test",
existingVolume: &ec2.Volume{
VolumeId: aws.String("vol-test"),
AvailabilityZone: aws.String(defaultZone),
VolumeType: aws.String("gp3"),
Iops: aws.Int64(3000),
},
modifyDiskOptions: &ModifyDiskOptions{
VolumeType: "GP3",
IOPS: 3000,
},
shouldCallDescribe: true,
},
{
name: "success: does not call ModifyVolume when no modification required (with size)",
volumeID: "vol-test",
existingVolume: &ec2.Volume{
VolumeId: aws.String("vol-test"),
AvailabilityZone: aws.String(defaultZone),
Size: aws.Int64(13),
Iops: aws.Int64(3000),
},
reqSizeGiB: 13,
modifyDiskOptions: &ModifyDiskOptions{
IOPS: 3000,
},
shouldCallDescribe: true,
},
}

for _, tc := range testCases {
Expand All @@ -1511,16 +1558,26 @@ func TestResizeOrModifyDisk(t *testing.T) {
},
}, tc.existingVolumeError)

if tc.expErr == nil && aws.Int64Value(tc.existingVolume.Size) != tc.reqSizeGiB {
resizedVolume := &ec2.Volume{
VolumeId: aws.String("vol-test"),
Size: aws.Int64(tc.reqSizeGiB),
AvailabilityZone: aws.String(defaultZone),
if tc.shouldCallDescribe {
newVolume := tc.existingVolume
if tc.reqSizeGiB != 0 {
newVolume.Size = aws.Int64(tc.reqSizeGiB)
}
if tc.modifyDiskOptions != nil {
if tc.modifyDiskOptions.IOPS != 0 {
newVolume.Iops = aws.Int64(int64(tc.modifyDiskOptions.IOPS))
}
if tc.modifyDiskOptions.Throughput != 0 {
newVolume.Throughput = aws.Int64(int64(tc.modifyDiskOptions.Throughput))
}
if tc.modifyDiskOptions.VolumeType != "" {
newVolume.VolumeType = aws.String(tc.modifyDiskOptions.VolumeType)
}
}
mockEC2.EXPECT().DescribeVolumesWithContext(gomock.Any(), gomock.Any()).Return(
&ec2.DescribeVolumesOutput{
Volumes: []*ec2.Volume{
resizedVolume,
newVolume,
},
}, tc.existingVolumeError)
}
Expand Down