Skip to content

Commit

Permalink
Add locks to node operations per volumeId
Browse files Browse the repository at this point in the history
  • Loading branch information
AndyXiangLi committed Feb 9, 2021
1 parent 4790c45 commit 22b25f6
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 147 deletions.
14 changes: 7 additions & 7 deletions pkg/driver/internal/inflight.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package internal

import (
"k8s.io/klog"
"sync"
)

Expand Down Expand Up @@ -45,26 +46,25 @@ func NewInFlight() *InFlight {

// Insert inserts the entry to the current list of inflight requests.
// Returns false when the key already exists.
func (db *InFlight) Insert(entry Idempotent) bool {
func (db *InFlight) Insert(volumeId string) bool {
db.mux.Lock()
defer db.mux.Unlock()

hash := entry.String()

_, ok := db.inFlight[hash]
_, ok := db.inFlight[volumeId]
if ok {
return false
}

db.inFlight[hash] = true
db.inFlight[volumeId] = true
return true
}

// Delete removes the entry from the inFlight entries map.
// It doesn't return anything, and will do nothing if the specified key doesn't exist.
func (db *InFlight) Delete(h Idempotent) {
func (db *InFlight) Delete(volumeId string) {
db.mux.Lock()
defer db.mux.Unlock()

delete(db.inFlight, h.String())
delete(db.inFlight, volumeId)
klog.V(4).Infof("Node Service: volume=%q operation finished", volumeId)
}
158 changes: 26 additions & 132 deletions pkg/driver/internal/inflight_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,37 +18,14 @@ package internal

import (
"testing"

"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/util"
)

type testRequest struct {
request *csi.CreateVolumeRequest
expResp bool
delete bool
volumeId string
expResp bool
delete bool
}

var stdVolCap = []*csi.VolumeCapability{
{
AccessType: &csi.VolumeCapability_Mount{
Mount: &csi.VolumeCapability_MountVolume{},
},
AccessMode: &csi.VolumeCapability_AccessMode{
Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
},
},
}

var (
stdVolSize = int64(5 * util.GiB)
stdCapRange = &csi.CapacityRange{RequiredBytes: stdVolSize}
stdParams = map[string]string{
"key1": "value1",
"key2": "value2",
}
)

func TestInFlight(t *testing.T) {
testCases := []struct {
name string
Expand All @@ -58,137 +35,54 @@ func TestInFlight(t *testing.T) {
name: "success normal",
requests: []testRequest{
{
request: &csi.CreateVolumeRequest{
Name: "random-vol-name",
CapacityRange: stdCapRange,
VolumeCapabilities: stdVolCap,
Parameters: stdParams,
},
expResp: true,
},
},
},
{
name: "success adding request with different name",
requests: []testRequest{
{
request: &csi.CreateVolumeRequest{
Name: "random-vol-foobar",
CapacityRange: stdCapRange,
VolumeCapabilities: stdVolCap,
Parameters: stdParams,
},
expResp: true,
},
{
request: &csi.CreateVolumeRequest{
Name: "random-vol-name-foobar",
CapacityRange: stdCapRange,
VolumeCapabilities: stdVolCap,
Parameters: stdParams,
},
expResp: true,
},
},
},
{
name: "success adding request with different parameters",
requests: []testRequest{
{
request: &csi.CreateVolumeRequest{
Name: "random-vol-name-foobar",
CapacityRange: stdCapRange,
VolumeCapabilities: stdVolCap,
Parameters: map[string]string{"foo": "bar"},
},
expResp: true,
},
{
request: &csi.CreateVolumeRequest{
Name: "random-vol-name-foobar",
CapacityRange: stdCapRange,
VolumeCapabilities: stdVolCap,
},
expResp: true,

volumeId: "random-vol-name",
expResp: true,
},
},
},
{
name: "success adding request with different parameters",
name: "success adding request with different volumeId",
requests: []testRequest{
{
request: &csi.CreateVolumeRequest{
Name: "random-vol-name-foobar",
CapacityRange: stdCapRange,
VolumeCapabilities: stdVolCap,
Parameters: map[string]string{"foo": "bar"},
},
expResp: true,
volumeId: "random-vol-foobar",
expResp: true,
},
{
request: &csi.CreateVolumeRequest{
Name: "random-vol-name-foobar",
CapacityRange: stdCapRange,
VolumeCapabilities: stdVolCap,
Parameters: map[string]string{"foo": "baz"},
},
expResp: true,
volumeId: "random-vol-name-foobar",
expResp: true,
},
},
},
{
name: "failure adding copy of request",
name: "failed adding request with same volumeId",
requests: []testRequest{
{
request: &csi.CreateVolumeRequest{
Name: "random-vol-name",
CapacityRange: stdCapRange,
VolumeCapabilities: stdVolCap,
Parameters: stdParams,
},
expResp: true,
volumeId: "random-vol-name-foobar",
expResp: true,
},
{
request: &csi.CreateVolumeRequest{
Name: "random-vol-name",
CapacityRange: stdCapRange,
VolumeCapabilities: stdVolCap,
Parameters: stdParams,
},
expResp: false,
volumeId: "random-vol-name-foobar",
expResp: false,
},
},
},

{
name: "success add, delete, add copy",
requests: []testRequest{
{
request: &csi.CreateVolumeRequest{
Name: "random-vol-name",
CapacityRange: stdCapRange,
VolumeCapabilities: stdVolCap,
Parameters: stdParams,
},
expResp: true,
volumeId: "random-vol-name",
expResp: true,
},
{
request: &csi.CreateVolumeRequest{
Name: "random-vol-name",
CapacityRange: stdCapRange,
VolumeCapabilities: stdVolCap,
Parameters: stdParams,
},
expResp: false,
delete: true,
volumeId: "random-vol-name",
expResp: false,
delete: true,
},
{
request: &csi.CreateVolumeRequest{
Name: "random-vol-name",
CapacityRange: stdCapRange,
VolumeCapabilities: stdVolCap,
Parameters: stdParams,
},
expResp: true,
volumeId: "random-vol-name",
expResp: true,
},
},
},
Expand All @@ -200,9 +94,9 @@ func TestInFlight(t *testing.T) {
for _, r := range tc.requests {
var resp bool
if r.delete {
db.Delete(r.request)
db.Delete(r.volumeId)
} else {
resp = db.Insert(r.request)
resp = db.Insert(r.volumeId)
}
if r.expResp != resp {
t.Fatalf("expected insert to be %+v, got %+v", r.expResp, resp)
Expand Down
27 changes: 19 additions & 8 deletions pkg/driver/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ const (

// defaultMaxEBSNitroVolumes is the limit of volumes for some smaller instances, like c5 and m5.
defaultMaxEBSNitroVolumes = 25

// VolumeOperationAlreadyExists is message fmt returned to CO when there is another in-flight call on the given volumeID
VolumeOperationAlreadyExists = "An operation with the given volume=%q is already in progress"
)

var (
Expand Down Expand Up @@ -141,15 +144,10 @@ func (d *nodeService) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
}
}

if ok := d.inFlight.Insert(req); !ok {
msg := fmt.Sprintf("request to stage volume=%q is already in progress", volumeID)
return nil, status.Error(codes.Internal, msg)
if ok := d.inFlight.Insert(volumeID); !ok {
return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID)
}
defer func() {
klog.V(4).Infof("NodeStageVolume: volume=%q operation finished", req.GetVolumeId())
d.inFlight.Delete(req)
klog.V(4).Info("donedone")
}()
defer d.inFlight.Delete(volumeID)

devicePath, ok := req.PublishContext[DevicePathKey]
if !ok {
Expand Down Expand Up @@ -218,6 +216,10 @@ func (d *nodeService) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
return nil, status.Error(codes.InvalidArgument, "Staging target not provided")
}

if ok := d.inFlight.Insert(volumeID); !ok {
return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID)
}
defer d.inFlight.Delete(volumeID)
// Check if target directory is a mount point. GetDeviceNameFromMount
// given a mnt point, finds the device from /proc/mounts
// returns the device name, reference count, and error code
Expand Down Expand Up @@ -344,6 +346,11 @@ func (d *nodeService) NodePublishVolume(ctx context.Context, req *csi.NodePublis
return nil, status.Error(codes.InvalidArgument, "Volume capability not supported")
}

if ok := d.inFlight.Insert(volumeID); !ok {
return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID)
}
defer d.inFlight.Delete(volumeID)

mountOptions := []string{"bind"}
if req.GetReadonly() {
mountOptions = append(mountOptions, "ro")
Expand Down Expand Up @@ -374,6 +381,10 @@ func (d *nodeService) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
if len(target) == 0 {
return nil, status.Error(codes.InvalidArgument, "Target path not provided")
}
if ok := d.inFlight.Insert(volumeID); !ok {
return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID)
}
defer d.inFlight.Delete(volumeID)

klog.V(5).Infof("NodeUnpublishVolume: unmounting %s", target)
err := d.mounter.Unmount(target)
Expand Down
Loading

0 comments on commit 22b25f6

Please sign in to comment.