Wait for attachment state after attach/detach #58

Merged 2 commits on Oct 15, 2018
3 changes: 2 additions & 1 deletion Makefile
@@ -30,7 +30,8 @@ test-sanity:

.PHONY: test-e2e
test-e2e:
go test -v ./tests/e2e/...
go test -c ./tests/e2e/... -o bin/e2e.test && \
sudo -E bin/e2e.test -ginkgo.v

.PHONY: image
image:
82 changes: 58 additions & 24 deletions pkg/cloud/cloud.go
@@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
@@ -32,6 +33,7 @@ import (
"github.com/golang/glog"
dm "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/cloud/devicemanager"
"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/util"
"k8s.io/apimachinery/pkg/util/wait"
)

const (
@@ -272,35 +274,18 @@ func (c *cloud) AttachDisk(ctx context.Context, volumeID, nodeID string) (string
return "", fmt.Errorf("could not attach volume %q to node %q: %v", volumeID, nodeID, err)
}
glog.V(5).Infof("AttachVolume volume=%q instance=%q request returned %v", volumeID, nodeID, resp)
}

// TODO: wait attaching
// TODO: this is the only situation where this method returns and the device is not released
//attachment, err := disk.waitForAttachmentStatus("attached")
//if err != nil {
// device.Taint()
//if err == wait.ErrWaitTimeout {
//c.applyUnSchedulableTaint(nodeName, "Volume stuck in attaching state - node needs reboot to fix impaired state.")
//}
//return "", err
//}
}

// Manually release the device so it can be picked again.
// This is the only situation where we taint the device
if err := c.waitForAttachmentState(ctx, volumeID, "attached"); err != nil {
device.Taint()
return "", err
}

// Double check the attachment to be 100% sure we attached the correct volume at the correct mountpoint
// TODO: Double check the attachment to be 100% sure we attached the correct volume at the correct mountpoint
// It could happen otherwise that we see the volume attached from a previous/separate AttachVolume call,
// which could theoretically be against a different device (or even instance).
//if attachment == nil {
//// Impossible?
//return "", fmt.Errorf("unexpected state: attachment nil after attached %q to %q", diskName, nodeName)
//}
//if ec2Device != aws.StringValue(attachment.Device) {
//return "", fmt.Errorf("disk attachment of %q to %q failed: requested device %q but found %q", diskName, nodeName, ec2Device, aws.StringValue(attachment.Device))
//}
//if awsInstance.awsID != aws.StringValue(attachment.InstanceId) {
//return "", fmt.Errorf("disk attachment of %q to %q failed: requested instance %q but found %q", diskName, nodeName, awsInstance.awsID, aws.StringValue(attachment.InstanceId))
//}
//return hostDevice, nil

return device.Path, nil
}
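The TODO above tracks the double check that the old commented-out code performed. A minimal sketch of what it could look like, assuming a hypothetical waitForAttachment helper that returns the matched attachment (the waitForAttachmentState added by this PR returns only an error):

// Sketch only: waitForAttachment is hypothetical; it would poll the volume
// like waitForAttachmentState but also return the matching attachment.
attachment, err := c.waitForAttachment(ctx, volumeID, "attached")
if err != nil {
	device.Taint()
	return "", err
}
if attachment == nil {
	return "", fmt.Errorf("unexpected state: attachment nil after attaching %q to %q", volumeID, nodeID)
}
if device.Path != aws.StringValue(attachment.Device) {
	return "", fmt.Errorf("attachment of volume %q failed: requested device %q but found %q",
		volumeID, device.Path, aws.StringValue(attachment.Device))
}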
@@ -332,6 +317,10 @@ func (c *cloud) DetachDisk(ctx context.Context, volumeID, nodeID string) error {
return fmt.Errorf("could not detach volume %q from node %q: %v", volumeID, nodeID, err)
}

if err := c.waitForAttachmentState(ctx, volumeID, "detached"); err != nil {
return err
}

return nil
}

@@ -447,3 +436,48 @@ func (c *cloud) getInstance(ctx context.Context, nodeID string) (*ec2.Instance,

return instances[0], nil
}

// waitForAttachmentState polls until the volume's attachment state reaches the expected value.
func (c *cloud) waitForAttachmentState(ctx context.Context, volumeID, state string) error {
// Most attach/detach operations on AWS finish within 1-4 seconds.
// Starting from a 1 second interval with a backoff factor of 1.8,
// the intervals are [1, 1.8, 3.24, 5.83, 10.50, ...] seconds.
// Over 13 steps this adds up to roughly 2600 seconds (~43 minutes) in total.
backoff := wait.Backoff{
Duration: 1 * time.Second,
Factor: 1.8,
Steps: 13,
}

verifyVolumeFunc := func() (bool, error) {
request := &ec2.DescribeVolumesInput{
VolumeIds: []*string{
aws.String(volumeID),
},
}

volume, err := c.getVolume(ctx, request)
if err != nil {
return false, err
}

if len(volume.Attachments) == 0 && state == "detached" {
return true, nil
}

for _, a := range volume.Attachments {
if a.State == nil {
glog.Warningf("Ignoring nil attachment state for volume %q: %v", volumeID, a)
continue
}
if *a.State == state {
return true, nil
}
}
return false, nil
}

return wait.ExponentialBackoff(backoff, verifyVolumeFunc)
}
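As a sanity check on the backoff comment above, the intervals and the ~2600 second total can be reproduced with a standalone sketch (not part of the PR):

package main

import "fmt"

// Prints the intervals implied by wait.Backoff{Duration: 1 * time.Second,
// Factor: 1.8, Steps: 13} with zero jitter, and their running total.
func main() {
	interval, total := 1.0, 0.0
	for step := 1; step <= 13; step++ {
		total += interval
		fmt.Printf("step %2d: wait %8.2fs, cumulative %8.2fs\n", step, interval, total)
		interval *= 1.8
	}
}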
12 changes: 12 additions & 0 deletions pkg/cloud/cloud_test.go
@@ -199,7 +199,13 @@ func TestAttachDisk(t *testing.T) {
mockEC2 := mocks.NewMockEC2(mockCtrl)
c := newCloud(mockEC2)

vol := &ec2.Volume{
VolumeId: aws.String(tc.volumeID),
Attachments: []*ec2.VolumeAttachment{{State: aws.String("attached")}},
}

ctx := context.Background()
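// waitForAttachmentState polls DescribeVolumes until the volume reports
// "attached", so the stub must allow any number of calls.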
mockEC2.EXPECT().DescribeVolumesWithContext(gomock.Eq(ctx), gomock.Any()).Return(&ec2.DescribeVolumesOutput{Volumes: []*ec2.Volume{vol}}, nil).AnyTimes()
mockEC2.EXPECT().DescribeInstancesWithContext(gomock.Eq(ctx), gomock.Any()).Return(newDescribeInstancesOutput(tc.nodeID), nil)
mockEC2.EXPECT().AttachVolumeWithContext(gomock.Eq(ctx), gomock.Any()).Return(&ec2.VolumeAttachment{}, tc.expErr)

Expand Down Expand Up @@ -248,7 +254,13 @@ func TestDetachDisk(t *testing.T) {
mockEC2 := mocks.NewMockEC2(mockCtrl)
c := newCloud(mockEC2)

vol := &ec2.Volume{
VolumeId: aws.String(tc.volumeID),
Attachments: nil,
}

ctx := context.Background()
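// DetachDisk waits until the volume reports no attachments,
// so DescribeVolumes may be called any number of times.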
mockEC2.EXPECT().DescribeVolumesWithContext(gomock.Eq(ctx), gomock.Any()).Return(&ec2.DescribeVolumesOutput{Volumes: []*ec2.Volume{vol}}, nil).AnyTimes()
mockEC2.EXPECT().DescribeInstancesWithContext(gomock.Eq(ctx), gomock.Any()).Return(newDescribeInstancesOutput(tc.nodeID), nil)
mockEC2.EXPECT().DetachVolumeWithContext(gomock.Eq(ctx), gomock.Any()).Return(&ec2.VolumeAttachment{}, tc.expErr)

93 changes: 42 additions & 51 deletions tests/e2e/e2e_test.go
@@ -27,7 +27,6 @@ import (
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/ec2"
csi "github.com/container-storage-interface/spec/lib/go/csi/v0"
"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/cloud"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"k8s.io/apimachinery/pkg/util/wait"
@@ -60,20 +59,21 @@ var _ = Describe("EBS CSI Driver", func() {
Parameters: nil,
}

logf("Creating volume with name %q", req.GetName())
resp, err := csiClient.ctrl.CreateVolume(context.Background(), req)
Expect(err).To(BeNil(), "Could not create volume")

volume := resp.GetVolume()
Expect(volume).NotTo(BeNil(), "Expected valid volume, got nil")
waitForVolumeState(volume.Id, "available")
waitForVolume(volume.Id, 1 /* number of expected volumes */)

// Delete volume
defer func() {
logf("Deleting volume %q", volume.Id)
_, err = csiClient.ctrl.DeleteVolume(context.Background(), &csi.DeleteVolumeRequest{VolumeId: volume.Id})
Expect(err).To(BeNil(), "Could not delete volume")
waitForVolumes(volume.Id, 0 /* number of expected volumes */)
waitForVolume(volume.Id, 0 /* number of expected volumes */)

// Deleting volume twice
logf("Deleting volume %q twice", volume.Id)
_, err = csiClient.ctrl.DeleteVolume(context.Background(), &csi.DeleteVolumeRequest{VolumeId: volume.Id})
Expect(err).To(BeNil(), "Error when trying to delete volume twice")
}()
@@ -86,7 +86,7 @@ var _ = Describe("EBS CSI Driver", func() {
})

func testAttachWriteReadDetach(volumeID, volName, nodeID string, readOnly bool) {
// Attach volume
logf("Attaching volume %q to node %q", volumeID, nodeID)
respAttach, err := csiClient.ctrl.ControllerPublishVolume(
context.Background(),
&csi.ControllerPublishVolumeRequest{
@@ -96,10 +96,10 @@ func testAttachWriteReadDetach(volumeID, volName, nodeID string, readOnly bool)
},
)
Expect(err).To(BeNil(), "ControllerPublishVolume failed attaching volume %q to node %q", volumeID, nodeID)
waitForVolumeState(volumeID, "in-use")
assertAttachmentState(volumeID, "attached")

// Detach Volume
defer func() {
logf("Detaching volume %q from node %q", volumeID, nodeID)
_, err = csiClient.ctrl.ControllerUnpublishVolume(
context.Background(),
&csi.ControllerUnpublishVolumeRequest{
@@ -108,12 +108,12 @@ func testAttachWriteReadDetach(volumeID, volName, nodeID string, readOnly bool)
},
)
Expect(err).To(BeNil(), "ControllerUnpublishVolume failed with error")
waitForVolumeState(volumeID, "available")
assertAttachmentState(volumeID, "detached")
}()

// Stage Disk
volDir := filepath.Join("/tmp/", volName)
stageDir := filepath.Join(volDir, "stage")
logf("Staging volume %q to path %q", volumeID, stageDir)
_, err = csiClient.node.NodeStageVolume(
context.Background(),
&csi.NodeStageVolumeRequest{
@@ -125,15 +125,15 @@
Expect(err).To(BeNil(), "NodeStageVolume failed with error")

defer func() {
// Unstage Disk
logf("Unstaging volume %q from path %q", volumeID, stageDir)
_, err := csiClient.node.NodeUnstageVolume(context.Background(), &csi.NodeUnstageVolumeRequest{VolumeId: volumeID, StagingTargetPath: stageDir})
Expect(err).To(BeNil(), "NodeUnstageVolume failed with error")
err = os.RemoveAll(volDir)
Expect(err).To(BeNil(), "Failed to remove temp directory")
}()

// Mount Disk
publishDir := filepath.Join("/tmp/", volName, "mount")
logf("Publishing volume %q to path %q", volumeID, publishDir)
_, err = csiClient.node.NodePublishVolume(context.Background(), &csi.NodePublishVolumeRequest{
VolumeId: volumeID,
StagingTargetPath: stageDir,
@@ -142,8 +142,8 @@
})
Expect(err).To(BeNil(), "NodePublishVolume failed with error")

// Unmount Disk
defer func() {
logf("Unpublishing volume %q from path %q", volumeID, publishDir)
_, err = csiClient.node.NodeUnpublishVolume(context.Background(), &csi.NodeUnpublishVolumeRequest{
VolumeId: volumeID,
TargetPath: publishDir,
@@ -152,6 +152,7 @@
}()

if !readOnly {
logf("Writing and reading a file")
// Write a file
testFileContents := []byte("sample content")
testFile := filepath.Join(publishDir, "testfile")
@@ -164,50 +165,14 @@
}
}

func waitForVolumeState(volumeID, state string) {
// Most attach/detach operations on AWS finish within 1-4 seconds.
// By using 1 second starting interval with a backoff of 1.8,
// we get [1, 1.8, 3.24, 5.83, 10.50, ...] seconds.
// Over 13 steps this totals roughly 2600 seconds.
backoff := wait.Backoff{
Duration: 1 * time.Second,
Factor: 1.8,
Steps: 13,
}
verifyVolumeFunc := func() (bool, error) {
params := &ec2.DescribeVolumesInput{
VolumeIds: []*string{aws.String(volumeID)},
}
volumes, err := describeVolumes(params)
if err != nil {
return false, err
}
if len(volumes) != 1 {
return false, fmt.Errorf("expected 1 volume, got %d", len(volumes))
}
if aws.StringValue(volumes[0].State) != state {
return false, nil
}
// We need to check the attachment state when the volume is "in-use",
// as it might still be "attaching" rather than "attached".
if state == "in-use" {
if aws.StringValue(volumes[0].Attachments[0].State) != "attached" {
return false, nil
}
}
return true, nil
}
waitErr := wait.ExponentialBackoff(backoff, verifyVolumeFunc)
Expect(waitErr).To(BeNil(), "Timeout error waiting for volume state %q: %v", waitErr, state)
}

func waitForVolumes(volumeID string, nVolumes int) {
func waitForVolume(volumeID string, nVolumes int) {
backoff := wait.Backoff{
Duration: 1 * time.Second,
Factor: 1.8,
Steps: 13,
}
verifyVolumeFunc := func() (bool, error) {
logf("Waiting for %d volumes with ID %q", nVolumes, volumeID)
params := &ec2.DescribeVolumesInput{
VolumeIds: []*string{aws.String(volumeID)},
}
@@ -225,12 +190,38 @@ func waitForVolumes(volumeID string, nVolumes int) {
if len(volumes) != nVolumes {
return false, nil
}
if nVolumes == 1 && aws.StringValue(volumes[0].State) != "available" {
return false, nil
}
return true, nil
}
waitErr := wait.ExponentialBackoff(backoff, verifyVolumeFunc)
Expect(waitErr).To(BeNil(), "Timeout error when looking for volume %q: %v", volumeID, waitErr)
}

func assertAttachmentState(volumeID, state string) {
logf("Checking if attachment state of volume %q is %q", volumeID, state)
volumes, err := describeVolumes(&ec2.DescribeVolumesInput{
VolumeIds: []*string{aws.String(volumeID)},
})
Expect(err).To(BeNil(), "Error describing volumes: %v", err)

nVolumes := len(volumes)
Expect(nVolumes).To(BeNumerically("==", 1), "Expected 1 volume, got %d", nVolumes)

// Detached volumes have 0 attachments
if state == "detached" {
nAttachments := len(volumes[0].Attachments)
Expect(nAttachments).To(BeNumerically("==", 0), "Expected 0 attachments, got %d", nAttachments)
return
}

nAttachments := len(volumes[0].Attachments)
Expect(nAttachments).To(BeNumerically(">", 0), "Expected at least 1 attachment, got 0")

aState := aws.StringValue(volumes[0].Attachments[0].State)
Expect(aState).To(Equal(state), "Expected state %s, got %s", state, aState)
}

func describeVolumes(params *ec2.DescribeVolumesInput) ([]*ec2.Volume, error) {
var volumes []*ec2.Volume
var nextToken *string
5 changes: 5 additions & 0 deletions tests/e2e/setup_test.go
@@ -16,6 +16,7 @@ package e2e

import (
"flag"
"fmt"
"net"
"testing"
"time"
@@ -106,3 +107,7 @@ func newEC2Client() *ec2.EC2 {
}))
return ec2.New(sess)
}

func logf(format string, args ...interface{}) {
fmt.Fprintf(GinkgoWriter, format+"\n", args...)
}