Skip to content

Commit

Permalink
Add volume expansion
Browse files Browse the repository at this point in the history
  • Loading branch information
bertinatto committed Jun 13, 2019
1 parent ce0314a commit dda637f
Show file tree
Hide file tree
Showing 10 changed files with 522 additions and 20 deletions.
139 changes: 127 additions & 12 deletions pkg/cloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ type EC2 interface {
CreateSnapshotWithContext(ctx aws.Context, input *ec2.CreateSnapshotInput, opts ...request.Option) (*ec2.Snapshot, error)
DeleteSnapshotWithContext(ctx aws.Context, input *ec2.DeleteSnapshotInput, opts ...request.Option) (*ec2.DeleteSnapshotOutput, error)
DescribeSnapshotsWithContext(ctx aws.Context, input *ec2.DescribeSnapshotsInput, opts ...request.Option) (*ec2.DescribeSnapshotsOutput, error)
ModifyVolumeWithContext(ctx aws.Context, input *ec2.ModifyVolumeInput, opts ...request.Option) (*ec2.ModifyVolumeOutput, error)
DescribeVolumesModificationsWithContext(ctx aws.Context, input *ec2.DescribeVolumesModificationsInput, opts ...request.Option) (*ec2.DescribeVolumesModificationsOutput, error)
}

type Cloud interface {
Expand All @@ -168,6 +170,7 @@ type Cloud interface {
DeleteDisk(ctx context.Context, volumeID string) (success bool, err error)
AttachDisk(ctx context.Context, volumeID string, nodeID string) (devicePath string, err error)
DetachDisk(ctx context.Context, volumeID string, nodeID string) (err error)
ResizeDisk(ctx context.Context, volumeID string, reqSize int64) (newSize int64, err error)
WaitForAttachmentState(ctx context.Context, volumeID, state string) error
GetDiskByName(ctx context.Context, name string, capacityBytes int64) (disk *Disk, err error)
GetDiskByID(ctx context.Context, volumeID string) (disk *Disk, err error)
Expand Down Expand Up @@ -765,27 +768,139 @@ func (c *cloud) waitForVolume(ctx context.Context, volumeID string) error {
return err
}

// Helper function for describeVolume callers. Tries to retype given error to AWS error
// and returns true in case the AWS error is "InvalidVolume.NotFound", false otherwise
func isAWSErrorVolumeNotFound(err error) bool {
// isAWSError returns a boolean indicating whether the error is AWS-related
// and has the given code. More information on AWS error codes at:
// https://docs.aws.amazon.com/AWSEC2/latest/APIReference/errors-overview.html
func isAWSError(err error, code string) bool {
if awsError, ok := err.(awserr.Error); ok {
// https://docs.aws.amazon.com/AWSEC2/latest/APIReference/errors-overview.html
if awsError.Code() == "InvalidVolume.NotFound" {
if awsError.Code() == code {
return true
}
}
return false
}

// Helper function for describeSnapshot callers. Tries to retype given error to AWS error
// and returns true in case the AWS error is "InvalidSnapshot.NotFound", false otherwise
// isAWSErrorIncorrectModification returns a boolean indicating whether the given error
// is an AWS IncorrectModificationState error. This error means that a modification action
// on an EBS volume cannot occur because the volume is currently being modified.
func isAWSErrorIncorrectModification(err error) bool {
return isAWSError(err, "IncorrectModificationState")
}

// isAWSErrorVolumeNotFound returns a boolean indicating whether the
// given error is an AWS InvalidVolume.NotFound error. This error is
// reported when the specified volume doesn't exist.
func isAWSErrorVolumeNotFound(err error) bool {
return isAWSError(err, "InvalidVolume.NotFound")
}

// isAWSErrorSnapshotNotFound returns a boolean indicating whether the
// given error is an AWS InvalidSnapshot.NotFound error. This error is
// reported when the specified snapshot doesn't exist.
func isAWSErrorSnapshotNotFound(err error) bool {
if awsError, ok := err.(awserr.Error); ok {
// https://docs.aws.amazon.com/AWSEC2/latest/APIReference/errors-overview.html
if awsError.Code() == "InvalidSnapshot.NotFound" {
return true
return isAWSError(err, "InvalidSnapshot.NotFound")
}

// ResizeDisk resizes an EBS volume in GiB increments, rouding up to the next possible allocatable unit.
// It returns the volume size after this call or an error if the size couldn't be determined.
func (c *cloud) ResizeDisk(ctx context.Context, volumeID string, newSizeBytes int64) (int64, error) {
request := &ec2.DescribeVolumesInput{
VolumeIds: []*string{
aws.String(volumeID),
},
}
volume, err := c.getVolume(ctx, request)
if err != nil {
return 0, err
}

// AWS resizes in chunks of GiB (not GB)
newSizeGiB := util.RoundUpGiB(newSizeBytes)
oldSizeGiB := aws.Int64Value(volume.Size)

if oldSizeGiB >= newSizeGiB {
klog.V(5).Infof("Volume %q's current size (%d GiB) is greater or equal to the new size (%d GiB)", volumeID, oldSizeGiB, newSizeGiB)
return oldSizeGiB, nil
}

req := &ec2.ModifyVolumeInput{
VolumeId: aws.String(volumeID),
Size: aws.Int64(newSizeGiB),
}

var mod *ec2.VolumeModification
response, err := c.ec2.ModifyVolumeWithContext(ctx, req)
if err != nil {
if !isAWSErrorIncorrectModification(err) {
return 0, fmt.Errorf("could not modify AWS volume %q: %v", volumeID, err)
}

m, err := c.getLatestVolumeModification(ctx, volumeID)
if err != nil {
return 0, err
}
mod = m
}

return false
if mod == nil {
mod = response.VolumeModification
}

state := aws.StringValue(mod.ModificationState)
if state == ec2.VolumeModificationStateCompleted || state == ec2.VolumeModificationStateOptimizing {
return aws.Int64Value(mod.TargetSize), nil
}

return c.waitForVolumeSize(ctx, volumeID)
}

// waitForVolumeSize waits for a volume modification to finish and return its size.
func (c *cloud) waitForVolumeSize(ctx context.Context, volumeID string) (int64, error) {
backoff := wait.Backoff{
Duration: 1 * time.Second,
Factor: 1.8,
Steps: 20,
}

var modVolSizeGiB int64
waitErr := wait.ExponentialBackoff(backoff, func() (bool, error) {
m, err := c.getLatestVolumeModification(ctx, volumeID)
if err != nil {
return false, err
}

state := aws.StringValue(m.ModificationState)
if state == ec2.VolumeModificationStateCompleted || state == ec2.VolumeModificationStateOptimizing {
modVolSizeGiB = aws.Int64Value(m.TargetSize)
return true, nil
}

return false, nil
})

if waitErr != nil {
return 0, waitErr
}

return modVolSizeGiB, nil
}

// getLatestVolumeModification returns the last modification of the volume.
func (c *cloud) getLatestVolumeModification(ctx context.Context, volumeID string) (*ec2.VolumeModification, error) {
request := &ec2.DescribeVolumesModificationsInput{
VolumeIds: []*string{
aws.String(volumeID),
},
}
mod, err := c.ec2.DescribeVolumesModificationsWithContext(ctx, request)
if err != nil {
return nil, fmt.Errorf("error describing modifications in volume %q: %v", volumeID, err)
}

volumeMods := mod.VolumesModifications
if len(volumeMods) == 0 {
return nil, fmt.Errorf("could not find any modifications for volume %q", volumeID)
}

return volumeMods[len(volumeMods)-1], nil
}
127 changes: 127 additions & 0 deletions pkg/cloud/cloud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,133 @@ func TestDeleteSnapshot(t *testing.T) {
}
}

func TestResizeDisk(t *testing.T) {
testCases := []struct {
name string
volumeID string
existingVolume *ec2.Volume
existingVolumeError awserr.Error
modifiedVolume *ec2.ModifyVolumeOutput
modifiedVolumeError awserr.Error
descModVolume *ec2.DescribeVolumesModificationsOutput
reqSizeGiB int64
expErr error
}{
{
name: "success: normal",
volumeID: "vol-test",
existingVolume: &ec2.Volume{
VolumeId: aws.String("vol-test"),
Size: aws.Int64(1),
AvailabilityZone: aws.String(defaultZone),
},
modifiedVolume: &ec2.ModifyVolumeOutput{
VolumeModification: &ec2.VolumeModification{
VolumeId: aws.String("vol-test"),
TargetSize: aws.Int64(2),
ModificationState: aws.String(ec2.VolumeModificationStateOptimizing),
},
},
reqSizeGiB: 2,
expErr: nil,
},
{
name: "success: normal modifying state",
volumeID: "vol-test",
existingVolume: &ec2.Volume{
VolumeId: aws.String("vol-test"),
Size: aws.Int64(1),
AvailabilityZone: aws.String(defaultZone),
},
modifiedVolume: &ec2.ModifyVolumeOutput{
VolumeModification: &ec2.VolumeModification{
VolumeId: aws.String("vol-test"),
TargetSize: aws.Int64(2),
ModificationState: aws.String(ec2.VolumeModificationStateModifying),
},
},
descModVolume: &ec2.DescribeVolumesModificationsOutput{
VolumesModifications: []*ec2.VolumeModification{
{
VolumeId: aws.String("vol-test"),
TargetSize: aws.Int64(2),
ModificationState: aws.String(ec2.VolumeModificationStateCompleted),
},
},
},
reqSizeGiB: 2,
expErr: nil,
},
{
name: "fail: volume doesn't exist",
volumeID: "vol-test",
existingVolumeError: awserr.New("InvalidVolume.NotFound", "", nil),
reqSizeGiB: 2,
expErr: fmt.Errorf("ResizeDisk generic error"),
},
{
name: "sucess: there is a resizing in progress",
volumeID: "vol-test",
existingVolume: &ec2.Volume{
VolumeId: aws.String("vol-test"),
Size: aws.Int64(1),
AvailabilityZone: aws.String(defaultZone),
},
modifiedVolumeError: awserr.New("IncorrectModificationState", "", nil),
descModVolume: &ec2.DescribeVolumesModificationsOutput{
VolumesModifications: []*ec2.VolumeModification{
{
VolumeId: aws.String("vol-test"),
TargetSize: aws.Int64(2),
ModificationState: aws.String(ec2.VolumeModificationStateCompleted),
},
},
},
reqSizeGiB: 2,
expErr: nil,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
mockCtrl := gomock.NewController(t)
mockEC2 := mocks.NewMockEC2(mockCtrl)
c := newCloud(mockEC2)

ctx := context.Background()
if tc.existingVolume != nil || tc.existingVolumeError != nil {
mockEC2.EXPECT().DescribeVolumesWithContext(gomock.Eq(ctx), gomock.Any()).Return(
&ec2.DescribeVolumesOutput{
Volumes: []*ec2.Volume{tc.existingVolume},
}, tc.existingVolumeError).AnyTimes()
}
if tc.modifiedVolume != nil || tc.modifiedVolumeError != nil {
mockEC2.EXPECT().ModifyVolumeWithContext(gomock.Eq(ctx), gomock.Any()).Return(tc.modifiedVolume, tc.modifiedVolumeError).AnyTimes()
}
if tc.descModVolume != nil {
mockEC2.EXPECT().DescribeVolumesModificationsWithContext(gomock.Eq(ctx), gomock.Any()).Return(tc.descModVolume, nil).AnyTimes()
}

newSize, err := c.ResizeDisk(ctx, tc.volumeID, util.GiBToBytes(tc.reqSizeGiB))
if err != nil {
if tc.expErr == nil {
t.Fatalf("ResizeDisk() failed: expected no error, got: %v", err)
}
} else {
if tc.expErr != nil {
t.Fatal("ResizeDisk() failed: expected error, got nothing")
} else {
if tc.reqSizeGiB != newSize {
t.Fatalf("ResizeDisk() failed: expected capacity %d, got %d", tc.reqSizeGiB, newSize)
}
}
}

mockCtrl.Finish()
})
}
}

func TestGetSnapshotByName(t *testing.T) {
testCases := []struct {
name string
Expand Down
40 changes: 40 additions & 0 deletions pkg/cloud/mocks/mock_ec2.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit dda637f

Please sign in to comment.