diff --git a/pkg/driver/controller.go b/pkg/driver/controller.go index 34e7569c39..0b82989604 100644 --- a/pkg/driver/controller.go +++ b/pkg/driver/controller.go @@ -205,11 +205,11 @@ func (d *controllerService) CreateVolume(ctx context.Context, req *csi.CreateVol } // check if a request is already in-flight because the CreateVolume API is not idempotent - if ok := d.inFlight.Insert(req); !ok { + if ok := d.inFlight.Insert(req.String()); !ok { msg := fmt.Sprintf("Create volume request for %s is already in progress", volName) return nil, status.Error(codes.Aborted, msg) } - defer d.inFlight.Delete(req) + defer d.inFlight.Delete(req.String()) // create a new volume zone := pickAvailabilityZone(req.GetAccessibilityRequirements()) diff --git a/pkg/driver/controller_test.go b/pkg/driver/controller_test.go index 521e84b488..55878689c5 100644 --- a/pkg/driver/controller_test.go +++ b/pkg/driver/controller_test.go @@ -1534,8 +1534,8 @@ func TestCreateVolume(t *testing.T) { mockCloud.EXPECT().GetDiskByName(gomock.Eq(ctx), gomock.Eq(req.Name), gomock.Eq(stdVolSize)).Return(nil, cloud.ErrNotFound) inFlight := internal.NewInFlight() - inFlight.Insert(req) - defer inFlight.Delete(req) + inFlight.Insert(req.String()) + defer inFlight.Delete(req.String()) awsDriver := controllerService{ cloud: mockCloud, diff --git a/pkg/driver/internal/inflight.go b/pkg/driver/internal/inflight.go index f987918e6e..5f0d2a9ad7 100644 --- a/pkg/driver/internal/inflight.go +++ b/pkg/driver/internal/inflight.go @@ -17,6 +17,7 @@ limitations under the License. package internal import ( + "k8s.io/klog" "sync" ) @@ -29,7 +30,7 @@ type Idempotent interface { String() string } -// InFlight is a struct used to manage in flight requests. +// InFlight is a struct used to manage in flight requests per volumeId. type InFlight struct { mux *sync.Mutex inFlight map[string]bool @@ -43,28 +44,27 @@ func NewInFlight() *InFlight { } } -// Insert inserts the entry to the current list of inflight requests. +// Insert inserts the entry to the current list of inflight request key is volumeId for node and req hash for controller . // Returns false when the key already exists. -func (db *InFlight) Insert(entry Idempotent) bool { +func (db *InFlight) Insert(key string) bool { db.mux.Lock() defer db.mux.Unlock() - hash := entry.String() - - _, ok := db.inFlight[hash] + _, ok := db.inFlight[key] if ok { return false } - db.inFlight[hash] = true + db.inFlight[key] = true return true } // Delete removes the entry from the inFlight entries map. // It doesn't return anything, and will do nothing if the specified key doesn't exist. -func (db *InFlight) Delete(h Idempotent) { +func (db *InFlight) Delete(key string) { db.mux.Lock() defer db.mux.Unlock() - delete(db.inFlight, h.String()) + delete(db.inFlight, key) + klog.V(4).Infof("Node Service: volume=%q operation finished", key) } diff --git a/pkg/driver/internal/inflight_test.go b/pkg/driver/internal/inflight_test.go index b0d5c824d7..faaeb74c13 100644 --- a/pkg/driver/internal/inflight_test.go +++ b/pkg/driver/internal/inflight_test.go @@ -18,37 +18,14 @@ package internal import ( "testing" - - "github.com/container-storage-interface/spec/lib/go/csi" - "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/util" ) type testRequest struct { - request *csi.CreateVolumeRequest - expResp bool - delete bool + volumeId string + expResp bool + delete bool } -var stdVolCap = []*csi.VolumeCapability{ - { - AccessType: &csi.VolumeCapability_Mount{ - Mount: &csi.VolumeCapability_MountVolume{}, - }, - AccessMode: &csi.VolumeCapability_AccessMode{ - Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER, - }, - }, -} - -var ( - stdVolSize = int64(5 * util.GiB) - stdCapRange = &csi.CapacityRange{RequiredBytes: stdVolSize} - stdParams = map[string]string{ - "key1": "value1", - "key2": "value2", - } -) - func TestInFlight(t *testing.T) { testCases := []struct { name string @@ -58,137 +35,54 @@ func TestInFlight(t *testing.T) { name: "success normal", requests: []testRequest{ { - request: &csi.CreateVolumeRequest{ - Name: "random-vol-name", - CapacityRange: stdCapRange, - VolumeCapabilities: stdVolCap, - Parameters: stdParams, - }, - expResp: true, - }, - }, - }, - { - name: "success adding request with different name", - requests: []testRequest{ - { - request: &csi.CreateVolumeRequest{ - Name: "random-vol-foobar", - CapacityRange: stdCapRange, - VolumeCapabilities: stdVolCap, - Parameters: stdParams, - }, - expResp: true, - }, - { - request: &csi.CreateVolumeRequest{ - Name: "random-vol-name-foobar", - CapacityRange: stdCapRange, - VolumeCapabilities: stdVolCap, - Parameters: stdParams, - }, - expResp: true, - }, - }, - }, - { - name: "success adding request with different parameters", - requests: []testRequest{ - { - request: &csi.CreateVolumeRequest{ - Name: "random-vol-name-foobar", - CapacityRange: stdCapRange, - VolumeCapabilities: stdVolCap, - Parameters: map[string]string{"foo": "bar"}, - }, - expResp: true, - }, - { - request: &csi.CreateVolumeRequest{ - Name: "random-vol-name-foobar", - CapacityRange: stdCapRange, - VolumeCapabilities: stdVolCap, - }, - expResp: true, + + volumeId: "random-vol-name", + expResp: true, }, }, }, { - name: "success adding request with different parameters", + name: "success adding request with different volumeId", requests: []testRequest{ { - request: &csi.CreateVolumeRequest{ - Name: "random-vol-name-foobar", - CapacityRange: stdCapRange, - VolumeCapabilities: stdVolCap, - Parameters: map[string]string{"foo": "bar"}, - }, - expResp: true, + volumeId: "random-vol-foobar", + expResp: true, }, { - request: &csi.CreateVolumeRequest{ - Name: "random-vol-name-foobar", - CapacityRange: stdCapRange, - VolumeCapabilities: stdVolCap, - Parameters: map[string]string{"foo": "baz"}, - }, - expResp: true, + volumeId: "random-vol-name-foobar", + expResp: true, }, }, }, { - name: "failure adding copy of request", + name: "failed adding request with same volumeId", requests: []testRequest{ { - request: &csi.CreateVolumeRequest{ - Name: "random-vol-name", - CapacityRange: stdCapRange, - VolumeCapabilities: stdVolCap, - Parameters: stdParams, - }, - expResp: true, + volumeId: "random-vol-name-foobar", + expResp: true, }, { - request: &csi.CreateVolumeRequest{ - Name: "random-vol-name", - CapacityRange: stdCapRange, - VolumeCapabilities: stdVolCap, - Parameters: stdParams, - }, - expResp: false, + volumeId: "random-vol-name-foobar", + expResp: false, }, }, }, + { name: "success add, delete, add copy", requests: []testRequest{ { - request: &csi.CreateVolumeRequest{ - Name: "random-vol-name", - CapacityRange: stdCapRange, - VolumeCapabilities: stdVolCap, - Parameters: stdParams, - }, - expResp: true, + volumeId: "random-vol-name", + expResp: true, }, { - request: &csi.CreateVolumeRequest{ - Name: "random-vol-name", - CapacityRange: stdCapRange, - VolumeCapabilities: stdVolCap, - Parameters: stdParams, - }, - expResp: false, - delete: true, + volumeId: "random-vol-name", + expResp: false, + delete: true, }, { - request: &csi.CreateVolumeRequest{ - Name: "random-vol-name", - CapacityRange: stdCapRange, - VolumeCapabilities: stdVolCap, - Parameters: stdParams, - }, - expResp: true, + volumeId: "random-vol-name", + expResp: true, }, }, }, @@ -200,9 +94,9 @@ func TestInFlight(t *testing.T) { for _, r := range tc.requests { var resp bool if r.delete { - db.Delete(r.request) + db.Delete(r.volumeId) } else { - resp = db.Insert(r.request) + resp = db.Insert(r.volumeId) } if r.expResp != resp { t.Fatalf("expected insert to be %+v, got %+v", r.expResp, resp) diff --git a/pkg/driver/node.go b/pkg/driver/node.go index 6a88710ff1..2c59f09978 100644 --- a/pkg/driver/node.go +++ b/pkg/driver/node.go @@ -57,6 +57,9 @@ const ( // defaultMaxEBSNitroVolumes is the limit of volumes for some smaller instances, like c5 and m5. defaultMaxEBSNitroVolumes = 25 + + // VolumeOperationAlreadyExists is message fmt returned to CO when there is another in-flight call on the given volumeID + VolumeOperationAlreadyExists = "An operation with the given volume=%q is already in progress" ) var ( @@ -141,13 +144,12 @@ func (d *nodeService) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol } } - if ok := d.inFlight.Insert(req); !ok { - msg := fmt.Sprintf("request to stage volume=%q is already in progress", volumeID) - return nil, status.Error(codes.Aborted, msg) + if ok := d.inFlight.Insert(volumeID); !ok { + return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID) } defer func() { - klog.V(4).Infof("NodeStageVolume: volume=%q operation finished", req.GetVolumeId()) - d.inFlight.Delete(req) + klog.V(4).Infof("NodeStageVolume: volume=%q operation finished", volumeID) + d.inFlight.Delete(volumeID) }() devicePath, ok := req.PublishContext[DevicePathKey] @@ -217,6 +219,14 @@ func (d *nodeService) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag return nil, status.Error(codes.InvalidArgument, "Staging target not provided") } + if ok := d.inFlight.Insert(volumeID); !ok { + return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID) + } + defer func() { + klog.V(4).Infof("NodeUnStageVolume: volume=%q operation finished", volumeID) + d.inFlight.Delete(volumeID) + }() + // Check if target directory is a mount point. GetDeviceNameFromMount // given a mnt point, finds the device from /proc/mounts // returns the device name, reference count, and error code @@ -343,6 +353,14 @@ func (d *nodeService) NodePublishVolume(ctx context.Context, req *csi.NodePublis return nil, status.Error(codes.InvalidArgument, "Volume capability not supported") } + if ok := d.inFlight.Insert(volumeID); !ok { + return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID) + } + defer func() { + klog.V(4).Infof("NodePublishVolume: volume=%q operation finished", volumeID) + d.inFlight.Delete(volumeID) + }() + mountOptions := []string{"bind"} if req.GetReadonly() { mountOptions = append(mountOptions, "ro") @@ -373,6 +391,13 @@ func (d *nodeService) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu if len(target) == 0 { return nil, status.Error(codes.InvalidArgument, "Target path not provided") } + if ok := d.inFlight.Insert(volumeID); !ok { + return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID) + } + defer func() { + klog.V(4).Infof("NodeUnPublishVolume: volume=%q operation finished", volumeID) + d.inFlight.Delete(volumeID) + }() klog.V(5).Infof("NodeUnpublishVolume: unmounting %s", target) err := d.mounter.Unmount(target) diff --git a/pkg/driver/node_test.go b/pkg/driver/node_test.go index f8114c992f..d864f34480 100644 --- a/pkg/driver/node_test.go +++ b/pkg/driver/node_test.go @@ -494,8 +494,8 @@ func TestNodeStageVolume(t *testing.T) { } inFlight := internal.NewInFlight() - inFlight.Insert(req) - defer inFlight.Delete(req) + inFlight.Insert(req.VolumeId) + defer inFlight.Delete(req.VolumeId) awsDriver := &nodeService{ metadata: mockMetadata, @@ -507,7 +507,6 @@ func TestNodeStageVolume(t *testing.T) { if err == nil { t.Fatalf("Expect error but got no error") } - expectErr(t, err, codes.Aborted) }, }, @@ -682,6 +681,31 @@ func TestNodeUnstageVolume(t *testing.T) { expectErr(t, err, codes.Internal) }, }, + { + name: "fail another operation in-flight on given volumeId", + testFunc: func(t *testing.T) { + mockCtl := gomock.NewController(t) + defer mockCtl.Finish() + + mockMetadata := mocks.NewMockMetadataService(mockCtl) + mockMounter := mocks.NewMockMounter(mockCtl) + + awsDriver := &nodeService{ + metadata: mockMetadata, + mounter: mockMounter, + inFlight: internal.NewInFlight(), + } + + req := &csi.NodeUnstageVolumeRequest{ + StagingTargetPath: targetPath, + VolumeId: "vol-test", + } + + awsDriver.inFlight.Insert("vol-test") + _, err := awsDriver.NodeUnstageVolume(context.TODO(), req) + expectErr(t, err, codes.Aborted) + }, + }, } for _, tc := range testCases { @@ -1110,6 +1134,42 @@ func TestNodePublishVolume(t *testing.T) { }, }, + { + name: "fail another operation in-flight on given volumeId", + testFunc: func(t *testing.T) { + mockCtl := gomock.NewController(t) + defer mockCtl.Finish() + + mockMetadata := mocks.NewMockMetadataService(mockCtl) + mockMounter := mocks.NewMockMounter(mockCtl) + + awsDriver := &nodeService{ + metadata: mockMetadata, + mounter: mockMounter, + inFlight: internal.NewInFlight(), + } + + req := &csi.NodePublishVolumeRequest{ + PublishContext: map[string]string{DevicePathKey: "/dev/fake"}, + StagingTargetPath: "/test/staging/path", + TargetPath: "/test/target/path", + VolumeId: "vol-test", + VolumeCapability: &csi.VolumeCapability{ + AccessType: &csi.VolumeCapability_Block{ + Block: &csi.VolumeCapability_BlockVolume{}, + }, + AccessMode: &csi.VolumeCapability_AccessMode{ + Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER, + }, + }, + } + awsDriver.inFlight.Insert("vol-test") + + _, err := awsDriver.NodePublishVolume(context.TODO(), req) + expectErr(t, err, codes.Aborted) + + }, + }, } for _, tc := range testCases { @@ -1308,6 +1368,31 @@ func TestNodeUnpublishVolume(t *testing.T) { expectErr(t, err, codes.Internal) }, }, + { + name: "fail another operation in-flight on given volumeId", + testFunc: func(t *testing.T) { + mockCtl := gomock.NewController(t) + defer mockCtl.Finish() + + mockMetadata := mocks.NewMockMetadataService(mockCtl) + mockMounter := mocks.NewMockMounter(mockCtl) + + awsDriver := &nodeService{ + metadata: mockMetadata, + mounter: mockMounter, + inFlight: internal.NewInFlight(), + } + + req := &csi.NodeUnpublishVolumeRequest{ + TargetPath: targetPath, + VolumeId: "vol-test", + } + + awsDriver.inFlight.Insert("vol-test") + _, err := awsDriver.NodeUnpublishVolume(context.TODO(), req) + expectErr(t, err, codes.Aborted) + }, + }, } for _, tc := range testCases {