Skip to content

Commit

Permalink
Refactor inFlight key to add lock per volumeId for node service
Browse files Browse the repository at this point in the history
  • Loading branch information
AndyXiangLi committed Feb 19, 2021
1 parent 1a15fcc commit 8155fe8
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 153 deletions.
4 changes: 2 additions & 2 deletions pkg/driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,11 @@ func (d *controllerService) CreateVolume(ctx context.Context, req *csi.CreateVol
}

// check if a request is already in-flight because the CreateVolume API is not idempotent
if ok := d.inFlight.Insert(req); !ok {
if ok := d.inFlight.Insert(req.String()); !ok {
msg := fmt.Sprintf("Create volume request for %s is already in progress", volName)
return nil, status.Error(codes.Aborted, msg)
}
defer d.inFlight.Delete(req)
defer d.inFlight.Delete(req.String())

// create a new volume
zone := pickAvailabilityZone(req.GetAccessibilityRequirements())
Expand Down
4 changes: 2 additions & 2 deletions pkg/driver/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1534,8 +1534,8 @@ func TestCreateVolume(t *testing.T) {
mockCloud.EXPECT().GetDiskByName(gomock.Eq(ctx), gomock.Eq(req.Name), gomock.Eq(stdVolSize)).Return(nil, cloud.ErrNotFound)

inFlight := internal.NewInFlight()
inFlight.Insert(req)
defer inFlight.Delete(req)
inFlight.Insert(req.String())
defer inFlight.Delete(req.String())

awsDriver := controllerService{
cloud: mockCloud,
Expand Down
18 changes: 9 additions & 9 deletions pkg/driver/internal/inflight.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package internal

import (
"k8s.io/klog"
"sync"
)

Expand All @@ -29,7 +30,7 @@ type Idempotent interface {
String() string
}

// InFlight is a struct used to manage in flight requests.
// InFlight is a struct used to manage in flight requests per volumeId.
type InFlight struct {
mux *sync.Mutex
inFlight map[string]bool
Expand All @@ -43,28 +44,27 @@ func NewInFlight() *InFlight {
}
}

// Insert inserts the entry to the current list of inflight requests.
// Insert inserts the entry to the current list of inflight request key is volumeId for node and req hash for controller .
// Returns false when the key already exists.
func (db *InFlight) Insert(entry Idempotent) bool {
func (db *InFlight) Insert(key string) bool {
db.mux.Lock()
defer db.mux.Unlock()

hash := entry.String()

_, ok := db.inFlight[hash]
_, ok := db.inFlight[key]
if ok {
return false
}

db.inFlight[hash] = true
db.inFlight[key] = true
return true
}

// Delete removes the entry from the inFlight entries map.
// It doesn't return anything, and will do nothing if the specified key doesn't exist.
func (db *InFlight) Delete(h Idempotent) {
func (db *InFlight) Delete(key string) {
db.mux.Lock()
defer db.mux.Unlock()

delete(db.inFlight, h.String())
delete(db.inFlight, key)
klog.V(4).Infof("Node Service: volume=%q operation finished", key)
}
158 changes: 26 additions & 132 deletions pkg/driver/internal/inflight_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,37 +18,14 @@ package internal

import (
"testing"

"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/util"
)

type testRequest struct {
request *csi.CreateVolumeRequest
expResp bool
delete bool
volumeId string
expResp bool
delete bool
}

var stdVolCap = []*csi.VolumeCapability{
{
AccessType: &csi.VolumeCapability_Mount{
Mount: &csi.VolumeCapability_MountVolume{},
},
AccessMode: &csi.VolumeCapability_AccessMode{
Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
},
},
}

var (
stdVolSize = int64(5 * util.GiB)
stdCapRange = &csi.CapacityRange{RequiredBytes: stdVolSize}
stdParams = map[string]string{
"key1": "value1",
"key2": "value2",
}
)

func TestInFlight(t *testing.T) {
testCases := []struct {
name string
Expand All @@ -58,137 +35,54 @@ func TestInFlight(t *testing.T) {
name: "success normal",
requests: []testRequest{
{
request: &csi.CreateVolumeRequest{
Name: "random-vol-name",
CapacityRange: stdCapRange,
VolumeCapabilities: stdVolCap,
Parameters: stdParams,
},
expResp: true,
},
},
},
{
name: "success adding request with different name",
requests: []testRequest{
{
request: &csi.CreateVolumeRequest{
Name: "random-vol-foobar",
CapacityRange: stdCapRange,
VolumeCapabilities: stdVolCap,
Parameters: stdParams,
},
expResp: true,
},
{
request: &csi.CreateVolumeRequest{
Name: "random-vol-name-foobar",
CapacityRange: stdCapRange,
VolumeCapabilities: stdVolCap,
Parameters: stdParams,
},
expResp: true,
},
},
},
{
name: "success adding request with different parameters",
requests: []testRequest{
{
request: &csi.CreateVolumeRequest{
Name: "random-vol-name-foobar",
CapacityRange: stdCapRange,
VolumeCapabilities: stdVolCap,
Parameters: map[string]string{"foo": "bar"},
},
expResp: true,
},
{
request: &csi.CreateVolumeRequest{
Name: "random-vol-name-foobar",
CapacityRange: stdCapRange,
VolumeCapabilities: stdVolCap,
},
expResp: true,

volumeId: "random-vol-name",
expResp: true,
},
},
},
{
name: "success adding request with different parameters",
name: "success adding request with different volumeId",
requests: []testRequest{
{
request: &csi.CreateVolumeRequest{
Name: "random-vol-name-foobar",
CapacityRange: stdCapRange,
VolumeCapabilities: stdVolCap,
Parameters: map[string]string{"foo": "bar"},
},
expResp: true,
volumeId: "random-vol-foobar",
expResp: true,
},
{
request: &csi.CreateVolumeRequest{
Name: "random-vol-name-foobar",
CapacityRange: stdCapRange,
VolumeCapabilities: stdVolCap,
Parameters: map[string]string{"foo": "baz"},
},
expResp: true,
volumeId: "random-vol-name-foobar",
expResp: true,
},
},
},
{
name: "failure adding copy of request",
name: "failed adding request with same volumeId",
requests: []testRequest{
{
request: &csi.CreateVolumeRequest{
Name: "random-vol-name",
CapacityRange: stdCapRange,
VolumeCapabilities: stdVolCap,
Parameters: stdParams,
},
expResp: true,
volumeId: "random-vol-name-foobar",
expResp: true,
},
{
request: &csi.CreateVolumeRequest{
Name: "random-vol-name",
CapacityRange: stdCapRange,
VolumeCapabilities: stdVolCap,
Parameters: stdParams,
},
expResp: false,
volumeId: "random-vol-name-foobar",
expResp: false,
},
},
},

{
name: "success add, delete, add copy",
requests: []testRequest{
{
request: &csi.CreateVolumeRequest{
Name: "random-vol-name",
CapacityRange: stdCapRange,
VolumeCapabilities: stdVolCap,
Parameters: stdParams,
},
expResp: true,
volumeId: "random-vol-name",
expResp: true,
},
{
request: &csi.CreateVolumeRequest{
Name: "random-vol-name",
CapacityRange: stdCapRange,
VolumeCapabilities: stdVolCap,
Parameters: stdParams,
},
expResp: false,
delete: true,
volumeId: "random-vol-name",
expResp: false,
delete: true,
},
{
request: &csi.CreateVolumeRequest{
Name: "random-vol-name",
CapacityRange: stdCapRange,
VolumeCapabilities: stdVolCap,
Parameters: stdParams,
},
expResp: true,
volumeId: "random-vol-name",
expResp: true,
},
},
},
Expand All @@ -200,9 +94,9 @@ func TestInFlight(t *testing.T) {
for _, r := range tc.requests {
var resp bool
if r.delete {
db.Delete(r.request)
db.Delete(r.volumeId)
} else {
resp = db.Insert(r.request)
resp = db.Insert(r.volumeId)
}
if r.expResp != resp {
t.Fatalf("expected insert to be %+v, got %+v", r.expResp, resp)
Expand Down
35 changes: 30 additions & 5 deletions pkg/driver/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ const (

// defaultMaxEBSNitroVolumes is the limit of volumes for some smaller instances, like c5 and m5.
defaultMaxEBSNitroVolumes = 25

// VolumeOperationAlreadyExists is message fmt returned to CO when there is another in-flight call on the given volumeID
VolumeOperationAlreadyExists = "An operation with the given volume=%q is already in progress"
)

var (
Expand Down Expand Up @@ -141,13 +144,12 @@ func (d *nodeService) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
}
}

if ok := d.inFlight.Insert(req); !ok {
msg := fmt.Sprintf("request to stage volume=%q is already in progress", volumeID)
return nil, status.Error(codes.Aborted, msg)
if ok := d.inFlight.Insert(volumeID); !ok {
return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID)
}
defer func() {
klog.V(4).Infof("NodeStageVolume: volume=%q operation finished", req.GetVolumeId())
d.inFlight.Delete(req)
klog.V(4).Infof("NodeStageVolume: volume=%q operation finished", volumeID)
d.inFlight.Delete(volumeID)
}()

devicePath, ok := req.PublishContext[DevicePathKey]
Expand Down Expand Up @@ -217,6 +219,14 @@ func (d *nodeService) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
return nil, status.Error(codes.InvalidArgument, "Staging target not provided")
}

if ok := d.inFlight.Insert(volumeID); !ok {
return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID)
}
defer func() {
klog.V(4).Infof("NodeUnStageVolume: volume=%q operation finished", volumeID)
d.inFlight.Delete(volumeID)
}()

// Check if target directory is a mount point. GetDeviceNameFromMount
// given a mnt point, finds the device from /proc/mounts
// returns the device name, reference count, and error code
Expand Down Expand Up @@ -343,6 +353,14 @@ func (d *nodeService) NodePublishVolume(ctx context.Context, req *csi.NodePublis
return nil, status.Error(codes.InvalidArgument, "Volume capability not supported")
}

if ok := d.inFlight.Insert(volumeID); !ok {
return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID)
}
defer func() {
klog.V(4).Infof("NodePublishVolume: volume=%q operation finished", volumeID)
d.inFlight.Delete(volumeID)
}()

mountOptions := []string{"bind"}
if req.GetReadonly() {
mountOptions = append(mountOptions, "ro")
Expand Down Expand Up @@ -373,6 +391,13 @@ func (d *nodeService) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
if len(target) == 0 {
return nil, status.Error(codes.InvalidArgument, "Target path not provided")
}
if ok := d.inFlight.Insert(volumeID); !ok {
return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID)
}
defer func() {
klog.V(4).Infof("NodeUnPublishVolume: volume=%q operation finished", volumeID)
d.inFlight.Delete(volumeID)
}()

klog.V(5).Infof("NodeUnpublishVolume: unmounting %s", target)
err := d.mounter.Unmount(target)
Expand Down
Loading

0 comments on commit 8155fe8

Please sign in to comment.