Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor inFlight key to add lock per volumeId #702

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,11 @@ func (d *controllerService) CreateVolume(ctx context.Context, req *csi.CreateVol
}

// check if a request is already in-flight because the CreateVolume API is not idempotent
if ok := d.inFlight.Insert(req); !ok {
if ok := d.inFlight.Insert(req.String()); !ok {
msg := fmt.Sprintf("Create volume request for %s is already in progress", volName)
return nil, status.Error(codes.Aborted, msg)
}
defer d.inFlight.Delete(req)
defer d.inFlight.Delete(req.String())

// create a new volume
zone := pickAvailabilityZone(req.GetAccessibilityRequirements())
Expand Down
4 changes: 2 additions & 2 deletions pkg/driver/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1534,8 +1534,8 @@ func TestCreateVolume(t *testing.T) {
mockCloud.EXPECT().GetDiskByName(gomock.Eq(ctx), gomock.Eq(req.Name), gomock.Eq(stdVolSize)).Return(nil, cloud.ErrNotFound)

inFlight := internal.NewInFlight()
inFlight.Insert(req)
defer inFlight.Delete(req)
inFlight.Insert(req.String())
defer inFlight.Delete(req.String())

awsDriver := controllerService{
cloud: mockCloud,
Expand Down
18 changes: 9 additions & 9 deletions pkg/driver/internal/inflight.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package internal

import (
"k8s.io/klog"
"sync"
)

Expand All @@ -29,7 +30,7 @@ type Idempotent interface {
String() string
}

// InFlight is a struct used to manage in flight requests.
// InFlight is a struct used to manage in flight requests per volumeId.
type InFlight struct {
mux *sync.Mutex
inFlight map[string]bool
Expand All @@ -43,28 +44,27 @@ func NewInFlight() *InFlight {
}
}

// Insert inserts the entry to the current list of inflight requests.
// Insert inserts the entry to the current list of inflight request key is volumeId for node and req hash for controller .
// Returns false when the key already exists.
func (db *InFlight) Insert(entry Idempotent) bool {
func (db *InFlight) Insert(key string) bool {
db.mux.Lock()
defer db.mux.Unlock()

hash := entry.String()

_, ok := db.inFlight[hash]
_, ok := db.inFlight[key]
if ok {
return false
}

db.inFlight[hash] = true
db.inFlight[key] = true
return true
}

// Delete removes the entry from the inFlight entries map.
// It doesn't return anything, and will do nothing if the specified key doesn't exist.
func (db *InFlight) Delete(h Idempotent) {
func (db *InFlight) Delete(key string) {
db.mux.Lock()
defer db.mux.Unlock()

delete(db.inFlight, h.String())
delete(db.inFlight, key)
klog.V(4).Infof("Node Service: volume=%q operation finished", key)
}
158 changes: 26 additions & 132 deletions pkg/driver/internal/inflight_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,37 +18,14 @@ package internal

import (
"testing"

"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/util"
)

type testRequest struct {
request *csi.CreateVolumeRequest
expResp bool
delete bool
volumeId string
expResp bool
delete bool
}

var stdVolCap = []*csi.VolumeCapability{
{
AccessType: &csi.VolumeCapability_Mount{
Mount: &csi.VolumeCapability_MountVolume{},
},
AccessMode: &csi.VolumeCapability_AccessMode{
Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
},
},
}

var (
stdVolSize = int64(5 * util.GiB)
stdCapRange = &csi.CapacityRange{RequiredBytes: stdVolSize}
stdParams = map[string]string{
"key1": "value1",
"key2": "value2",
}
)

func TestInFlight(t *testing.T) {
testCases := []struct {
name string
Expand All @@ -58,137 +35,54 @@ func TestInFlight(t *testing.T) {
name: "success normal",
requests: []testRequest{
{
request: &csi.CreateVolumeRequest{
Name: "random-vol-name",
CapacityRange: stdCapRange,
VolumeCapabilities: stdVolCap,
Parameters: stdParams,
},
expResp: true,
},
},
},
{
name: "success adding request with different name",
requests: []testRequest{
{
request: &csi.CreateVolumeRequest{
Name: "random-vol-foobar",
CapacityRange: stdCapRange,
VolumeCapabilities: stdVolCap,
Parameters: stdParams,
},
expResp: true,
},
{
request: &csi.CreateVolumeRequest{
Name: "random-vol-name-foobar",
CapacityRange: stdCapRange,
VolumeCapabilities: stdVolCap,
Parameters: stdParams,
},
expResp: true,
},
},
},
{
name: "success adding request with different parameters",
requests: []testRequest{
{
request: &csi.CreateVolumeRequest{
Name: "random-vol-name-foobar",
CapacityRange: stdCapRange,
VolumeCapabilities: stdVolCap,
Parameters: map[string]string{"foo": "bar"},
},
expResp: true,
},
{
request: &csi.CreateVolumeRequest{
Name: "random-vol-name-foobar",
CapacityRange: stdCapRange,
VolumeCapabilities: stdVolCap,
},
expResp: true,

volumeId: "random-vol-name",
expResp: true,
},
},
},
{
name: "success adding request with different parameters",
name: "success adding request with different volumeId",
requests: []testRequest{
{
request: &csi.CreateVolumeRequest{
Name: "random-vol-name-foobar",
CapacityRange: stdCapRange,
VolumeCapabilities: stdVolCap,
Parameters: map[string]string{"foo": "bar"},
},
expResp: true,
volumeId: "random-vol-foobar",
expResp: true,
},
{
request: &csi.CreateVolumeRequest{
Name: "random-vol-name-foobar",
CapacityRange: stdCapRange,
VolumeCapabilities: stdVolCap,
Parameters: map[string]string{"foo": "baz"},
},
expResp: true,
volumeId: "random-vol-name-foobar",
expResp: true,
},
},
},
{
name: "failure adding copy of request",
name: "failed adding request with same volumeId",
requests: []testRequest{
{
request: &csi.CreateVolumeRequest{
Name: "random-vol-name",
CapacityRange: stdCapRange,
VolumeCapabilities: stdVolCap,
Parameters: stdParams,
},
expResp: true,
volumeId: "random-vol-name-foobar",
expResp: true,
},
{
request: &csi.CreateVolumeRequest{
Name: "random-vol-name",
CapacityRange: stdCapRange,
VolumeCapabilities: stdVolCap,
Parameters: stdParams,
},
expResp: false,
volumeId: "random-vol-name-foobar",
expResp: false,
},
},
},

{
name: "success add, delete, add copy",
requests: []testRequest{
{
request: &csi.CreateVolumeRequest{
Name: "random-vol-name",
CapacityRange: stdCapRange,
VolumeCapabilities: stdVolCap,
Parameters: stdParams,
},
expResp: true,
volumeId: "random-vol-name",
expResp: true,
},
{
request: &csi.CreateVolumeRequest{
Name: "random-vol-name",
CapacityRange: stdCapRange,
VolumeCapabilities: stdVolCap,
Parameters: stdParams,
},
expResp: false,
delete: true,
volumeId: "random-vol-name",
expResp: false,
delete: true,
},
{
request: &csi.CreateVolumeRequest{
Name: "random-vol-name",
CapacityRange: stdCapRange,
VolumeCapabilities: stdVolCap,
Parameters: stdParams,
},
expResp: true,
volumeId: "random-vol-name",
expResp: true,
},
},
},
Expand All @@ -200,9 +94,9 @@ func TestInFlight(t *testing.T) {
for _, r := range tc.requests {
var resp bool
if r.delete {
db.Delete(r.request)
db.Delete(r.volumeId)
} else {
resp = db.Insert(r.request)
resp = db.Insert(r.volumeId)
}
if r.expResp != resp {
t.Fatalf("expected insert to be %+v, got %+v", r.expResp, resp)
Expand Down
35 changes: 30 additions & 5 deletions pkg/driver/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ const (

// defaultMaxEBSNitroVolumes is the limit of volumes for some smaller instances, like c5 and m5.
defaultMaxEBSNitroVolumes = 25

// VolumeOperationAlreadyExists is message fmt returned to CO when there is another in-flight call on the given volumeID
VolumeOperationAlreadyExists = "An operation with the given volume=%q is already in progress"
)

var (
Expand Down Expand Up @@ -141,13 +144,12 @@ func (d *nodeService) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
}
}

if ok := d.inFlight.Insert(req); !ok {
msg := fmt.Sprintf("request to stage volume=%q is already in progress", volumeID)
return nil, status.Error(codes.Aborted, msg)
if ok := d.inFlight.Insert(volumeID); !ok {
return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID)
}
defer func() {
klog.V(4).Infof("NodeStageVolume: volume=%q operation finished", req.GetVolumeId())
d.inFlight.Delete(req)
klog.V(4).Infof("NodeStageVolume: volume=%q operation finished", volumeID)
d.inFlight.Delete(volumeID)
}()

devicePath, ok := req.PublishContext[DevicePathKey]
Expand Down Expand Up @@ -217,6 +219,14 @@ func (d *nodeService) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
return nil, status.Error(codes.InvalidArgument, "Staging target not provided")
}

if ok := d.inFlight.Insert(volumeID); !ok {
return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID)
}
defer func() {
klog.V(4).Infof("NodeUnStageVolume: volume=%q operation finished", volumeID)
d.inFlight.Delete(volumeID)
}()

// Check if target directory is a mount point. GetDeviceNameFromMount
// given a mnt point, finds the device from /proc/mounts
// returns the device name, reference count, and error code
Expand Down Expand Up @@ -343,6 +353,14 @@ func (d *nodeService) NodePublishVolume(ctx context.Context, req *csi.NodePublis
return nil, status.Error(codes.InvalidArgument, "Volume capability not supported")
}

if ok := d.inFlight.Insert(volumeID); !ok {
return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID)
}
defer func() {
klog.V(4).Infof("NodePublishVolume: volume=%q operation finished", volumeID)
d.inFlight.Delete(volumeID)
}()

mountOptions := []string{"bind"}
if req.GetReadonly() {
mountOptions = append(mountOptions, "ro")
Expand Down Expand Up @@ -373,6 +391,13 @@ func (d *nodeService) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
if len(target) == 0 {
return nil, status.Error(codes.InvalidArgument, "Target path not provided")
}
if ok := d.inFlight.Insert(volumeID); !ok {
return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID)
}
defer func() {
klog.V(4).Infof("NodeUnPublishVolume: volume=%q operation finished", volumeID)
d.inFlight.Delete(volumeID)
}()

klog.V(5).Infof("NodeUnpublishVolume: unmounting %s", target)
err := d.mounter.Unmount(target)
Expand Down
Loading