From f11db6cf057c98f887847edb69ae0f3f2af79035 Mon Sep 17 00:00:00 2001
From: Fabio Bertinatto
Date: Thu, 11 Oct 2018 16:47:28 +0200
Subject: [PATCH 1/2] Wait for correct attachment state

---
 pkg/cloud/cloud.go      | 80 ++++++++++++++++++++++++++++-------------
 pkg/cloud/cloud_test.go | 12 +++++++
 2 files changed, 68 insertions(+), 24 deletions(-)

diff --git a/pkg/cloud/cloud.go b/pkg/cloud/cloud.go
index cafb24dc46..25e3713728 100644
--- a/pkg/cloud/cloud.go
+++ b/pkg/cloud/cloud.go
@@ -20,6 +20,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"time"
 
 	"github.com/aws/aws-sdk-go/aws"
 	"github.com/aws/aws-sdk-go/aws/awserr"
@@ -32,6 +33,7 @@ import (
 	"github.com/golang/glog"
 	dm "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/cloud/devicemanager"
 	"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/util"
+	"k8s.io/apimachinery/pkg/util/wait"
 )
 
 const (
@@ -272,35 +274,18 @@ func (c *cloud) AttachDisk(ctx context.Context, volumeID, nodeID string) (string
 			return "", fmt.Errorf("could not attach volume %q to node %q: %v", volumeID, nodeID, err)
 		}
 		glog.V(5).Infof("AttachVolume volume=%q instance=%q request returned %v", volumeID, nodeID, resp)
-	}
-	// TODO: wait attaching
-	// TODO: this is the only situation where this method returns and the device is not released
-	//attachment, err := disk.waitForAttachmentStatus("attached")
-	//if err != nil {
-	//	device.Taint()
-	//if err == wait.ErrWaitTimeout {
-	//c.applyUnSchedulableTaint(nodeName, "Volume stuck in attaching state - node needs reboot to fix impaired state.")
-	//}
-	//return "", err
-	//}
+	}
 
-	// Manually release the device so it can be picked again.
+	// This is the only situation where we taint the device
+	if err := c.waitForAttachmentState(ctx, volumeID, "attached"); err != nil {
+		device.Taint()
+		return "", err
+	}
 
-	// Double check the attachment to be 100% sure we attached the correct volume at the correct mountpoint
+	// TODO: Double check the attachment to be 100% sure we attached the correct volume at the correct mountpoint
 	// It could happen otherwise that we see the volume attached from a previous/separate AttachVolume call,
 	// which could theoretically be against a different device (or even instance).
-	//if attachment == nil {
-	//// Impossible?
-	//return "", fmt.Errorf("unexpected state: attachment nil after attached %q to %q", diskName, nodeName)
-	//}
-	//if ec2Device != aws.StringValue(attachment.Device) {
-	//return "", fmt.Errorf("disk attachment of %q to %q failed: requested device %q but found %q", diskName, nodeName, ec2Device, aws.StringValue(attachment.Device))
-	//}
-	//if awsInstance.awsID != aws.StringValue(attachment.InstanceId) {
-	//return "", fmt.Errorf("disk attachment of %q to %q failed: requested instance %q but found %q", diskName, nodeName, awsInstance.awsID, aws.StringValue(attachment.InstanceId))
-	//}
-	//return hostDevice, nil
 	return device.Path, nil
 }
 
@@ -332,6 +317,10 @@ func (c *cloud) DetachDisk(ctx context.Context, volumeID, nodeID string) error {
 		return fmt.Errorf("could not detach volume %q from node %q: %v", volumeID, nodeID, err)
 	}
 
+	if err := c.waitForAttachmentState(ctx, volumeID, "detached"); err != nil {
+		return err
+	}
+
 	return nil
 }
 
@@ -447,3 +436,46 @@ func (c *cloud) getInstance(ctx context.Context, nodeID string) (*ec2.Instance,
 
 	return instances[0], nil
 }
+
+// waitForAttachmentState polls until the attachment state is the expected value.
+func (c *cloud) waitForAttachmentState(ctx context.Context, volumeID, state string) error {
+	// Most attach/detach operations on AWS finish within 1-4 seconds.
+	// Starting with a 1 second interval and a factor of 1.8, the delays are
+	// [1, 1.8, 3.24, 5.83, 10.50, ...] seconds; across all 13 steps this
+	// waits for up to roughly 2601 seconds in total before timing out.
+	backoff := wait.Backoff{
+		Duration: 1 * time.Second,
+		Factor:   1.8,
+		Steps:    13,
+	}
+
+	verifyVolumeFunc := func() (bool, error) {
+		request := &ec2.DescribeVolumesInput{
+			VolumeIds: []*string{
+				aws.String(volumeID),
+			},
+		}
+
+		volume, err := c.getVolume(ctx, request)
+		if err != nil {
+			return false, err
+		}
+
+		if len(volume.Attachments) > 1 {
+			glog.Warningf("Found multiple attachments for volume %q: %v", volumeID, volume)
+		}
+
+		for _, a := range volume.Attachments {
+			if a.State == nil {
+				glog.Warningf("Ignoring nil attachment state for volume %q: %v", volumeID, a)
+				continue
+			}
+			if *a.State == state {
+				return true, nil
+			}
+		}
+		return false, nil
+	}
+
+	return wait.ExponentialBackoff(backoff, verifyVolumeFunc)
+}
diff --git a/pkg/cloud/cloud_test.go b/pkg/cloud/cloud_test.go
index 29020bdd76..741691fe09 100644
--- a/pkg/cloud/cloud_test.go
+++ b/pkg/cloud/cloud_test.go
@@ -199,7 +199,13 @@ func TestAttachDisk(t *testing.T) {
 			mockEC2 := mocks.NewMockEC2(mockCtrl)
 			c := newCloud(mockEC2)
 
+			vol := &ec2.Volume{
+				VolumeId:    aws.String(tc.volumeID),
+				Attachments: []*ec2.VolumeAttachment{{State: aws.String("attached")}},
+			}
+
 			ctx := context.Background()
+			mockEC2.EXPECT().DescribeVolumesWithContext(gomock.Eq(ctx), gomock.Any()).Return(&ec2.DescribeVolumesOutput{Volumes: []*ec2.Volume{vol}}, nil).AnyTimes()
 			mockEC2.EXPECT().DescribeInstancesWithContext(gomock.Eq(ctx), gomock.Any()).Return(newDescribeInstancesOutput(tc.nodeID), nil)
 			mockEC2.EXPECT().AttachVolumeWithContext(gomock.Eq(ctx), gomock.Any()).Return(&ec2.VolumeAttachment{}, tc.expErr)
 
@@ -248,7 +254,13 @@ func TestDetachDisk(t *testing.T) {
 			mockEC2 := mocks.NewMockEC2(mockCtrl)
 			c := newCloud(mockEC2)
 
+			vol := &ec2.Volume{
+				VolumeId:    aws.String(tc.volumeID),
+				Attachments: nil,
+			}
+
 			ctx := context.Background()
+			mockEC2.EXPECT().DescribeVolumesWithContext(gomock.Eq(ctx), gomock.Any()).Return(&ec2.DescribeVolumesOutput{Volumes: []*ec2.Volume{vol}}, nil).AnyTimes()
 			mockEC2.EXPECT().DescribeInstancesWithContext(gomock.Eq(ctx), gomock.Any()).Return(newDescribeInstancesOutput(tc.nodeID), nil)
 			mockEC2.EXPECT().DetachVolumeWithContext(gomock.Eq(ctx), gomock.Any()).Return(&ec2.VolumeAttachment{}, tc.expErr)
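Note on the wait.Backoff introduced in PATCH 1/2: the per-step delays and the roughly 2601-second ceiling quoted in the comment can be sanity-checked with a small standalone sketch. This is illustrative only and not part of either patch; it simply mirrors the Duration/Factor/Steps values and assumes one delay per step:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Mirror waitForAttachmentState's backoff: Duration=1s, Factor=1.8, Steps=13.
	delay := 1 * time.Second
	var total time.Duration
	for step := 1; step <= 13; step++ {
		fmt.Printf("step %2d: delay %v\n", step, delay)
		total += delay
		delay = time.Duration(float64(delay) * 1.8)
	}
	// Sums to roughly 2601s (~43 min); in practice wait.ExponentialBackoff
	// returns as soon as the condition reports done or an error.
	fmt.Printf("total: %v\n", total)
}

Since most attach/detach operations finish within the first few steps, the full ceiling is only reached when a volume is genuinely stuck.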
From f058f34e3bba74a342101c804c3da52844fae4e8 Mon Sep 17 00:00:00 2001
From: Fabio Bertinatto
Date: Thu, 11 Oct 2018 17:09:21 +0200
Subject: [PATCH 2/2] Adjust e2e test

---
 Makefile                |  3 +-
 pkg/cloud/cloud.go      |  6 ++-
 tests/e2e/e2e_test.go   | 93 +++++++++++++++++++----------------------
 tests/e2e/setup_test.go |  5 +++
 4 files changed, 53 insertions(+), 54 deletions(-)

diff --git a/Makefile b/Makefile
index d95cece806..6dc52a06fb 100644
--- a/Makefile
+++ b/Makefile
@@ -30,7 +30,8 @@ test-sanity:
 
 .PHONY: test-e2e
 test-e2e:
-	go test -v ./tests/e2e/...
+	go test -c ./tests/e2e/... -o bin/e2e.test && \
+	sudo -E bin/e2e.test -ginkgo.v
 
 .PHONY: image
 image:
diff --git a/pkg/cloud/cloud.go b/pkg/cloud/cloud.go
index 25e3713728..08885a8bac 100644
--- a/pkg/cloud/cloud.go
+++ b/pkg/cloud/cloud.go
@@ -461,8 +461,10 @@ func (c *cloud) waitForAttachmentState(ctx context.Context, volumeID, state stri
 			return false, err
 		}
 
-		if len(volume.Attachments) > 1 {
-			glog.Warningf("Found multiple attachments for volume %q: %v", volumeID, volume)
+		if len(volume.Attachments) == 0 {
+			if state == "detached" {
+				return true, nil
+			}
 		}
 
 		for _, a := range volume.Attachments {
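The hunk above pins down a subtle rule: a volume with zero attachments satisfies only a wait for "detached"; any other target state must be matched by an actual attachment entry (before this change, waiting for "detached" on an already-detached volume would spin until the backoff expired). A standalone distillation of that rule — the helper name and the plain-string states below are illustrative, not from the patch:

package main

import "fmt"

// stateReached mirrors the decision logic in waitForAttachmentState after
// this patch: no attachments means "detached"; otherwise the desired state
// must appear among the attachment states.
func stateReached(attachmentStates []string, want string) bool {
	if len(attachmentStates) == 0 {
		return want == "detached"
	}
	for _, s := range attachmentStates {
		if s == want {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(stateReached(nil, "detached"))                   // true: nothing attached
	fmt.Println(stateReached(nil, "attached"))                   // false: keep polling
	fmt.Println(stateReached([]string{"attaching"}, "attached")) // false: keep polling
	fmt.Println(stateReached([]string{"attached"}, "attached"))  // true
}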
diff --git a/tests/e2e/e2e_test.go b/tests/e2e/e2e_test.go
index 6bcd21d797..2537afb2da 100644
--- a/tests/e2e/e2e_test.go
+++ b/tests/e2e/e2e_test.go
@@ -27,7 +27,6 @@ import (
 	"github.com/aws/aws-sdk-go/aws/awserr"
 	"github.com/aws/aws-sdk-go/service/ec2"
 	csi "github.com/container-storage-interface/spec/lib/go/csi/v0"
-	"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/cloud"
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
 	"k8s.io/apimachinery/pkg/util/wait"
@@ -60,20 +59,21 @@ var _ = Describe("EBS CSI Driver", func() {
 				Parameters:    nil,
 			}
 
+			logf("Creating volume with name %q", req.GetName())
 			resp, err := csiClient.ctrl.CreateVolume(context.Background(), req)
 			Expect(err).To(BeNil(), "Could not create volume")
 
 			volume := resp.GetVolume()
 			Expect(volume).NotTo(BeNil(), "Expected valid volume, got nil")
-			waitForVolumeState(volume.Id, "available")
+			waitForVolume(volume.Id, 1 /* number of expected volumes */)
 
-			// Delete volume
 			defer func() {
+				logf("Deleting volume %q", volume.Id)
 				_, err = csiClient.ctrl.DeleteVolume(context.Background(), &csi.DeleteVolumeRequest{VolumeId: volume.Id})
 				Expect(err).To(BeNil(), "Could not delete volume")
-				waitForVolumes(volume.Id, 0 /* number of expected volumes */)
+				waitForVolume(volume.Id, 0 /* number of expected volumes */)
 
-				// Deleting volume twice
+				logf("Deleting volume %q twice", volume.Id)
 				_, err = csiClient.ctrl.DeleteVolume(context.Background(), &csi.DeleteVolumeRequest{VolumeId: volume.Id})
 				Expect(err).To(BeNil(), "Error when trying to delete volume twice")
 			}()
@@ -86,7 +86,7 @@ var _ = Describe("EBS CSI Driver", func() {
 })
 
 func testAttachWriteReadDetach(volumeID, volName, nodeID string, readOnly bool) {
-	// Attach volume
+	logf("Attaching volume %q to node %q", volumeID, nodeID)
 	respAttach, err := csiClient.ctrl.ControllerPublishVolume(
 		context.Background(), &csi.ControllerPublishVolumeRequest{
 			...
 		},
 	)
 	Expect(err).To(BeNil(), "ControllerPublishVolume failed attaching volume %q to node %q", volumeID, nodeID)
-	waitForVolumeState(volumeID, "in-use")
+	assertAttachmentState(volumeID, "attached")
 
-	// Detach Volume
 	defer func() {
+		logf("Detaching volume %q from node %q", volumeID, nodeID)
 		_, err = csiClient.ctrl.ControllerUnpublishVolume(
 			context.Background(), &csi.ControllerUnpublishVolumeRequest{
 				...
 			},
 		)
 		Expect(err).To(BeNil(), "ControllerUnpublishVolume failed with error")
-		waitForVolumeState(volumeID, "available")
+		assertAttachmentState(volumeID, "detached")
 	}()
 
-	// Stage Disk
 	volDir := filepath.Join("/tmp/", volName)
 	stageDir := filepath.Join(volDir, "stage")
+	logf("Staging volume %q to path %q", volumeID, stageDir)
 	_, err = csiClient.node.NodeStageVolume(
 		context.Background(), &csi.NodeStageVolumeRequest{
@@ -125,15 +125,15 @@ func testAttachWriteReadDetach(volumeID, volName, nodeID string, readOnly bool)
 	Expect(err).To(BeNil(), "NodeStageVolume failed with error")
 
 	defer func() {
-		// Unstage Disk
+		logf("Unstaging volume %q from path %q", volumeID, stageDir)
 		_, err := csiClient.node.NodeUnstageVolume(context.Background(), &csi.NodeUnstageVolumeRequest{VolumeId: volumeID, StagingTargetPath: stageDir})
 		Expect(err).To(BeNil(), "NodeUnstageVolume failed with error")
 		err = os.RemoveAll(volDir)
 		Expect(err).To(BeNil(), "Failed to remove temp directory")
 	}()
 
-	// Mount Disk
 	publishDir := filepath.Join("/tmp/", volName, "mount")
+	logf("Publishing volume %q to path %q", volumeID, publishDir)
 	_, err = csiClient.node.NodePublishVolume(context.Background(), &csi.NodePublishVolumeRequest{
 		VolumeId:          volumeID,
 		StagingTargetPath: stageDir,
@@ -142,8 +142,8 @@ func testAttachWriteReadDetach(volumeID, volName, nodeID string, readOnly bool)
 	})
 	Expect(err).To(BeNil(), "NodePublishVolume failed with error")
 
-	// Unmount Disk
 	defer func() {
+		logf("Unpublishing volume %q from path %q", volumeID, publishDir)
 		_, err = csiClient.node.NodeUnpublishVolume(context.Background(), &csi.NodeUnpublishVolumeRequest{
 			VolumeId:   volumeID,
 			TargetPath: publishDir,
@@ -152,6 +152,7 @@ func testAttachWriteReadDetach(volumeID, volName, nodeID string, readOnly bool)
 	}()
 
 	if !readOnly {
+		logf("Writing and reading a file")
 		// Write a file
 		testFileContents := []byte("sample content")
 		testFile := filepath.Join(publishDir, "testfile")
@@ -164,50 +165,14 @@ func testAttachWriteReadDetach(volumeID, volName, nodeID string, readOnly bool)
 	}
 }
 
-func waitForVolumeState(volumeID, state string) {
-	// Most attach/detach operations on AWS finish within 1-4 seconds.
-	// By using 1 second starting interval with a backoff of 1.8,
-	// we get [1, 1.8, 3.24, 5.832000000000001, 10.4976].
-	// In total we wait for 2601 seconds.
-	backoff := wait.Backoff{
-		Duration: 1 * time.Second,
-		Factor:   1.8,
-		Steps:    13,
-	}
-	verifyVolumeFunc := func() (bool, error) {
-		params := &ec2.DescribeVolumesInput{
-			VolumeIds: []*string{aws.String(volumeID)},
-		}
-		volumes, err := describeVolumes(params)
-		if err != nil {
-			return false, err
-		}
-		if len(volumes) != 1 {
-			return false, fmt.Errorf("expected 1 volume, got %d", len(volumes))
-		}
-		if aws.StringValue(volumes[0].State) != state {
-			return false, nil
-		}
-		// We need to check the atachment state when the volume is "in-use",
-		// as it might still be "attaching" rather than "attached".
-		if state == "in-use" {
-			if aws.StringValue(volumes[0].Attachments[0].State) != "attached" {
-				return false, nil
-			}
-		}
-		return true, nil
-	}
-	waitErr := wait.ExponentialBackoff(backoff, verifyVolumeFunc)
-	Expect(waitErr).To(BeNil(), "Timeout error waiting for volume state %q: %v", waitErr, state)
-}
-
-func waitForVolumes(volumeID string, nVolumes int) {
+func waitForVolume(volumeID string, nVolumes int) {
 	backoff := wait.Backoff{
 		Duration: 1 * time.Second,
 		Factor:   1.8,
 		Steps:    13,
 	}
 	verifyVolumeFunc := func() (bool, error) {
+		logf("Waiting for %d volumes with ID %q", nVolumes, volumeID)
 		params := &ec2.DescribeVolumesInput{
 			VolumeIds: []*string{aws.String(volumeID)},
 		}
@@ -225,12 +190,38 @@ func waitForVolume(volumeID string, nVolumes int) {
 		if len(volumes) != nVolumes {
 			return false, nil
 		}
+		if nVolumes == 1 {
+			if aws.StringValue(volumes[0].State) != "available" {
+				return false, nil
+			}
+		}
 		return true, nil
 	}
 	waitErr := wait.ExponentialBackoff(backoff, verifyVolumeFunc)
 	Expect(waitErr).To(BeNil(), "Timeout error when looking for volume %q: %v", volumeID, waitErr)
 }
 
+func assertAttachmentState(volumeID, state string) {
+	logf("Checking if attachment state of volume %q is %q", volumeID, state)
+	volumes, err := describeVolumes(&ec2.DescribeVolumesInput{
+		VolumeIds: []*string{aws.String(volumeID)},
+	})
+	Expect(err).To(BeNil(), "Error describing volumes: %v", err)
+
+	nVolumes := len(volumes)
+	Expect(nVolumes).To(BeNumerically("==", 1), "Expected 1 volume, got %d", nVolumes)
+
+	// Detached volumes have 0 attachments
+	if state == "detached" {
+		nAttachments := len(volumes[0].Attachments)
+		Expect(nAttachments).To(BeNumerically("==", 0), "Expected 0 attachments, got %d", nAttachments)
+		return
+	}
+
+	aState := aws.StringValue(volumes[0].Attachments[0].State)
+	Expect(aState).To(Equal(state), "Expected state %s, got %s", state, aState)
+}
+
 func describeVolumes(params *ec2.DescribeVolumesInput) ([]*ec2.Volume, error) {
 	var volumes []*ec2.Volume
 	var nextToken *string
diff --git a/tests/e2e/setup_test.go b/tests/e2e/setup_test.go
index ab826cd1c6..43c9bfa2b2 100644
--- a/tests/e2e/setup_test.go
+++ b/tests/e2e/setup_test.go
@@ -16,6 +16,7 @@ package e2e
 
 import (
 	"flag"
+	"fmt"
 	"net"
 	"testing"
 	"time"
@@ -106,3 +107,7 @@ func newEC2Client() *ec2.EC2 {
 	}))
 	return ec2.New(sess)
 }
+
+func logf(format string, args ...interface{}) {
+	fmt.Fprintln(GinkgoWriter, fmt.Sprintf(format, args...))
+}
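One usage note on the logf helper added in setup_test.go: it writes through GinkgoWriter, so with -ginkgo.v (which the updated test-e2e target passes) its output streams live, while without that flag Ginkgo buffers the output and replays it only for failing specs. A minimal illustrative spec — the spec text and volume ID are made up, and it assumes it compiles in the same e2e package as setup_test.go:

package e2e

import (
	. "github.com/onsi/ginkgo"
)

var _ = Describe("logf", func() {
	It("writes through the Ginkgo writer", func() {
		// Buffered per spec by GinkgoWriter; printed immediately under
		// -ginkgo.v, or replayed only when the spec fails otherwise.
		logf("attachment state of volume %q is %q", "vol-0123456789abcdef0", "attached")
	})
})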