Merge pull request #58 from bertinatto/wait
Wait for attachment state after attach/detach
k8s-ci-robot authored Oct 15, 2018
2 parents 6c0b632 + f058f34 commit 290b6d0
Showing 5 changed files with 119 additions and 76 deletions.
3 changes: 2 additions & 1 deletion Makefile
@@ -30,7 +30,8 @@ test-sanity:

.PHONY: test-e2e
test-e2e:
go test -v ./tests/e2e/...
go test -c ./tests/e2e/... -o bin/e2e.test && \
sudo -E bin/e2e.test -ginkgo.v

.PHONY: image
image:
82 changes: 58 additions & 24 deletions pkg/cloud/cloud.go
@@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
@@ -32,6 +33,7 @@ import (
"github.com/golang/glog"
dm "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/cloud/devicemanager"
"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/util"
"k8s.io/apimachinery/pkg/util/wait"
)

const (
@@ -272,35 +274,18 @@ func (c *cloud) AttachDisk(ctx context.Context, volumeID, nodeID string) (string
return "", fmt.Errorf("could not attach volume %q to node %q: %v", volumeID, nodeID, err)
}
glog.V(5).Infof("AttachVolume volume=%q instance=%q request returned %v", volumeID, nodeID, resp)
}

// TODO: wait attaching
// TODO: this is the only situation where this method returns and the device is not released
//attachment, err := disk.waitForAttachmentStatus("attached")
//if err != nil {
// device.Taint()
//if err == wait.ErrWaitTimeout {
//c.applyUnSchedulableTaint(nodeName, "Volume stuck in attaching state - node needs reboot to fix impaired state.")
//}
//return "", err
//}
}

// Manually release the device so it can be picked again.
// This is the only situation where we taint the device
if err := c.waitForAttachmentState(ctx, volumeID, "attached"); err != nil {
device.Taint()
return "", err
}

// Double check the attachment to be 100% sure we attached the correct volume at the correct mountpoint
// TODO: Double check the attachment to be 100% sure we attached the correct volume at the correct mountpoint
// It could happen otherwise that we see the volume attached from a previous/separate AttachVolume call,
// which could theoretically be against a different device (or even instance).
//if attachment == nil {
//// Impossible?
//return "", fmt.Errorf("unexpected state: attachment nil after attached %q to %q", diskName, nodeName)
//}
//if ec2Device != aws.StringValue(attachment.Device) {
//return "", fmt.Errorf("disk attachment of %q to %q failed: requested device %q but found %q", diskName, nodeName, ec2Device, aws.StringValue(attachment.Device))
//}
//if awsInstance.awsID != aws.StringValue(attachment.InstanceId) {
//return "", fmt.Errorf("disk attachment of %q to %q failed: requested instance %q but found %q", diskName, nodeName, awsInstance.awsID, aws.StringValue(attachment.InstanceId))
//}
//return hostDevice, nil

return device.Path, nil
}
@@ -332,6 +317,10 @@ func (c *cloud) DetachDisk(ctx context.Context, volumeID, nodeID string) error {
return fmt.Errorf("could not detach volume %q from node %q: %v", volumeID, nodeID, err)
}

if err := c.waitForAttachmentState(ctx, volumeID, "detached"); err != nil {
return err
}

return nil
}

@@ -447,3 +436,48 @@ func (c *cloud) getInstance(ctx context.Context, nodeID string) (*ec2.Instance,

return instances[0], nil
}

// waitForAttachmentState polls until the volume attachment state reaches the expected value.
func (c *cloud) waitForAttachmentState(ctx context.Context, volumeID, state string) error {
// Most attach/detach operations on AWS finish within 1-4 seconds.
// By using a 1 second starting interval with a backoff factor of 1.8,
// we get intervals of [1, 1.8, 3.24, 5.832, 10.4976, ...] seconds.
// Over 13 steps the intervals add up to roughly 2601 seconds (~43 minutes).
backoff := wait.Backoff{
Duration: 1 * time.Second,
Factor: 1.8,
Steps: 13,
}

verifyVolumeFunc := func() (bool, error) {
request := &ec2.DescribeVolumesInput{
VolumeIds: []*string{
aws.String(volumeID),
},
}

volume, err := c.getVolume(ctx, request)
if err != nil {
return false, err
}

if len(volume.Attachments) == 0 {
if state == "detached" {
return true, nil
}
}

for _, a := range volume.Attachments {
if a.State == nil {
glog.Warningf("Ignoring nil attachment state for volume %q: %v", volumeID, a)
continue
}
if *a.State == state {
return true, nil
}
}
return false, nil
}

return wait.ExponentialBackoff(backoff, verifyVolumeFunc)
}
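For reference, the polling above is driven by wait.ExponentialBackoff from k8s.io/apimachinery: the condition function is evaluated first, and only while it returns (false, nil) does the loop sleep for the next, growing interval. Below is a standalone sketch (not part of this change) that exercises the same Backoff parameters; the condition succeeding on the fourth poll is purely illustrative.

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	// Same parameters as waitForAttachmentState: 1s start, factor 1.8, 13 steps.
	// The 13 intervals sum to roughly 2601 seconds, which is where the comment's figure comes from.
	backoff := wait.Backoff{Duration: 1 * time.Second, Factor: 1.8, Steps: 13}

	start := time.Now()
	attempt := 0
	err := wait.ExponentialBackoff(backoff, func() (bool, error) {
		attempt++
		fmt.Printf("attempt %d after %v\n", attempt, time.Since(start).Round(time.Millisecond))
		// Returning (false, nil) keeps polling; a non-nil error aborts immediately.
		return attempt == 4, nil // pretend the volume reaches the desired state on the 4th poll
	})
	fmt.Println("result:", err) // nil on success, wait.ErrWaitTimeout if every step is exhausted
}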
12 changes: 12 additions & 0 deletions pkg/cloud/cloud_test.go
@@ -199,7 +199,13 @@ func TestAttachDisk(t *testing.T) {
mockEC2 := mocks.NewMockEC2(mockCtrl)
c := newCloud(mockEC2)

vol := &ec2.Volume{
VolumeId: aws.String(tc.volumeID),
Attachments: []*ec2.VolumeAttachment{{State: aws.String("attached")}},
}

ctx := context.Background()
mockEC2.EXPECT().DescribeVolumesWithContext(gomock.Eq(ctx), gomock.Any()).Return(&ec2.DescribeVolumesOutput{Volumes: []*ec2.Volume{vol}}, nil).AnyTimes()
mockEC2.EXPECT().DescribeInstancesWithContext(gomock.Eq(ctx), gomock.Any()).Return(newDescribeInstancesOutput(tc.nodeID), nil)
mockEC2.EXPECT().AttachVolumeWithContext(gomock.Eq(ctx), gomock.Any()).Return(&ec2.VolumeAttachment{}, tc.expErr)

@@ -248,7 +254,13 @@ func TestDetachDisk(t *testing.T) {
mockEC2 := mocks.NewMockEC2(mockCtrl)
c := newCloud(mockEC2)

vol := &ec2.Volume{
VolumeId: aws.String(tc.volumeID),
Attachments: nil,
}

ctx := context.Background()
mockEC2.EXPECT().DescribeVolumesWithContext(gomock.Eq(ctx), gomock.Any()).Return(&ec2.DescribeVolumesOutput{Volumes: []*ec2.Volume{vol}}, nil).AnyTimes()
mockEC2.EXPECT().DescribeInstancesWithContext(gomock.Eq(ctx), gomock.Any()).Return(newDescribeInstancesOutput(tc.nodeID), nil)
mockEC2.EXPECT().DetachVolumeWithContext(gomock.Eq(ctx), gomock.Any()).Return(&ec2.VolumeAttachment{}, tc.expErr)

93 changes: 42 additions & 51 deletions tests/e2e/e2e_test.go
@@ -27,7 +27,6 @@ import (
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/ec2"
csi "github.com/container-storage-interface/spec/lib/go/csi/v0"
"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/cloud"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"k8s.io/apimachinery/pkg/util/wait"
@@ -60,20 +59,21 @@ var _ = Describe("EBS CSI Driver", func() {
Parameters: nil,
}

logf("Creating volume with name %q", req.GetName())
resp, err := csiClient.ctrl.CreateVolume(context.Background(), req)
Expect(err).To(BeNil(), "Could not create volume")

volume := resp.GetVolume()
Expect(volume).NotTo(BeNil(), "Expected valid volume, got nil")
waitForVolumeState(volume.Id, "available")
waitForVolume(volume.Id, 1 /* number of expected volumes */)

// Delete volume
defer func() {
logf("Deleting volume %q", volume.Id)
_, err = csiClient.ctrl.DeleteVolume(context.Background(), &csi.DeleteVolumeRequest{VolumeId: volume.Id})
Expect(err).To(BeNil(), "Could not delete volume")
waitForVolumes(volume.Id, 0 /* number of expected volumes */)
waitForVolume(volume.Id, 0 /* number of expected volumes */)

// Deleting volume twice
logf("Deleting volume %q twice", volume.Id)
_, err = csiClient.ctrl.DeleteVolume(context.Background(), &csi.DeleteVolumeRequest{VolumeId: volume.Id})
Expect(err).To(BeNil(), "Error when trying to delete volume twice")
}()
@@ -86,7 +86,7 @@ var _ = Describe("EBS CSI Driver", func() {
})

func testAttachWriteReadDetach(volumeID, volName, nodeID string, readOnly bool) {
// Attach volume
logf("Attaching volume %q to node %q", volumeID, nodeID)
respAttach, err := csiClient.ctrl.ControllerPublishVolume(
context.Background(),
&csi.ControllerPublishVolumeRequest{
@@ -96,10 +96,10 @@ func testAttachWriteReadDetach(volumeID, volName, nodeID string, readOnly bool)
},
)
Expect(err).To(BeNil(), "ControllerPublishVolume failed attaching volume %q to node %q", volumeID, nodeID)
waitForVolumeState(volumeID, "in-use")
assertAttachmentState(volumeID, "attached")

// Detach Volume
defer func() {
logf("Detaching volume %q from node %q", volumeID, nodeID)
_, err = csiClient.ctrl.ControllerUnpublishVolume(
context.Background(),
&csi.ControllerUnpublishVolumeRequest{
@@ -108,12 +108,12 @@ func testAttachWriteReadDetach(volumeID, volName, nodeID string, readOnly bool)
},
)
Expect(err).To(BeNil(), "ControllerUnpublishVolume failed with error")
waitForVolumeState(volumeID, "available")
assertAttachmentState(volumeID, "detached")
}()

// Stage Disk
volDir := filepath.Join("/tmp/", volName)
stageDir := filepath.Join(volDir, "stage")
logf("Staging volume %q to path %q", volumeID, stageDir)
_, err = csiClient.node.NodeStageVolume(
context.Background(),
&csi.NodeStageVolumeRequest{
@@ -125,15 +125,15 @@ func testAttachWriteReadDetach(volumeID, volName, nodeID string, readOnly bool)
Expect(err).To(BeNil(), "NodeStageVolume failed with error")

defer func() {
// Unstage Disk
logf("Unstaging volume %q from path %q", volumeID, stageDir)
_, err := csiClient.node.NodeUnstageVolume(context.Background(), &csi.NodeUnstageVolumeRequest{VolumeId: volumeID, StagingTargetPath: stageDir})
Expect(err).To(BeNil(), "NodeUnstageVolume failed with error")
err = os.RemoveAll(volDir)
Expect(err).To(BeNil(), "Failed to remove temp directory")
}()

// Mount Disk
publishDir := filepath.Join("/tmp/", volName, "mount")
logf("Publishing volume %q to path %q", volumeID, publishDir)
_, err = csiClient.node.NodePublishVolume(context.Background(), &csi.NodePublishVolumeRequest{
VolumeId: volumeID,
StagingTargetPath: stageDir,
@@ -142,8 +142,8 @@ func testAttachWriteReadDetach(volumeID, volName, nodeID string, readOnly bool)
})
Expect(err).To(BeNil(), "NodePublishVolume failed with error")

// Unmount Disk
defer func() {
logf("Unpublishing volume %q from path %q", volumeID, publishDir)
_, err = csiClient.node.NodeUnpublishVolume(context.Background(), &csi.NodeUnpublishVolumeRequest{
VolumeId: volumeID,
TargetPath: publishDir,
@@ -152,6 +152,7 @@ func testAttachWriteReadDetach(volumeID, volName, nodeID string, readOnly bool)
}()

if !readOnly {
logf("Writing and reading a file")
// Write a file
testFileContents := []byte("sample content")
testFile := filepath.Join(publishDir, "testfile")
@@ -164,50 +165,14 @@ func testAttachWriteReadDetach(volumeID, volName, nodeID string, readOnly bool)
}
}

func waitForVolumeState(volumeID, state string) {
// Most attach/detach operations on AWS finish within 1-4 seconds.
// By using 1 second starting interval with a backoff of 1.8,
// we get [1, 1.8, 3.24, 5.832000000000001, 10.4976].
// In total we wait for 2601 seconds.
backoff := wait.Backoff{
Duration: 1 * time.Second,
Factor: 1.8,
Steps: 13,
}
verifyVolumeFunc := func() (bool, error) {
params := &ec2.DescribeVolumesInput{
VolumeIds: []*string{aws.String(volumeID)},
}
volumes, err := describeVolumes(params)
if err != nil {
return false, err
}
if len(volumes) != 1 {
return false, fmt.Errorf("expected 1 volume, got %d", len(volumes))
}
if aws.StringValue(volumes[0].State) != state {
return false, nil
}
// We need to check the attachment state when the volume is "in-use",
// as it might still be "attaching" rather than "attached".
if state == "in-use" {
if aws.StringValue(volumes[0].Attachments[0].State) != "attached" {
return false, nil
}
}
return true, nil
}
waitErr := wait.ExponentialBackoff(backoff, verifyVolumeFunc)
Expect(waitErr).To(BeNil(), "Timeout error waiting for volume state %q: %v", waitErr, state)
}

func waitForVolumes(volumeID string, nVolumes int) {
func waitForVolume(volumeID string, nVolumes int) {
backoff := wait.Backoff{
Duration: 1 * time.Second,
Factor: 1.8,
Steps: 13,
}
verifyVolumeFunc := func() (bool, error) {
logf("Waiting for %d volumes with ID %q", nVolumes, volumeID)
params := &ec2.DescribeVolumesInput{
VolumeIds: []*string{aws.String(volumeID)},
}
@@ -225,12 +190,38 @@ func waitForVolumes(volumeID string, nVolumes int) {
if len(volumes) != nVolumes {
return false, nil
}
if nVolumes == 1 {
if aws.StringValue(volumes[0].State) != "available" {
return false, nil
}
}
return true, nil
}
waitErr := wait.ExponentialBackoff(backoff, verifyVolumeFunc)
Expect(waitErr).To(BeNil(), "Timeout error when looking for volume %q: %v", volumeID, waitErr)
}

func assertAttachmentState(volumeID, state string) {
logf("Checking if attachment state of volume %q is %q", volumeID, state)
volumes, err := describeVolumes(&ec2.DescribeVolumesInput{
VolumeIds: []*string{aws.String(volumeID)},
})
Expect(err).To(BeNil(), "Error describing volumes: %v", err)

nVolumes := len(volumes)
Expect(nVolumes).To(BeNumerically("==", 1), "Expected 1 volume, got %d", nVolumes)

// Detached volumes have 0 attachments
if state == "detached" {
nAttachments := len(volumes[0].Attachments)
Expect(nAttachments).To(BeNumerically("==", 0), "Expected 0 attachments, got %d", nAttachments)
return
}

aState := aws.StringValue(volumes[0].Attachments[0].State)
Expect(aState).To(Equal(state), "Expected state %s, got %s", state, aState)
}

func describeVolumes(params *ec2.DescribeVolumesInput) ([]*ec2.Volume, error) {
var volumes []*ec2.Volume
var nextToken *string
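The describeVolumes helper is truncated in this view. As a generic, hypothetical illustration only (not the repository's implementation), paginating DescribeVolumes via NextToken in the AWS SDK for Go looks roughly like the sketch below; the region and volume ID are placeholders, and the canonical github.com import path is used.

package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/ec2"
)

func main() {
	// Hypothetical pagination over DescribeVolumes; not the helper from e2e_test.go.
	svc := ec2.New(session.Must(session.NewSession(&aws.Config{Region: aws.String("us-east-1")})))
	params := &ec2.DescribeVolumesInput{
		VolumeIds: []*string{aws.String("vol-0123456789abcdef0")}, // placeholder volume ID
	}

	var volumes []*ec2.Volume
	for {
		resp, err := svc.DescribeVolumes(params)
		if err != nil {
			panic(err)
		}
		volumes = append(volumes, resp.Volumes...)
		if aws.StringValue(resp.NextToken) == "" {
			break // no more pages
		}
		params.NextToken = resp.NextToken
	}
	fmt.Printf("found %d volume(s)\n", len(volumes))
}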
5 changes: 5 additions & 0 deletions tests/e2e/setup_test.go
@@ -16,6 +16,7 @@ package e2e

import (
"flag"
"fmt"
"net"
"testing"
"time"
@@ -106,3 +107,7 @@ func newEC2Client() *ec2.EC2 {
}))
return ec2.New(sess)
}

func logf(format string, args ...interface{}) {
fmt.Fprintln(GinkgoWriter, fmt.Sprintf(format, args...))
}
