Skip to content

Commit

Permalink
Wait for volume to become available
Browse files Browse the repository at this point in the history
  • Loading branch information
bertinatto committed Nov 28, 2018
1 parent 9739153 commit a739d75
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 114 deletions.
1 change: 1 addition & 0 deletions deploy/kubernetes/v1.12+/provisioner.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ spec:
- --csi-address=$(ADDRESS)
- --v=5
- --feature-gates=Topology=true
- --connection-timeout=20s
env:
- name: ADDRESS
value: /var/lib/csi/sockets/pluginproxy/csi.sock
Expand Down
24 changes: 9 additions & 15 deletions pkg/cloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,13 +242,8 @@ func (c *cloud) CreateDisk(ctx context.Context, volumeName string, diskOptions *
return nil, fmt.Errorf("disk size was not returned by CreateVolume")
}

if len(diskOptions.KmsKeyID) > 0 {
err := c.waitForCreate(ctx, volumeID)
if err != nil {
if isAWSErrorVolumeNotFound(err) {
return nil, fmt.Errorf("failed to create encrypted volume: the volume disappeared after creation, most likely due to inaccessible KMS encryption key")
}
}
if err := c.waitForVolume(ctx, volumeID); err != nil {
return nil, fmt.Errorf("failed to get an available volume in EC2: %v", err)
}

return &Disk{CapacityGiB: size, VolumeID: volumeID, AvailabilityZone: zone}, nil
Expand Down Expand Up @@ -500,13 +495,14 @@ func (c *cloud) waitForAttachmentState(ctx context.Context, volumeID, state stri
return wait.ExponentialBackoff(backoff, verifyVolumeFunc)
}

// waitForCreate waits for volume to be created for encrypted volume only
// it polls for created volume to check it has not been silently removed by AWS.
// waitForVolume waits for volume to be in the "available" state.
// On a random AWS account (shared among several developers) it took 4s on average.
func (c *cloud) waitForCreate(ctx context.Context, volumeID string) error {
// Also, we assume that the default timeout in the controller is 20s, so we
// make our retry timeout lower in order to avoid exceeding the controller's one.
func (c *cloud) waitForVolume(ctx context.Context, volumeID string) error {
var (
checkInterval = 1 * time.Second
checkTimeout = 30 * time.Second
checkInterval = 3 * time.Second
checkTimeout = 15 * time.Second
)

request := &ec2.DescribeVolumesInput{
Expand All @@ -523,12 +519,11 @@ func (c *cloud) waitForCreate(ctx context.Context, volumeID string) error {
if vol.State != nil {
switch *vol.State {
case "available":
// The volume is Available, it won't be deleted now.
return true, nil
case "creating":
return false, nil
default:
return true, fmt.Errorf("unexpected State of newly created AWS EBS volume %s: %q", volumeID, *vol.State)
return true, fmt.Errorf("unexpected state for volume %s: %q", volumeID, *vol.State)
}
}
return false, nil
Expand All @@ -546,6 +541,5 @@ func isAWSErrorVolumeNotFound(err error) bool {
return true
}
}

return false
}
67 changes: 40 additions & 27 deletions pkg/cloud/cloud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ import (

func TestCreateDisk(t *testing.T) {
testCases := []struct {
name string
volumeName string
diskOptions *DiskOptions
expDisk *Disk
expErr error
name string
volumeName string
volState string
diskOptions *DiskOptions
expDisk *Disk
expErr error
expDescVolErr error
}{
{
name: "success: normal",
Expand Down Expand Up @@ -67,16 +69,6 @@ func TestCreateDisk(t *testing.T) {
},
expErr: nil,
},
{
name: "fail: CreateVolume returned an error",
volumeName: "vol-test-name-error",
diskOptions: &DiskOptions{
CapacityBytes: util.GiBToBytes(1),
Tags: map[string]string{VolumeNameTagKey: "vol-test"},
AvailabilityZone: "",
},
expErr: fmt.Errorf("CreateVolume generic error"),
},
{
name: "success: normal with encrypted volume",
volumeName: "vol-test-name",
Expand All @@ -93,6 +85,28 @@ func TestCreateDisk(t *testing.T) {
},
expErr: nil,
},
{
name: "fail: CreateVolume returned an error",
volumeName: "vol-test-name-error",
diskOptions: &DiskOptions{
CapacityBytes: util.GiBToBytes(1),
Tags: map[string]string{VolumeNameTagKey: "vol-test"},
AvailabilityZone: "",
},
expErr: fmt.Errorf("CreateVolume generic error"),
},
{
name: "fail: CreateVolume returned a volume with wrong state",
volumeName: "vol-test-name-error",
volState: "creating",
diskOptions: &DiskOptions{
CapacityBytes: util.GiBToBytes(1),
Tags: map[string]string{VolumeNameTagKey: "vol-test"},
AvailabilityZone: "",
},
expErr: nil,
expDescVolErr: fmt.Errorf("DescribeVolumes generic error"),
},
}

for _, tc := range testCases {
Expand All @@ -101,25 +115,24 @@ func TestCreateDisk(t *testing.T) {
mockEC2 := mocks.NewMockEC2(mockCtrl)
c := newCloud(mockEC2)

vol := &ec2.Volume{}
if tc.expErr == nil {
vol = &ec2.Volume{
VolumeId: aws.String(tc.diskOptions.Tags[VolumeNameTagKey]),
Size: aws.Int64(util.BytesToGiB(tc.diskOptions.CapacityBytes)),
State: aws.String("available"),
}
volState := tc.volState
if volState == "" {
volState = "available"
}

vol := &ec2.Volume{
VolumeId: aws.String(tc.diskOptions.Tags[VolumeNameTagKey]),
Size: aws.Int64(util.BytesToGiB(tc.diskOptions.CapacityBytes)),
State: aws.String(volState),
}

ctx := context.Background()
mockEC2.EXPECT().CreateVolumeWithContext(gomock.Eq(ctx), gomock.Any()).Return(vol, tc.expErr)

if tc.diskOptions.Encrypted {
mockEC2.EXPECT().DescribeVolumesWithContext(gomock.Eq(ctx), gomock.Any()).Return(&ec2.DescribeVolumesOutput{Volumes: []*ec2.Volume{vol}}, nil)
}
mockEC2.EXPECT().DescribeVolumesWithContext(gomock.Eq(ctx), gomock.Any()).Return(&ec2.DescribeVolumesOutput{Volumes: []*ec2.Volume{vol}}, nil).AnyTimes()

disk, err := c.CreateDisk(ctx, tc.volumeName, tc.diskOptions)
if err != nil {
if tc.expErr == nil {
if tc.expErr == nil && tc.expDescVolErr == nil {
t.Fatalf("CreateDisk() failed: expected no error, got: %v", err)
}
} else {
Expand Down
128 changes: 56 additions & 72 deletions pkg/cloud/mocks/mock_ec2.go
Original file line number Diff line number Diff line change
@@ -1,144 +1,128 @@
// Code generated by MockGen. DO NOT EDIT.
// Automatically generated by MockGen. DO NOT EDIT!
// Source: github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/cloud (interfaces: EC2)

// Package mocks is a generated GoMock package.
package mocks

import (
aws "github.com/aws/aws-sdk-go/aws"
request "github.com/aws/aws-sdk-go/aws/request"
ec2 "github.com/aws/aws-sdk-go/service/ec2"
gomock "github.com/golang/mock/gomock"
reflect "reflect"
)

// MockEC2 is a mock of EC2 interface
// Mock of EC2 interface
type MockEC2 struct {
ctrl *gomock.Controller
recorder *MockEC2MockRecorder
recorder *_MockEC2Recorder
}

// MockEC2MockRecorder is the mock recorder for MockEC2
type MockEC2MockRecorder struct {
// Recorder for MockEC2 (not exported)
type _MockEC2Recorder struct {
mock *MockEC2
}

// NewMockEC2 creates a new mock instance
func NewMockEC2(ctrl *gomock.Controller) *MockEC2 {
mock := &MockEC2{ctrl: ctrl}
mock.recorder = &MockEC2MockRecorder{mock}
mock.recorder = &_MockEC2Recorder{mock}
return mock
}

// EXPECT returns an object that allows the caller to indicate expected use
func (m *MockEC2) EXPECT() *MockEC2MockRecorder {
return m.recorder
func (_m *MockEC2) EXPECT() *_MockEC2Recorder {
return _m.recorder
}

// AttachVolumeWithContext mocks base method
func (m *MockEC2) AttachVolumeWithContext(arg0 aws.Context, arg1 *ec2.AttachVolumeInput, arg2 ...request.Option) (*ec2.VolumeAttachment, error) {
varargs := []interface{}{arg0, arg1}
for _, a := range arg2 {
varargs = append(varargs, a)
func (_m *MockEC2) AttachVolumeWithContext(_param0 aws.Context, _param1 *ec2.AttachVolumeInput, _param2 ...request.Option) (*ec2.VolumeAttachment, error) {
_s := []interface{}{_param0, _param1}
for _, _x := range _param2 {
_s = append(_s, _x)
}
ret := m.ctrl.Call(m, "AttachVolumeWithContext", varargs...)
ret := _m.ctrl.Call(_m, "AttachVolumeWithContext", _s...)
ret0, _ := ret[0].(*ec2.VolumeAttachment)
ret1, _ := ret[1].(error)
return ret0, ret1
}

// AttachVolumeWithContext indicates an expected call of AttachVolumeWithContext
func (mr *MockEC2MockRecorder) AttachVolumeWithContext(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
varargs := append([]interface{}{arg0, arg1}, arg2...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AttachVolumeWithContext", reflect.TypeOf((*MockEC2)(nil).AttachVolumeWithContext), varargs...)
func (_mr *_MockEC2Recorder) AttachVolumeWithContext(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
_s := append([]interface{}{arg0, arg1}, arg2...)
return _mr.mock.ctrl.RecordCall(_mr.mock, "AttachVolumeWithContext", _s...)
}

// CreateVolumeWithContext mocks base method
func (m *MockEC2) CreateVolumeWithContext(arg0 aws.Context, arg1 *ec2.CreateVolumeInput, arg2 ...request.Option) (*ec2.Volume, error) {
varargs := []interface{}{arg0, arg1}
for _, a := range arg2 {
varargs = append(varargs, a)
func (_m *MockEC2) CreateVolumeWithContext(_param0 aws.Context, _param1 *ec2.CreateVolumeInput, _param2 ...request.Option) (*ec2.Volume, error) {
_s := []interface{}{_param0, _param1}
for _, _x := range _param2 {
_s = append(_s, _x)
}
ret := m.ctrl.Call(m, "CreateVolumeWithContext", varargs...)
ret := _m.ctrl.Call(_m, "CreateVolumeWithContext", _s...)
ret0, _ := ret[0].(*ec2.Volume)
ret1, _ := ret[1].(error)
return ret0, ret1
}

// CreateVolumeWithContext indicates an expected call of CreateVolumeWithContext
func (mr *MockEC2MockRecorder) CreateVolumeWithContext(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
varargs := append([]interface{}{arg0, arg1}, arg2...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateVolumeWithContext", reflect.TypeOf((*MockEC2)(nil).CreateVolumeWithContext), varargs...)
func (_mr *_MockEC2Recorder) CreateVolumeWithContext(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
_s := append([]interface{}{arg0, arg1}, arg2...)
return _mr.mock.ctrl.RecordCall(_mr.mock, "CreateVolumeWithContext", _s...)
}

// DeleteVolumeWithContext mocks base method
func (m *MockEC2) DeleteVolumeWithContext(arg0 aws.Context, arg1 *ec2.DeleteVolumeInput, arg2 ...request.Option) (*ec2.DeleteVolumeOutput, error) {
varargs := []interface{}{arg0, arg1}
for _, a := range arg2 {
varargs = append(varargs, a)
func (_m *MockEC2) DeleteVolumeWithContext(_param0 aws.Context, _param1 *ec2.DeleteVolumeInput, _param2 ...request.Option) (*ec2.DeleteVolumeOutput, error) {
_s := []interface{}{_param0, _param1}
for _, _x := range _param2 {
_s = append(_s, _x)
}
ret := m.ctrl.Call(m, "DeleteVolumeWithContext", varargs...)
ret := _m.ctrl.Call(_m, "DeleteVolumeWithContext", _s...)
ret0, _ := ret[0].(*ec2.DeleteVolumeOutput)
ret1, _ := ret[1].(error)
return ret0, ret1
}

// DeleteVolumeWithContext indicates an expected call of DeleteVolumeWithContext
func (mr *MockEC2MockRecorder) DeleteVolumeWithContext(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
varargs := append([]interface{}{arg0, arg1}, arg2...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteVolumeWithContext", reflect.TypeOf((*MockEC2)(nil).DeleteVolumeWithContext), varargs...)
func (_mr *_MockEC2Recorder) DeleteVolumeWithContext(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
_s := append([]interface{}{arg0, arg1}, arg2...)
return _mr.mock.ctrl.RecordCall(_mr.mock, "DeleteVolumeWithContext", _s...)
}

// DescribeInstancesWithContext mocks base method
func (m *MockEC2) DescribeInstancesWithContext(arg0 aws.Context, arg1 *ec2.DescribeInstancesInput, arg2 ...request.Option) (*ec2.DescribeInstancesOutput, error) {
varargs := []interface{}{arg0, arg1}
for _, a := range arg2 {
varargs = append(varargs, a)
func (_m *MockEC2) DescribeInstancesWithContext(_param0 aws.Context, _param1 *ec2.DescribeInstancesInput, _param2 ...request.Option) (*ec2.DescribeInstancesOutput, error) {
_s := []interface{}{_param0, _param1}
for _, _x := range _param2 {
_s = append(_s, _x)
}
ret := m.ctrl.Call(m, "DescribeInstancesWithContext", varargs...)
ret := _m.ctrl.Call(_m, "DescribeInstancesWithContext", _s...)
ret0, _ := ret[0].(*ec2.DescribeInstancesOutput)
ret1, _ := ret[1].(error)
return ret0, ret1
}

// DescribeInstancesWithContext indicates an expected call of DescribeInstancesWithContext
func (mr *MockEC2MockRecorder) DescribeInstancesWithContext(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
varargs := append([]interface{}{arg0, arg1}, arg2...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DescribeInstancesWithContext", reflect.TypeOf((*MockEC2)(nil).DescribeInstancesWithContext), varargs...)
func (_mr *_MockEC2Recorder) DescribeInstancesWithContext(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
_s := append([]interface{}{arg0, arg1}, arg2...)
return _mr.mock.ctrl.RecordCall(_mr.mock, "DescribeInstancesWithContext", _s...)
}

// DescribeVolumesWithContext mocks base method
func (m *MockEC2) DescribeVolumesWithContext(arg0 aws.Context, arg1 *ec2.DescribeVolumesInput, arg2 ...request.Option) (*ec2.DescribeVolumesOutput, error) {
varargs := []interface{}{arg0, arg1}
for _, a := range arg2 {
varargs = append(varargs, a)
func (_m *MockEC2) DescribeVolumesWithContext(_param0 aws.Context, _param1 *ec2.DescribeVolumesInput, _param2 ...request.Option) (*ec2.DescribeVolumesOutput, error) {
_s := []interface{}{_param0, _param1}
for _, _x := range _param2 {
_s = append(_s, _x)
}
ret := m.ctrl.Call(m, "DescribeVolumesWithContext", varargs...)
ret := _m.ctrl.Call(_m, "DescribeVolumesWithContext", _s...)
ret0, _ := ret[0].(*ec2.DescribeVolumesOutput)
ret1, _ := ret[1].(error)
return ret0, ret1
}

// DescribeVolumesWithContext indicates an expected call of DescribeVolumesWithContext
func (mr *MockEC2MockRecorder) DescribeVolumesWithContext(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
varargs := append([]interface{}{arg0, arg1}, arg2...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DescribeVolumesWithContext", reflect.TypeOf((*MockEC2)(nil).DescribeVolumesWithContext), varargs...)
func (_mr *_MockEC2Recorder) DescribeVolumesWithContext(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
_s := append([]interface{}{arg0, arg1}, arg2...)
return _mr.mock.ctrl.RecordCall(_mr.mock, "DescribeVolumesWithContext", _s...)
}

// DetachVolumeWithContext mocks base method
func (m *MockEC2) DetachVolumeWithContext(arg0 aws.Context, arg1 *ec2.DetachVolumeInput, arg2 ...request.Option) (*ec2.VolumeAttachment, error) {
varargs := []interface{}{arg0, arg1}
for _, a := range arg2 {
varargs = append(varargs, a)
func (_m *MockEC2) DetachVolumeWithContext(_param0 aws.Context, _param1 *ec2.DetachVolumeInput, _param2 ...request.Option) (*ec2.VolumeAttachment, error) {
_s := []interface{}{_param0, _param1}
for _, _x := range _param2 {
_s = append(_s, _x)
}
ret := m.ctrl.Call(m, "DetachVolumeWithContext", varargs...)
ret := _m.ctrl.Call(_m, "DetachVolumeWithContext", _s...)
ret0, _ := ret[0].(*ec2.VolumeAttachment)
ret1, _ := ret[1].(error)
return ret0, ret1
}

// DetachVolumeWithContext indicates an expected call of DetachVolumeWithContext
func (mr *MockEC2MockRecorder) DetachVolumeWithContext(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
varargs := append([]interface{}{arg0, arg1}, arg2...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DetachVolumeWithContext", reflect.TypeOf((*MockEC2)(nil).DetachVolumeWithContext), varargs...)
func (_mr *_MockEC2Recorder) DetachVolumeWithContext(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
_s := append([]interface{}{arg0, arg1}, arg2...)
return _mr.mock.ctrl.RecordCall(_mr.mock, "DetachVolumeWithContext", _s...)
}

0 comments on commit a739d75

Please sign in to comment.