diff --git a/docs/multi-attach.md b/docs/multi-attach.md new file mode 100644 index 0000000000..c3a7a2e020 --- /dev/null +++ b/docs/multi-attach.md @@ -0,0 +1,81 @@ +# Multi-Attach + +The multi-attach capability allows you to attach a single EBS volume to multiple EC2 instances located within the same Availability Zone (AZ). This shared volume can be utilized by several pods running on distinct nodes. + +Multi-attach is enabled by specifying `ReadWriteMany` for the `PersistentVolumeClaim.spec.accessMode`. + +## Important + +- Application-level coordination (e.g., via I/O fencing) is required to use multi-attach safely. Failure to do so can result in data loss and silent data corruption. Refer to the AWS documentation on Multi-Attach for more information. +- Currently, the EBS CSI driver only supports multi-attach for `IO2` volumes in `Block` mode. + +Refer to the official AWS documentation on [Multi-Attach](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ebs-volumes-multi.html) for more information, best practices, and limitations of this capability. + +## Example + +1. Create a `StorageClass` referencing an `IO2` volume type: +``` +apiVersion: storage.k8s.io/v1 +kind: StorageClass +metadata: + name: ebs-sc +provisioner: ebs.csi.aws.com +volumeBindingMode: WaitForFirstConsumer +parameters: + type: io2 + iops: "1000" +``` + +2. Create a `PersistentVolumeClaim` referencing the `ReadWriteMany` access and `Block` device modes: +``` +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: block-claim +spec: + accessModes: + - ReadWriteMany + volumeMode: Block + storageClassName: ebs-sc + resources: + requests: + storage: 4Gi +``` + +3. Create a `DaemonSet` to deploy the driver on all nodes: +``` +apiVersion: apps/v1 +kind: DaemonSet +metadata: + name: app-daemon +spec: + selector: + matchLabels: + name: app + template: + metadata: + labels: + name: app + spec: + containers: + - name: app + image: busybox + command: ["/bin/sh", "-c"] + args: ["tail -f /dev/null"] + volumeDevices: + - name: data + devicePath: /dev/xvda + volumes: + - name: data + persistentVolumeClaim: + claimName: block-claim +``` + +4. Verify the `DaemonSet` is running: +``` +$ kubectl get pods -A + +NAMESPACE NAME READY STATUS RESTARTS AGE +default app-daemon-9hdgw 1/1 Running 0 18s +default app-daemon-xm8zr 1/1 Running 0 18s +``` diff --git a/pkg/cloud/cloud.go b/pkg/cloud/cloud.go index 9dbd0ef9a5..6fd9134e43 100644 --- a/pkg/cloud/cloud.go +++ b/pkg/cloud/cloud.go @@ -195,6 +195,7 @@ type DiskOptions struct { OutpostArn string Encrypted bool BlockExpress bool + MultiAttachEnabled bool // KmsKeyID represents a fully qualified resource name to the key to use for encryption. // example: arn:aws:kms:us-east-1:012345678910:key/abcd1234-a123-456a-a12b-a123b4cd56ef KmsKeyID string @@ -345,6 +346,10 @@ func (c *cloud) CreateDisk(ctx context.Context, volumeName string, diskOptions * return nil, fmt.Errorf("invalid AWS VolumeType %q", diskOptions.VolumeType) } + if diskOptions.MultiAttachEnabled && createType != VolumeTypeIO2 { + return nil, fmt.Errorf("CreateDisk: multi-attach is only supported for io2 volumes") + } + if maxIops > 0 { if diskOptions.IOPS > 0 { requestedIops = int64(diskOptions.IOPS) @@ -381,11 +386,12 @@ func (c *cloud) CreateDisk(ctx context.Context, volumeName string, diskOptions * clientToken := sha256.Sum256([]byte(volumeName)) requestInput := &ec2.CreateVolumeInput{ - AvailabilityZone: aws.String(zone), - ClientToken: aws.String(hex.EncodeToString(clientToken[:])), - Size: aws.Int64(capacityGiB), - VolumeType: aws.String(createType), - Encrypted: aws.Bool(diskOptions.Encrypted), + AvailabilityZone: aws.String(zone), + ClientToken: aws.String(hex.EncodeToString(clientToken[:])), + Size: aws.Int64(capacityGiB), + VolumeType: aws.String(createType), + Encrypted: aws.Bool(diskOptions.Encrypted), + MultiAttachEnabled: aws.Bool(diskOptions.MultiAttachEnabled), } if !util.IsSBE(zone) { @@ -549,18 +555,12 @@ func (c *cloud) AttachDisk(ctx context.Context, volumeID, nodeID string) (string resp, attachErr := c.ec2.AttachVolumeWithContext(ctx, request) if attachErr != nil { - var awsErr awserr.Error - if errors.As(attachErr, &awsErr) { - if awsErr.Code() == "VolumeInUse" { - return "", ErrVolumeInUse - } - } return "", fmt.Errorf("could not attach volume %q to node %q: %w", volumeID, nodeID, attachErr) } klog.V(5).InfoS("[Debug] AttachVolume", "volumeID", volumeID, "nodeID", nodeID, "resp", resp) } - attachment, err := c.WaitForAttachmentState(ctx, volumeID, volumeAttachedState, *instance.InstanceId, device.Path, device.IsAlreadyAssigned) + _, err = c.WaitForAttachmentState(ctx, volumeID, volumeAttachedState, *instance.InstanceId, device.Path, device.IsAlreadyAssigned) // This is the only situation where we taint the device if err != nil { @@ -568,21 +568,6 @@ func (c *cloud) AttachDisk(ctx context.Context, volumeID, nodeID string) (string return "", err } - // Double check the attachment to be 100% sure we attached the correct volume at the correct mountpoint - // It could happen otherwise that we see the volume attached from a previous/separate AttachVolume call, - // which could theoretically be against a different device (or even instance). - if attachment == nil { - // Impossible? - return "", fmt.Errorf("unexpected state: attachment nil after attached %q to %q", volumeID, nodeID) - } - if device.Path != aws.StringValue(attachment.Device) { - // Already checked in waitForAttachmentState(), but just to be sure... - return "", fmt.Errorf("disk attachment of %q to %q failed: requested device %q but found %q", volumeID, nodeID, device.Path, aws.StringValue(attachment.Device)) - } - if *instance.InstanceId != aws.StringValue(attachment.InstanceId) { - return "", fmt.Errorf("disk attachment of %q to %q failed: requested instance %q but found %q", volumeID, nodeID, *instance.InstanceId, aws.StringValue(attachment.InstanceId)) - } - // TODO: Check volume capability matches for ALREADY_EXISTS // This could happen when request volume already attached to request node, // but is incompatible with the specified volume_capability or readonly flag @@ -674,41 +659,12 @@ func (c *cloud) WaitForAttachmentState(ctx context.Context, volumeID, expectedSt return false, nil } - if len(volume.Attachments) > 1 { - // Shouldn't happen; log so we know if it is - klog.InfoS("Found multiple attachments for volume", "volumeID", volumeID, "volume", volume) - } attachmentState := "" + for _, a := range volume.Attachments { - if attachmentState != "" { - // Shouldn't happen; log so we know if it is - klog.InfoS("Found multiple attachments for volume", "volumeID", volumeID, "volume", volume) - } - if a.State != nil { + if a.State != nil && aws.StringValue(a.InstanceId) == expectedInstance && aws.StringValue(a.Device) == expectedDevice { + attachmentState = aws.StringValue(a.State) attachment = a - attachmentState = *a.State - } else { - // Shouldn't happen; log so we know if it is - klog.InfoS("Ignoring nil attachment state for volume", "volumeID", volumeID, "attachment", a) - } - } - if attachmentState == "" { - attachmentState = volumeDetachedState - } - if attachment != nil { - // AWS eventual consistency can go back in time. - // For example, we're waiting for a volume to be attached as /dev/xvdba, but AWS can tell us it's - // attached as /dev/xvdbb, where it was attached before and it was already detached. - // Retry couple of times, hoping AWS starts reporting the right status. - device := aws.StringValue(attachment.Device) - if expectedDevice != "" && device != "" && device != expectedDevice { - klog.InfoS("Expected device for volume not found", "expectedDevice", expectedDevice, "expectedState", expectedState, "volumeID", volumeID, "device", device, "attachmentState", attachmentState) - return false, nil - } - instanceID := aws.StringValue(attachment.InstanceId) - if expectedInstance != "" && instanceID != "" && instanceID != expectedInstance { - klog.InfoS("Expected instance for volume not found", "expectedInstance", expectedInstance, "expectedState", expectedState, "volumeID", volumeID, "instanceID", instanceID, "attachmentState", attachmentState) - return false, nil } } diff --git a/pkg/cloud/cloud_test.go b/pkg/cloud/cloud_test.go index 8e4b89f7b7..aeae3df0a2 100644 --- a/pkg/cloud/cloud_test.go +++ b/pkg/cloud/cloud_test.go @@ -575,6 +575,43 @@ func TestCreateDisk(t *testing.T) { }, expErr: nil, }, + { + name: "success: multi-attach with IO2", + volumeName: "vol-test-name", + diskOptions: &DiskOptions{ + CapacityBytes: util.GiBToBytes(4), + Tags: map[string]string{VolumeNameTagKey: "vol-test", AwsEbsDriverTagKey: "true"}, + VolumeType: VolumeTypeIO2, + MultiAttachEnabled: true, + IOPSPerGB: 10000, + }, + expDisk: &Disk{ + VolumeID: "vol-test", + CapacityGiB: 4, + AvailabilityZone: defaultZone, + }, + expCreateVolumeInput: &ec2.CreateVolumeInput{ + Iops: aws.Int64(2000), + }, + expErr: nil, + }, + { + name: "failure: multi-attach with GP3", + volumeName: "vol-test-name", + diskOptions: &DiskOptions{ + CapacityBytes: util.GiBToBytes(4), + Tags: map[string]string{VolumeNameTagKey: "vol-test", AwsEbsDriverTagKey: "true"}, + VolumeType: VolumeTypeGP3, + MultiAttachEnabled: true, + IOPSPerGB: 10000, + }, + expDisk: &Disk{ + VolumeID: "vol-test", + CapacityGiB: 4, + AvailabilityZone: defaultZone, + }, + expErr: fmt.Errorf("CreateDisk: multi-attach is only supported for io2 volumes"), + }, } for _, tc := range testCases { @@ -715,9 +752,10 @@ func TestAttachDisk(t *testing.T) { name string volumeID string nodeID string + nodeID2 string path string expErr error - mockFunc func(*MockEC2API, context.Context, string, string, string, dm.DeviceManager) + mockFunc func(*MockEC2API, context.Context, string, string, string, string, dm.DeviceManager) }{ { name: "success: AttachVolume normal", @@ -725,7 +763,7 @@ func TestAttachDisk(t *testing.T) { nodeID: defaultNodeID, path: defaultPath, expErr: nil, - mockFunc: func(mockEC2 *MockEC2API, ctx context.Context, volumeID, nodeID, path string, dm dm.DeviceManager) { + mockFunc: func(mockEC2 *MockEC2API, ctx context.Context, volumeID, nodeID, nodeID2, path string, dm dm.DeviceManager) { volumeRequest := createVolumeRequest(volumeID) instanceRequest := createInstanceRequest(nodeID) attachRequest := createAttachRequest(volumeID, nodeID, path) @@ -743,7 +781,7 @@ func TestAttachDisk(t *testing.T) { nodeID: defaultNodeID, path: defaultPath, expErr: nil, - mockFunc: func(mockEC2 *MockEC2API, ctx context.Context, volumeID, nodeID, path string, dm dm.DeviceManager) { + mockFunc: func(mockEC2 *MockEC2API, ctx context.Context, volumeID, nodeID, nodeID2, path string, dm dm.DeviceManager) { volumeRequest := createVolumeRequest(volumeID) instanceRequest := createInstanceRequest(nodeID) @@ -762,7 +800,7 @@ func TestAttachDisk(t *testing.T) { nodeID: defaultNodeID, path: defaultPath, expErr: fmt.Errorf("could not attach volume %q to node %q: %w", defaultVolumeID, defaultNodeID, errors.New("AttachVolume error")), - mockFunc: func(mockEC2 *MockEC2API, ctx context.Context, volumeID, nodeID, path string, dm dm.DeviceManager) { + mockFunc: func(mockEC2 *MockEC2API, ctx context.Context, volumeID, nodeID, nodeID2, path string, dm dm.DeviceManager) { instanceRequest := createInstanceRequest(nodeID) attachRequest := createAttachRequest(volumeID, nodeID, path) @@ -778,7 +816,7 @@ func TestAttachDisk(t *testing.T) { nodeID: defaultNodeID, path: defaultPath, expErr: fmt.Errorf("could not attach volume %q to node %q: %w", defaultVolumeID, defaultNodeID, ErrVolumeInUse), - mockFunc: func(mockEC2 *MockEC2API, ctx context.Context, volumeID, nodeID, path string, dm dm.DeviceManager) { + mockFunc: func(mockEC2 *MockEC2API, ctx context.Context, volumeID, nodeID, nodeID2, path string, dm dm.DeviceManager) { instanceRequest := createInstanceRequest(nodeID) attachRequest := createAttachRequest(volumeID, nodeID, path) @@ -788,6 +826,52 @@ func TestAttachDisk(t *testing.T) { ) }, }, + { + name: "success: AttachVolume multi-attach", + volumeID: defaultVolumeID, + nodeID: defaultNodeID, + nodeID2: "node-1239", + path: defaultPath, + expErr: nil, + mockFunc: func(mockEC2 *MockEC2API, ctx context.Context, volumeID, nodeID, nodeID2, path string, dm dm.DeviceManager) { + volumeRequest := createVolumeRequest(volumeID) + instanceRequest := createInstanceRequest(nodeID) + attachRequest := createAttachRequest(volumeID, nodeID, path) + + createInstanceRequest2 := createInstanceRequest(nodeID2) + attachRequest2 := createAttachRequest(volumeID, nodeID2, path) + + dvOutput := &ec2.DescribeVolumesOutput{ + Volumes: []*ec2.Volume{ + { + VolumeId: aws.String(volumeID), + Attachments: []*ec2.VolumeAttachment{ + { + Device: aws.String(path), + InstanceId: aws.String(nodeID), + State: aws.String("attached"), + }, + { + Device: aws.String(path), + InstanceId: aws.String(nodeID2), + State: aws.String("attached"), + }, + }, + }, + }, + } + + gomock.InOrder( + mockEC2.EXPECT().DescribeInstancesWithContext(ctx, instanceRequest).Return(newDescribeInstancesOutput(nodeID), nil), + mockEC2.EXPECT().AttachVolumeWithContext(ctx, attachRequest).Return(createAttachVolumeOutput(volumeID, nodeID, path, "attached"), nil), + mockEC2.EXPECT().DescribeVolumesWithContext(ctx, volumeRequest).Return(createDescribeVolumesOutput(volumeID, nodeID, path, "attached"), nil), + + mockEC2.EXPECT().DescribeInstancesWithContext(ctx, createInstanceRequest2).Return(newDescribeInstancesOutput(nodeID2), nil), + mockEC2.EXPECT().AttachVolumeWithContext(ctx, attachRequest2).Return(createAttachVolumeOutput(volumeID, nodeID2, path, "attached"), nil), + mockEC2.EXPECT().DescribeVolumesWithContext(ctx, volumeRequest).Return(dvOutput, nil), + ) + }, + }, } for _, tc := range testCases { @@ -799,7 +883,7 @@ func TestAttachDisk(t *testing.T) { ctx := context.Background() dm := c.(*cloud).dm - tc.mockFunc(mockEC2, ctx, tc.volumeID, tc.nodeID, tc.path, dm) + tc.mockFunc(mockEC2, ctx, tc.volumeID, tc.nodeID, tc.nodeID2, tc.path, dm) devicePath, err := c.AttachDisk(ctx, tc.volumeID, tc.nodeID) @@ -811,6 +895,12 @@ func TestAttachDisk(t *testing.T) { assert.Equal(t, tc.path, devicePath) } + if tc.nodeID2 != "" { + devicePath, err := c.AttachDisk(ctx, tc.volumeID, tc.nodeID2) + assert.NoError(t, err) + assert.Equal(t, tc.path, devicePath) + } + mockCtrl.Finish() }) } diff --git a/pkg/driver/controller.go b/pkg/driver/controller.go index cff2eb35a2..f95a70bd77 100644 --- a/pkg/driver/controller.go +++ b/pkg/driver/controller.go @@ -37,16 +37,13 @@ import ( "k8s.io/klog/v2" ) -var ( - // volumeCaps represents how the volume could be accessed. - // It is SINGLE_NODE_WRITER since EBS volume could only be - // attached to a single node at any given time. - volumeCaps = []csi.VolumeCapability_AccessMode{ - { - Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER, - }, - } +// Supported access modes +const ( + SingleNodeWriter = csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER + MultiNodeMultiWriter = csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER +) +var ( // controllerCaps represents the capability of controller service controllerCaps = []csi.ControllerServiceCapability_RPC_Type{ csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME, @@ -115,6 +112,15 @@ func (d *controllerService) CreateVolume(ctx context.Context, req *csi.CreateVol return nil, err } volName := req.GetName() + volCap := req.GetVolumeCapabilities() + + multiAttach := false + for _, c := range volCap { + if c.GetAccessMode().GetMode() == MultiNodeMultiWriter && isBlock(c) { + klog.V(4).InfoS("CreateVolume: multi-attach is enabled", "volumeID", volName) + multiAttach = true + } + } // check if a request is already in-flight if ok := d.inFlight.Insert(volName); !ok { @@ -231,37 +237,37 @@ func (d *controllerService) CreateVolume(ctx context.Context, req *csi.CreateVol if len(blockSize) > 0 { responseCtx[BlockSizeKey] = blockSize - if err = validateVolumeCapabilities(req.GetVolumeCapabilities(), BlockSizeKey, FileSystemConfigs); err != nil { + if err = validateFormattingOption(volCap, BlockSizeKey, FileSystemConfigs); err != nil { return nil, err } } if len(inodeSize) > 0 { responseCtx[InodeSizeKey] = inodeSize - if err = validateVolumeCapabilities(req.GetVolumeCapabilities(), InodeSizeKey, FileSystemConfigs); err != nil { + if err = validateFormattingOption(volCap, InodeSizeKey, FileSystemConfigs); err != nil { return nil, err } } if len(bytesPerInode) > 0 { responseCtx[BytesPerInodeKey] = bytesPerInode - if err = validateVolumeCapabilities(req.GetVolumeCapabilities(), BytesPerInodeKey, FileSystemConfigs); err != nil { + if err = validateFormattingOption(volCap, BytesPerInodeKey, FileSystemConfigs); err != nil { return nil, err } } if len(numberOfInodes) > 0 { responseCtx[NumberOfInodesKey] = numberOfInodes - if err = validateVolumeCapabilities(req.GetVolumeCapabilities(), NumberOfInodesKey, FileSystemConfigs); err != nil { + if err = validateFormattingOption(volCap, NumberOfInodesKey, FileSystemConfigs); err != nil { return nil, err } } if ext4BigAlloc { responseCtx[Ext4BigAllocKey] = "true" - if err = validateVolumeCapabilities(req.GetVolumeCapabilities(), Ext4BigAllocKey, FileSystemConfigs); err != nil { + if err = validateFormattingOption(volCap, Ext4BigAllocKey, FileSystemConfigs); err != nil { return nil, err } } if len(ext4ClusterSize) > 0 { responseCtx[Ext4ClusterSizeKey] = ext4ClusterSize - if err = validateVolumeCapabilities(req.GetVolumeCapabilities(), Ext4ClusterSizeKey, FileSystemConfigs); err != nil { + if err = validateFormattingOption(volCap, Ext4ClusterSizeKey, FileSystemConfigs); err != nil { return nil, err } } @@ -329,6 +335,7 @@ func (d *controllerService) CreateVolume(ctx context.Context, req *csi.CreateVol BlockExpress: blockExpress, KmsKeyID: kmsKeyID, SnapshotID: snapshotID, + MultiAttachEnabled: multiAttach, } disk, err := d.cloud.CreateDisk(ctx, volName, opts) @@ -357,10 +364,7 @@ func validateCreateVolumeRequest(req *csi.CreateVolumeRequest) error { } if !isValidVolumeCapabilities(volCaps) { - modes := util.GetAccessModes(volCaps) - stringModes := strings.Join(*modes, ", ") - errString := "Volume capabilities " + stringModes + " not supported. Only AccessModes[ReadWriteOnce] supported." - return status.Error(codes.InvalidArgument, errString) + return status.Error(codes.InvalidArgument, "Volume capabilities not supported") } return nil } @@ -407,17 +411,14 @@ func (d *controllerService) ControllerPublishVolume(ctx context.Context, req *cs volumeID := req.GetVolumeId() nodeID := req.GetNodeId() - if !d.inFlight.Insert(volumeID) { + if !d.inFlight.Insert(volumeID + nodeID) { return nil, status.Error(codes.Aborted, fmt.Sprintf(internal.VolumeOperationAlreadyExistsErrorMsg, volumeID)) } - defer d.inFlight.Delete(volumeID) + defer d.inFlight.Delete(volumeID + nodeID) klog.V(2).InfoS("ControllerPublishVolume: attaching", "volumeID", volumeID, "nodeID", nodeID) devicePath, err := d.cloud.AttachDisk(ctx, volumeID, nodeID) if err != nil { - if errors.Is(err, cloud.ErrVolumeInUse) { - return nil, status.Errorf(codes.FailedPrecondition, "Volume %q is already attached to a different node, expected node: %q", volumeID, nodeID) - } return nil, status.Errorf(codes.Internal, "Could not attach volume %q to node %q: %v", volumeID, nodeID, err) } klog.V(2).InfoS("ControllerPublishVolume: attached", "volumeID", volumeID, "nodeID", nodeID, "devicePath", devicePath) @@ -440,12 +441,8 @@ func validateControllerPublishVolumeRequest(req *csi.ControllerPublishVolumeRequ return status.Error(codes.InvalidArgument, "Volume capability not provided") } - caps := []*csi.VolumeCapability{volCap} - if !isValidVolumeCapabilities(caps) { - modes := util.GetAccessModes(caps) - stringModes := strings.Join(*modes, ", ") - errString := "Volume capabilities " + stringModes + " not supported. Only AccessModes[ReadWriteOnce] supported." - return status.Error(codes.InvalidArgument, errString) + if !isValidCapability(volCap) { + return status.Error(codes.InvalidArgument, "Volume capability not supported") } return nil } @@ -459,10 +456,10 @@ func (d *controllerService) ControllerUnpublishVolume(ctx context.Context, req * volumeID := req.GetVolumeId() nodeID := req.GetNodeId() - if !d.inFlight.Insert(volumeID) { + if !d.inFlight.Insert((volumeID + nodeID)) { return nil, status.Error(codes.Aborted, fmt.Sprintf(internal.VolumeOperationAlreadyExistsErrorMsg, volumeID)) } - defer d.inFlight.Delete(volumeID) + defer d.inFlight.Delete((volumeID + nodeID)) klog.V(2).InfoS("ControllerUnpublishVolume: detaching", "volumeID", volumeID, "nodeID", nodeID) if err := d.cloud.DetachDisk(ctx, volumeID, nodeID); err != nil { @@ -601,23 +598,40 @@ func (d *controllerService) ControllerGetVolume(ctx context.Context, req *csi.Co return nil, status.Error(codes.Unimplemented, "") } -func isValidVolumeCapabilities(volCaps []*csi.VolumeCapability) bool { - hasSupport := func(cap *csi.VolumeCapability) bool { - for _, c := range volumeCaps { - if c.GetMode() == cap.AccessMode.GetMode() { - return true - } +func isValidVolumeCapabilities(v []*csi.VolumeCapability) bool { + for _, c := range v { + if !isValidCapability(c) { + return false } - return false } + return true +} + +func isValidCapability(c *csi.VolumeCapability) bool { + accessMode := c.GetAccessMode().GetMode() + + //nolint:exhaustive + switch accessMode { + case SingleNodeWriter: + return true - foundAll := true - for _, c := range volCaps { - if !hasSupport(c) { - foundAll = false + case MultiNodeMultiWriter: + if isBlock(c) { + return true + } else { + klog.InfoS("isValidCapability: access mode is only supported for block devices", "accessMode", accessMode) + return false } + + default: + klog.InfoS("isValidCapability: access mode is not supported", "accessMode", accessMode) + return false } - return foundAll +} + +func isBlock(cap *csi.VolumeCapability) bool { + _, isBlock := cap.GetAccessType().(*csi.VolumeCapability_Block) + return isBlock } func isValidVolumeContext(volContext map[string]string) bool { @@ -1006,7 +1020,7 @@ func BuildOutpostArn(segments map[string]string) string { ) } -func validateVolumeCapabilities(volumeCapabilities []*csi.VolumeCapability, paramName string, fsConfigs map[string]fileSystemConfig) error { +func validateFormattingOption(volumeCapabilities []*csi.VolumeCapability, paramName string, fsConfigs map[string]fileSystemConfig) error { for _, volCap := range volumeCapabilities { switch volCap.GetAccessType().(type) { case *csi.VolumeCapability_Block: diff --git a/pkg/driver/controller_test.go b/pkg/driver/controller_test.go index dbf113e4ad..51487e6359 100644 --- a/pkg/driver/controller_test.go +++ b/pkg/driver/controller_test.go @@ -165,6 +165,26 @@ func TestCreateVolume(t *testing.T) { }, }, } + multiAttachVolCap := []*csi.VolumeCapability{ + { + AccessType: &csi.VolumeCapability_Block{ + Block: &csi.VolumeCapability_BlockVolume{}, + }, + AccessMode: &csi.VolumeCapability_AccessMode{ + Mode: csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER, + }, + }, + } + invalidMultiAttachVolCap := []*csi.VolumeCapability{ + { + AccessType: &csi.VolumeCapability_Mount{ + Mount: &csi.VolumeCapability_MountVolume{}, + }, + AccessMode: &csi.VolumeCapability_AccessMode{ + Mode: csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER, + }, + }, + } stdVolSize := int64(5 * 1024 * 1024 * 1024) stdCapRange := &csi.CapacityRange{RequiredBytes: stdVolSize} stdParams := map[string]string{} @@ -1634,8 +1654,81 @@ func TestCreateVolume(t *testing.T) { checkExpectedErrorCode(t, err, codes.AlreadyExists) }, }, - } + { + name: "success multi-attach", + testFunc: func(t *testing.T) { + req := &csi.CreateVolumeRequest{ + Name: "random-vol-name", + CapacityRange: stdCapRange, + VolumeCapabilities: multiAttachVolCap, + Parameters: nil, + } + + ctx := context.Background() + + mockDisk := &cloud.Disk{ + VolumeID: req.Name, + AvailabilityZone: expZone, + CapacityGiB: util.BytesToGiB(stdVolSize), + } + + mockCtl := gomock.NewController(t) + defer mockCtl.Finish() + + mockCloud := cloud.NewMockCloud(mockCtl) + mockCloud.EXPECT().CreateDisk(gomock.Eq(ctx), gomock.Eq(req.Name), gomock.Any()).Return(mockDisk, nil) + + awsDriver := controllerService{ + cloud: mockCloud, + inFlight: internal.NewInFlight(), + driverOptions: &DriverOptions{}, + } + + if _, err := awsDriver.CreateVolume(ctx, req); err != nil { + srvErr, ok := status.FromError(err) + if !ok { + t.Fatalf("Could not get error status code from error: %v", srvErr) + } + t.Fatalf("Unexpected error: %v", srvErr.Code()) + } + }, + }, + { + name: "fail multi-attach - invalid mount capability", + testFunc: func(t *testing.T) { + req := &csi.CreateVolumeRequest{ + Name: "random-vol-name", + CapacityRange: stdCapRange, + VolumeCapabilities: invalidMultiAttachVolCap, + } + + ctx := context.Background() + + mockCtl := gomock.NewController(t) + defer mockCtl.Finish() + + mockCloud := cloud.NewMockCloud(mockCtl) + + awsDriver := controllerService{ + cloud: mockCloud, + inFlight: internal.NewInFlight(), + driverOptions: &DriverOptions{}, + } + _, err := awsDriver.CreateVolume(ctx, req) + if err == nil { + t.Fatalf("Expected CreateVolume to fail but got no error") + } + srvErr, ok := status.FromError(err) + if !ok { + t.Fatalf("Could not get error status code from error: %v", srvErr) + } + if srvErr.Code() != codes.InvalidArgument { + t.Fatalf("Expect InvalidArgument but got: %s", srvErr.Code()) + } + }, + }, + } for _, tc := range testCases { t.Run(tc.name, tc.testFunc) } @@ -3327,16 +3420,6 @@ func TestControllerPublishVolume(t *testing.T) { }, errorCode: codes.Internal, }, - { - name: "Fail when volume is already attached to another node", - volumeId: "vol-test", - nodeId: expInstanceID, - volumeCapability: stdVolCap, - mockAttach: func(mockCloud *cloud.MockCloud, ctx context.Context, volumeId string, nodeId string) { - mockCloud.EXPECT().AttachDisk(gomock.Eq(ctx), gomock.Eq(volumeId), gomock.Eq(expInstanceID)).Return("", (cloud.ErrVolumeInUse)) - }, - errorCode: codes.FailedPrecondition, - }, { name: "Aborted error when AttachDisk operation already in-flight", volumeId: "vol-test", @@ -3346,7 +3429,7 @@ func TestControllerPublishVolume(t *testing.T) { }, errorCode: codes.Aborted, setupFunc: func(controllerService *controllerService) { - controllerService.inFlight.Insert("vol-test") + controllerService.inFlight.Insert("vol-test" + expInstanceID) }, }, } @@ -3442,7 +3525,7 @@ func TestControllerUnpublishVolume(t *testing.T) { nodeId: expInstanceID, errorCode: codes.Aborted, setupFunc: func(driver *controllerService) { - driver.inFlight.Insert("vol-test") + driver.inFlight.Insert("vol-test" + expInstanceID) }, }, } diff --git a/tests/e2e/multi_attach.go b/tests/e2e/multi_attach.go new file mode 100644 index 0000000000..2f25aebda3 --- /dev/null +++ b/tests/e2e/multi_attach.go @@ -0,0 +1,182 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + awscloud "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/cloud" + ebscsidriver "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/driver" + "github.com/kubernetes-sigs/aws-ebs-csi-driver/tests/e2e/driver" + "github.com/kubernetes-sigs/aws-ebs-csi-driver/tests/e2e/testsuites" + . "github.com/onsi/ginkgo/v2" + v1 "k8s.io/api/core/v1" + storagev1 "k8s.io/api/storage/v1" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/kubernetes/test/e2e/framework" + admissionapi "k8s.io/pod-security-admission/api" +) + +var _ = Describe("[ebs-csi-e2e] [single-az] [multi-attach] Multi-Attach", func() { + f := framework.NewDefaultFramework("ebs") + f.NamespacePodSecurityEnforceLevel = admissionapi.LevelPrivileged + + var ( + cs clientset.Interface + ns *v1.Namespace + ebsDriver driver.DynamicPVTestDriver + ) + + BeforeEach(func() { + cs = f.ClientSet + ns = f.Namespace + ebsDriver = driver.InitEbsCSIDriver() + }) + + It("should succeed multi-attach IO2 block device", func() { + pods := []testsuites.PodDetails{ + { + Volumes: []testsuites.VolumeDetails{ + { + VolumeType: awscloud.VolumeTypeIO2, + ClaimSize: driver.MinimumSizeForVolumeType(awscloud.VolumeTypeIO2), + VolumeMode: testsuites.Block, + VolumeDevice: testsuites.VolumeDeviceDetails{ + NameGenerate: "test-block-volume-", + DevicePath: "/dev/xvda", + }, + AccessMode: v1.ReadWriteMany, + }, + }, + }, + { + Volumes: []testsuites.VolumeDetails{ + { + VolumeType: awscloud.VolumeTypeIO2, + ClaimSize: driver.MinimumSizeForVolumeType(awscloud.VolumeTypeIO2), + VolumeMode: testsuites.Block, + VolumeDevice: testsuites.VolumeDeviceDetails{ + NameGenerate: "test-block-volume-", + DevicePath: "/dev/xvda", + }, + AccessMode: v1.ReadWriteMany, + }, + }, + }, + } + test := testsuites.MultiAttachTest{ + CSIDriver: ebsDriver, + Pods: pods, + VolumeMode: testsuites.Block, + AccessMode: v1.ReadWriteMany, + VolumeType: awscloud.VolumeTypeIO2, + CheckForPVCPhase: false, + } + test.Run(cs, ns) + }) + + It("should fail multi-attach FileSystem VolumeMode", func() { + volumeBindingMode := storagev1.VolumeBindingWaitForFirstConsumer + pods := []testsuites.PodDetails{ + { + Volumes: []testsuites.VolumeDetails{ + { + VolumeType: awscloud.VolumeTypeIO2, + FSType: ebscsidriver.FSTypeExt4, + VolumeMode: testsuites.FileSystem, + ClaimSize: driver.MinimumSizeForVolumeType(awscloud.VolumeTypeIO2), + VolumeBindingMode: &volumeBindingMode, + VolumeMount: testsuites.VolumeMountDetails{ + NameGenerate: "test-volume-", + MountPathGenerate: "/mnt/test-", + }, + AccessMode: v1.ReadWriteMany, + }, + }, + }, + { + Volumes: []testsuites.VolumeDetails{ + { + VolumeType: awscloud.VolumeTypeIO2, + FSType: ebscsidriver.FSTypeExt4, + VolumeMode: testsuites.FileSystem, + ClaimSize: driver.MinimumSizeForVolumeType(awscloud.VolumeTypeIO2), + VolumeBindingMode: &volumeBindingMode, + VolumeMount: testsuites.VolumeMountDetails{ + NameGenerate: "test-volume-", + MountPathGenerate: "/mnt/test-", + }, + AccessMode: v1.ReadWriteMany, + }, + }, + }, + } + test := testsuites.MultiAttachTest{ + CSIDriver: ebsDriver, + Pods: pods, + VolumeMode: testsuites.FileSystem, + AccessMode: v1.ReadWriteMany, + VolumeType: awscloud.VolumeTypeIO2, + ExpectedPVCPhase: v1.ClaimPending, + CheckForPVCPhase: true, + } + test.Run(cs, ns) + }) + + It("should fail multi-attach GP3 VolumeType", func() { + volumeBindingMode := storagev1.VolumeBindingWaitForFirstConsumer + pods := []testsuites.PodDetails{ + { + Volumes: []testsuites.VolumeDetails{ + { + VolumeType: awscloud.VolumeTypeGP3, + ClaimSize: driver.MinimumSizeForVolumeType(awscloud.VolumeTypeGP3), + VolumeBindingMode: &volumeBindingMode, + VolumeMode: testsuites.Block, + VolumeDevice: testsuites.VolumeDeviceDetails{ + NameGenerate: "test-block-volume-", + DevicePath: "/dev/xvda", + }, + AccessMode: v1.ReadWriteMany, + }, + }, + }, + { + Volumes: []testsuites.VolumeDetails{ + { + VolumeType: awscloud.VolumeTypeGP3, + ClaimSize: driver.MinimumSizeForVolumeType(awscloud.VolumeTypeGP3), + VolumeBindingMode: &volumeBindingMode, + VolumeMode: testsuites.Block, + VolumeDevice: testsuites.VolumeDeviceDetails{ + NameGenerate: "test-block-volume-", + DevicePath: "/dev/xvda", + }, + AccessMode: v1.ReadWriteMany, + }, + }, + }, + } + test := testsuites.MultiAttachTest{ + CSIDriver: ebsDriver, + Pods: pods, + VolumeMode: testsuites.FileSystem, + AccessMode: v1.ReadWriteMany, + VolumeType: awscloud.VolumeTypeIO2, + ExpectedPVCPhase: v1.ClaimPending, + CheckForPVCPhase: true, + } + test.Run(cs, ns) + }) + +}) diff --git a/tests/e2e/testsuites/dynamically_provisioned_multi_attach_tester.go b/tests/e2e/testsuites/dynamically_provisioned_multi_attach_tester.go new file mode 100644 index 0000000000..ba743752f7 --- /dev/null +++ b/tests/e2e/testsuites/dynamically_provisioned_multi_attach_tester.go @@ -0,0 +1,103 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testsuites + +import ( + "context" + "fmt" + + awscloud "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/cloud" + "github.com/kubernetes-sigs/aws-ebs-csi-driver/tests/e2e/driver" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + clientset "k8s.io/client-go/kubernetes" + framework "k8s.io/kubernetes/test/e2e/framework" +) + +type MultiAttachTest struct { + CSIDriver driver.DynamicPVTestDriver + Pods []PodDetails + VolumeMode VolumeMode + AccessMode v1.PersistentVolumeAccessMode + VolumeType string + ExpectedPVCPhase v1.PersistentVolumeClaimPhase + CheckForPVCPhase bool +} + +func (t *MultiAttachTest) Run(client clientset.Interface, namespace *v1.Namespace) { + // Setup StorageClass and PVC + tpvc, cleanupFuncs := t.Pods[0].Volumes[0].SetupDynamicPersistentVolumeClaim(client, namespace, t.CSIDriver) + defer func() { + for _, cleanup := range cleanupFuncs { + cleanup() + } + }() + + if t.AccessMode == v1.ReadWriteMany { + if t.VolumeMode != Block || t.VolumeType != awscloud.VolumeTypeIO2 { + By("Polling for PVC to be in Pending state...") + ctx, cancel := context.WithTimeout(context.Background(), framework.ClaimProvisionShortTimeout) + defer cancel() + + err := wait.PollUntilContextTimeout(ctx, framework.Poll, framework.ClaimProvisionShortTimeout, true, func(ctx context.Context) (bool, error) { + pvc, err := client.CoreV1().PersistentVolumeClaims(namespace.Name).Get(ctx, tpvc.persistentVolumeClaim.Name, metav1.GetOptions{}) + if err != nil { + return false, err + } + return pvc.Status.Phase == v1.ClaimPending, nil + }) + + Expect(err).NotTo(HaveOccurred(), "Failed to wait for PVC to be in Pending state") + return + } + } + + // If Block and IO2, proceed with attaching PVC to pods + for n, podDetail := range t.Pods { + tpod := NewTestPod(client, namespace, "tail -f /dev/null") + + if podDetail.Volumes[0].VolumeMode == Block { + name := fmt.Sprintf("%s%d", podDetail.Volumes[0].VolumeDevice.NameGenerate, n+1) + devicePath := podDetail.Volumes[0].VolumeDevice.DevicePath + tpod.SetupRawBlockVolume(tpvc.persistentVolumeClaim, name, devicePath) + } else { + name := fmt.Sprintf("%s%d", podDetail.Volumes[0].VolumeMount.NameGenerate, n+1) + mountPath := fmt.Sprintf("%s%d", podDetail.Volumes[0].VolumeMount.MountPathGenerate, n+1) + readOnly := podDetail.Volumes[0].VolumeMount.ReadOnly + tpod.SetupVolume(tpvc.persistentVolumeClaim, name, mountPath, readOnly) + } + + tpod.pod.ObjectMeta.Labels = map[string]string{"app": "my-service"} + tpod.pod.Spec.TopologySpreadConstraints = []v1.TopologySpreadConstraint{ + { + MaxSkew: 1, + TopologyKey: "kubernetes.io/hostname", + WhenUnsatisfiable: v1.DoNotSchedule, + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"app": "my-service"}, + }, + }, + } + + By("deploying the pod") + tpod.Create() + defer tpod.Cleanup() + By("checking that the pods command exits with no error") + tpod.WaitForRunning() + } +} diff --git a/tests/e2e/testsuites/specs.go b/tests/e2e/testsuites/specs.go index ff068283ca..335e7c141c 100644 --- a/tests/e2e/testsuites/specs.go +++ b/tests/e2e/testsuites/specs.go @@ -41,6 +41,7 @@ type VolumeDetails struct { ReclaimPolicy *v1.PersistentVolumeReclaimPolicy AllowVolumeExpansion *bool VolumeBindingMode *storagev1.VolumeBindingMode + AccessMode v1.PersistentVolumeAccessMode AllowedTopologyValues []string VolumeMode VolumeMode VolumeMount VolumeMountDetails @@ -125,7 +126,7 @@ func (pod *PodDetails) SetupDeployment(client clientset.Interface, namespace *v1 createdStorageClass := tsc.Create() cleanupFuncs = append(cleanupFuncs, tsc.Cleanup) By("setting up the PVC") - tpvc := NewTestPersistentVolumeClaim(client, namespace, volume.ClaimSize, volume.VolumeMode, &createdStorageClass) + tpvc := NewTestPersistentVolumeClaim(client, namespace, volume.ClaimSize, volume.VolumeMode, &createdStorageClass, v1.ReadWriteOnce) tpvc.Create() tpvc.WaitForBound() tpvc.ValidateProvisionedPersistentVolume() @@ -152,9 +153,9 @@ func (volume *VolumeDetails) SetupDynamicPersistentVolumeClaim(client clientset. Kind: VolumeSnapshotKind, APIGroup: &SnapshotAPIGroup, } - tpvc = NewTestPersistentVolumeClaimWithDataSource(client, namespace, volume.ClaimSize, volume.VolumeMode, &createdStorageClass, dataSource) + tpvc = NewTestPersistentVolumeClaimWithDataSource(client, namespace, volume.ClaimSize, volume.VolumeMode, &createdStorageClass, dataSource, volume.AccessMode) } else { - tpvc = NewTestPersistentVolumeClaim(client, namespace, volume.ClaimSize, volume.VolumeMode, &createdStorageClass) + tpvc = NewTestPersistentVolumeClaim(client, namespace, volume.ClaimSize, volume.VolumeMode, &createdStorageClass, volume.AccessMode) } tpvc.Create() cleanupFuncs = append(cleanupFuncs, tpvc.Cleanup) @@ -174,7 +175,7 @@ func (volume *VolumeDetails) SetupPreProvisionedPersistentVolumeClaim(client cli tpv := NewTestPreProvisionedPersistentVolume(client, pv) tpv.Create() By("setting up the PVC") - tpvc := NewTestPersistentVolumeClaim(client, namespace, volume.ClaimSize, volume.VolumeMode, nil) + tpvc := NewTestPersistentVolumeClaim(client, namespace, volume.ClaimSize, volume.VolumeMode, nil, volume.AccessMode) tpvc.Create() cleanupFuncs = append(cleanupFuncs, tpvc.DeleteBoundPersistentVolume) cleanupFuncs = append(cleanupFuncs, tpvc.Cleanup) diff --git a/tests/e2e/testsuites/testsuites.go b/tests/e2e/testsuites/testsuites.go index 2d7709c7b1..e5ad0a59ef 100644 --- a/tests/e2e/testsuites/testsuites.go +++ b/tests/e2e/testsuites/testsuites.go @@ -282,6 +282,7 @@ type TestPersistentVolumeClaim struct { client clientset.Interface claimSize string volumeMode v1.PersistentVolumeMode + accessMode v1.PersistentVolumeAccessMode storageClass *storagev1.StorageClass namespace *v1.Namespace persistentVolume *v1.PersistentVolume @@ -290,7 +291,7 @@ type TestPersistentVolumeClaim struct { dataSource *v1.TypedLocalObjectReference } -func NewTestPersistentVolumeClaim(c clientset.Interface, ns *v1.Namespace, claimSize string, volumeMode VolumeMode, sc *storagev1.StorageClass) *TestPersistentVolumeClaim { +func NewTestPersistentVolumeClaim(c clientset.Interface, ns *v1.Namespace, claimSize string, volumeMode VolumeMode, sc *storagev1.StorageClass, accessMode v1.PersistentVolumeAccessMode) *TestPersistentVolumeClaim { mode := v1.PersistentVolumeFilesystem if volumeMode == Block { mode = v1.PersistentVolumeBlock @@ -301,10 +302,11 @@ func NewTestPersistentVolumeClaim(c clientset.Interface, ns *v1.Namespace, claim volumeMode: mode, namespace: ns, storageClass: sc, + accessMode: accessMode, } } -func NewTestPersistentVolumeClaimWithDataSource(c clientset.Interface, ns *v1.Namespace, claimSize string, volumeMode VolumeMode, sc *storagev1.StorageClass, dataSource *v1.TypedLocalObjectReference) *TestPersistentVolumeClaim { +func NewTestPersistentVolumeClaimWithDataSource(c clientset.Interface, ns *v1.Namespace, claimSize string, volumeMode VolumeMode, sc *storagev1.StorageClass, dataSource *v1.TypedLocalObjectReference, accessMode v1.PersistentVolumeAccessMode) *TestPersistentVolumeClaim { mode := v1.PersistentVolumeFilesystem if volumeMode == Block { mode = v1.PersistentVolumeBlock @@ -317,6 +319,7 @@ func NewTestPersistentVolumeClaimWithDataSource(c clientset.Interface, ns *v1.Na namespace: ns, storageClass: sc, dataSource: dataSource, + accessMode: accessMode, } } @@ -328,7 +331,7 @@ func (t *TestPersistentVolumeClaim) Create() { if t.storageClass != nil { storageClassName = t.storageClass.Name } - t.requestedPersistentVolumeClaim = generatePVC(t.namespace.Name, storageClassName, t.claimSize, t.volumeMode, t.dataSource) + t.requestedPersistentVolumeClaim = generatePVC(t.namespace.Name, storageClassName, t.claimSize, t.volumeMode, t.dataSource, t.accessMode) t.persistentVolumeClaim, err = t.client.CoreV1().PersistentVolumeClaims(t.namespace.Name).Create(context.Background(), t.requestedPersistentVolumeClaim, metav1.CreateOptions{}) framework.ExpectNoError(err) } @@ -406,7 +409,10 @@ func (t *TestPersistentVolumeClaim) WaitForBound() v1.PersistentVolumeClaim { return *t.persistentVolumeClaim } -func generatePVC(namespace, storageClassName, claimSize string, volumeMode v1.PersistentVolumeMode, dataSource *v1.TypedLocalObjectReference) *v1.PersistentVolumeClaim { +func generatePVC(namespace, storageClassName, claimSize string, volumeMode v1.PersistentVolumeMode, dataSource *v1.TypedLocalObjectReference, accessMode v1.PersistentVolumeAccessMode) *v1.PersistentVolumeClaim { + if accessMode == "" { + accessMode = v1.ReadWriteOnce + } return &v1.PersistentVolumeClaim{ ObjectMeta: metav1.ObjectMeta{ GenerateName: "pvc-", @@ -415,7 +421,7 @@ func generatePVC(namespace, storageClassName, claimSize string, volumeMode v1.Pe Spec: v1.PersistentVolumeClaimSpec{ StorageClassName: &storageClassName, AccessModes: []v1.PersistentVolumeAccessMode{ - v1.ReadWriteOnce, + accessMode, }, Resources: v1.ResourceRequirements{ Requests: v1.ResourceList{