Skip to content

Commit

Permalink
SDK will now return a volume only if it is ready
Browse files Browse the repository at this point in the history
With this fix, any call creating a volume and noticing it already
exists now checks to make sure that the volume is UP and ready
before returning.

Closes #1180

Signed-off-by: Luis Pabón <[email protected]>
  • Loading branch information
lpabon committed Aug 27, 2019
1 parent fc6a1bb commit 24f1365
Show file tree
Hide file tree
Showing 7 changed files with 314 additions and 20 deletions.
2 changes: 1 addition & 1 deletion api/server/sdk/sdk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func newTestServer(t *testing.T) *testServer {
func (s *testServer) setPorts() {
source := rand.NewSource(time.Now().UnixNano())
r := rand.New(source)
port := r.Intn(2999) + 8000
port := r.Intn(20000) + 10000

s.port = fmt.Sprintf("%d", port)
s.gwport = fmt.Sprintf("%d", port+1)
Expand Down
79 changes: 78 additions & 1 deletion api/server/sdk/volume_ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package sdk
import (
"context"
"fmt"
"time"

"github.com/sirupsen/logrus"

Expand All @@ -32,6 +33,68 @@ import (
"google.golang.org/grpc/status"
)

// When create is called for an existing volume, this function is called to make sure
// the SDK only returns that the volume is ready when the status is UP
func (s *VolumeServer) waitForVolumeReady(ctx context.Context, id string) (*api.Volume, error) {
var v *api.Volume

minTimeout := 1 * time.Second
maxTimeout := 60 * time.Minute
defaultTimeout := 10 * time.Minute

logrus.Infof("Waiting for volume %s to become available", id)

e := util.WaitForWithContext(
ctx,
minTimeout, maxTimeout, defaultTimeout, // timeouts
250*time.Millisecond, // period
func() (bool, error) {
var err error
// Get the latest status from the volume
v, err = util.VolumeFromName(s.driver(ctx), id)
if err != nil {
return false, status.Errorf(codes.Internal, err.Error())
}

// Check if the volume is ready
if v.GetStatus() == api.VolumeStatus_VOLUME_STATUS_UP {
return false, nil
}

// Continue waiting
return true, nil
})

return v, e
}

func (s *VolumeServer) waitForVolumeRemoved(ctx context.Context, id string) error {
var v *api.Volume

minTimeout := 1 * time.Second
maxTimeout := 10 * time.Minute
defaultTimeout := 5 * time.Minute

logrus.Infof("Waiting for volume %s to be removed", id)

return util.WaitForWithContext(
ctx,
minTimeout, maxTimeout, defaultTimeout, // timeouts
250*time.Millisecond, // period
func() (bool, error) {
var err error
// Get the latest status from the volume
v, err = util.VolumeFromName(s.driver(ctx), id)
if err != nil {
// Removed
return false, nil
}

// Continue waiting
return true, nil
})
}

func (s *VolumeServer) create(
ctx context.Context,
locator *api.VolumeLocator,
Expand All @@ -42,12 +105,26 @@ func (s *VolumeServer) create(
// Check if the volume has already been created or is in process of creation
volName := locator.GetName()
v, err := util.VolumeFromName(s.driver(ctx), volName)
if err == nil {

// If the volume is still there but it is being delete, then wait until it is removed
if err == nil && v.GetState() == api.VolumeState_VOLUME_STATE_DELETED {
if err = s.waitForVolumeRemoved(ctx, volName); err != nil {
return "", err
}

// If the volume is there but it is not being deleted then just return the current id
} else if err == nil {
// Check ownership
if !v.IsPermitted(ctx, api.Ownership_Admin) {
return "", status.Errorf(codes.PermissionDenied, "Volume %s already exists and is owned by another user", volName)
}

// Wait until ready
v, err = s.waitForVolumeReady(ctx, volName)
if err != nil {
return "", err
}

// Check the requested arguments match that of the existing volume
if v.GetSpec().GetSize() != spec.GetSize() {
return "", status.Errorf(
Expand Down
150 changes: 136 additions & 14 deletions api/server/sdk/volume_ops_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func init() {
policy.Init(kv)
}

func TestSdkVolumeCreateCheckIdempotency(t *testing.T) {
func TestSdkVolumeCreateCheckIdempotencyWaitForRemoved(t *testing.T) {

// Create server and client connection
s := newTestServer(t)
Expand All @@ -59,24 +59,51 @@ func TestSdkVolumeCreateCheckIdempotency(t *testing.T) {
},
}

// Create response
id := "myid"
vol := &api.Volume{
Id: id,
Status: api.VolumeStatus_VOLUME_STATUS_UP,
State: api.VolumeState_VOLUME_STATE_DELETED,
Locator: &api.VolumeLocator{
Name: name,
},
Spec: &api.VolumeSpec{
Size: size,
},
}

gomock.InOrder(
s.MockDriver().
EXPECT().
Inspect([]string{name}).
Return([]*api.Volume{
&api.Volume{
Id: id,
Locator: &api.VolumeLocator{
Name: name,
},
Spec: &api.VolumeSpec{
Size: size,
},
},
}, nil).
Times(1),
Return([]*api.Volume{vol}, nil),

s.MockDriver().
EXPECT().
Inspect([]string{name}).
Return([]*api.Volume{vol}, nil),

s.MockDriver().
EXPECT().
Inspect([]string{name}).
Return([]*api.Volume{vol}, nil),

s.MockDriver().
EXPECT().
Inspect([]string{name}).
Return(nil, fmt.Errorf("MOCK ERROR")),

s.MockDriver().
EXPECT().
Enumerate(&api.VolumeLocator{Name: name}, nil).
Return(nil, fmt.Errorf("not found")),

s.MockDriver().
EXPECT().
Create(&api.VolumeLocator{
Name: name,
}, &api.Source{}, &api.VolumeSpec{Size: size}).
Return(id, nil),
)

// Setup client
Expand All @@ -88,6 +115,101 @@ func TestSdkVolumeCreateCheckIdempotency(t *testing.T) {
assert.Equal(t, r.GetVolumeId(), "myid")
}

func TestSdkVolumeCreateCheckIdempotencyWaitForReady(t *testing.T) {

// Create server and client connection
s := newTestServer(t)
defer s.Stop()

name := "myvol"
size := uint64(1234)
req := &api.SdkVolumeCreateRequest{
Name: name,
Spec: &api.VolumeSpec{
Size: size,
},
}

id := "myid"
count := 0
vol := &api.Volume{
Id: id,
Status: api.VolumeStatus_VOLUME_STATUS_DEGRADED,
Locator: &api.VolumeLocator{
Name: name,
},
Spec: &api.VolumeSpec{
Size: size,
},
}

// This should be called 3 times.
// 1 for the first inspect to check if it is there.
// 3 for waiting until the status is VOLUME UP
// 1 for waiting but getting that the volume is up
s.MockDriver().
EXPECT().
Inspect([]string{name}).
Do(func([]string) {
count++
if count == 4 {
vol.Status = api.VolumeStatus_VOLUME_STATUS_UP
}
}).
Return([]*api.Volume{vol}, nil).
Times(4)

// Setup client
c := api.NewOpenStorageVolumeClient(s.Conn())

// Get info
_, err := c.Create(context.Background(), req)
assert.NoError(t, err)
}

func TestSdkVolumeCreateCheckIdempotency(t *testing.T) {

// Create server and client connection
s := newTestServer(t)
defer s.Stop()

name := "myvol"
size := uint64(1234)
req := &api.SdkVolumeCreateRequest{
Name: name,
Spec: &api.VolumeSpec{
Size: size,
},
}

// Create response
id := "myid"
s.MockDriver().
EXPECT().
Inspect([]string{name}).
Return([]*api.Volume{
&api.Volume{
Id: id,
Status: api.VolumeStatus_VOLUME_STATUS_UP,
Locator: &api.VolumeLocator{
Name: name,
},
Spec: &api.VolumeSpec{
Size: size,
},
},
}, nil).
Times(2)

// Setup client
c := api.NewOpenStorageVolumeClient(s.Conn())

// Get info
r, err := c.Create(context.Background(), req)
assert.NoError(t, err)
assert.Equal(t, r.GetVolumeId(), "myid")
}

func TestSdkVolumeCreate(t *testing.T) {

// Create server and client connection
Expand Down
2 changes: 1 addition & 1 deletion api/server/testutils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func newTestServer(t *testing.T) *testServer {
func (s *testServer) setPorts() {
source := rand.NewSource(time.Now().UnixNano())
r := rand.New(source)
port := r.Intn(2999) + 8000
port := r.Intn(20000) + 10000

s.port = fmt.Sprintf("%d", port)
s.gwport = fmt.Sprintf("%d", port+1)
Expand Down
49 changes: 49 additions & 0 deletions csi/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -859,6 +859,30 @@ func TestControllerCreateVolumeFoundByVolumeFromNameConflict(t *testing.T) {
Locator: &api.VolumeLocator{
Name: "size",
},
Status: api.VolumeStatus_VOLUME_STATUS_UP,
Spec: &api.VolumeSpec{

// Size is different
Size: 10,
},
}}, nil).
Times(1),

s.MockDriver().
EXPECT().
Inspect([]string{"size"}).
Return(nil, fmt.Errorf("not found")).
Times(1),

s.MockDriver().
EXPECT().
Enumerate(&api.VolumeLocator{Name: "size"}, nil).
Return([]*api.Volume{&api.Volume{
Id: "size",
Locator: &api.VolumeLocator{
Name: "size",
},
Status: api.VolumeStatus_VOLUME_STATUS_UP,
Spec: &api.VolumeSpec{

// Size is different
Expand Down Expand Up @@ -991,6 +1015,30 @@ func TestControllerCreateVolumeFoundByVolumeFromName(t *testing.T) {
Locator: &api.VolumeLocator{
Name: name,
},
Status: api.VolumeStatus_VOLUME_STATUS_UP,
Spec: &api.VolumeSpec{
Size: uint64(size),
},
},
}, nil).
Times(1),

s.MockDriver().
EXPECT().
Inspect([]string{name}).
Return(nil, fmt.Errorf("not found")).
Times(1),

s.MockDriver().
EXPECT().
Enumerate(&api.VolumeLocator{Name: name}, nil).
Return([]*api.Volume{
&api.Volume{
Id: name,
Locator: &api.VolumeLocator{
Name: name,
},
Status: api.VolumeStatus_VOLUME_STATUS_UP,
Spec: &api.VolumeSpec{
Size: uint64(size),
},
Expand All @@ -1008,6 +1056,7 @@ func TestControllerCreateVolumeFoundByVolumeFromName(t *testing.T) {
Locator: &api.VolumeLocator{
Name: name,
},
Status: api.VolumeStatus_VOLUME_STATUS_UP,
Spec: &api.VolumeSpec{
Size: uint64(1234),
},
Expand Down
2 changes: 1 addition & 1 deletion csi/csi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func newTestServerWithConfig(t *testing.T, config *OsdCsiServerConfig) *testServ
func (s *testServer) setPorts() {
source := rand.NewSource(time.Now().UnixNano())
r := rand.New(source)
port := r.Intn(2999) + 8000
port := r.Intn(20000) + 10000

s.port = fmt.Sprintf("%d", port)
s.gwport = fmt.Sprintf("%d", port+1)
Expand Down
Loading

0 comments on commit 24f1365

Please sign in to comment.