Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SDK will now return a volume only if it is ready #1199

Merged
merged 1 commit into from
Sep 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/server/sdk/sdk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func newTestServer(t *testing.T) *testServer {
func (s *testServer) setPorts() {
source := rand.NewSource(time.Now().UnixNano())
r := rand.New(source)
port := r.Intn(2999) + 8000
port := r.Intn(20000) + 10000

s.port = fmt.Sprintf("%d", port)
s.gwport = fmt.Sprintf("%d", port+1)
Expand Down
75 changes: 74 additions & 1 deletion api/server/sdk/volume_ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package sdk
import (
"context"
"fmt"
"time"

"github.com/sirupsen/logrus"

Expand All @@ -32,6 +33,64 @@ import (
"google.golang.org/grpc/status"
)

// When create is called for an existing volume, this function is called to make sure
// the SDK only returns that the volume is ready when the status is UP
func (s *VolumeServer) waitForVolumeReady(ctx context.Context, id string) (*api.Volume, error) {
var v *api.Volume

minTimeout := 1 * time.Second
maxTimeout := 60 * time.Minute
defaultTimeout := 10 * time.Minute

logrus.Infof("Waiting for volume %s to become available", id)

e := util.WaitForWithContext(
ctx,
minTimeout, maxTimeout, defaultTimeout, // timeouts
250*time.Millisecond, // period
func() (bool, error) {
var err error
// Get the latest status from the volume
v, err = util.VolumeFromName(s.driver(ctx), id)
if err != nil {
return false, status.Errorf(codes.Internal, err.Error())
}

// Check if the volume is ready
if v.GetStatus() == api.VolumeStatus_VOLUME_STATUS_UP {
return false, nil
}

// Continue waiting
return true, nil
})

return v, e
}

func (s *VolumeServer) waitForVolumeRemoved(ctx context.Context, id string) error {
minTimeout := 1 * time.Second
maxTimeout := 10 * time.Minute
defaultTimeout := 5 * time.Minute

logrus.Infof("Waiting for volume %s to be removed", id)

return util.WaitForWithContext(
ctx,
minTimeout, maxTimeout, defaultTimeout, // timeouts
250*time.Millisecond, // period
func() (bool, error) {
// Get the latest status from the volume
if _, err := util.VolumeFromName(s.driver(ctx), id); err != nil {
// Removed
return false, nil
}

// Continue waiting
return true, nil
})
}

func (s *VolumeServer) create(
ctx context.Context,
locator *api.VolumeLocator,
Expand All @@ -42,12 +101,26 @@ func (s *VolumeServer) create(
// Check if the volume has already been created or is in process of creation
volName := locator.GetName()
v, err := util.VolumeFromName(s.driver(ctx), volName)
if err == nil {

// If the volume is still there but it is being delete, then wait until it is removed
if err == nil && v.GetState() == api.VolumeState_VOLUME_STATE_DELETED {
if err = s.waitForVolumeRemoved(ctx, volName); err != nil {
return "", status.Errorf(codes.Internal, "Volume with same name %s is in the process of being deleted. Timed out waiting for deletion to complete", volName)
}

// If the volume is there but it is not being deleted then just return the current id
} else if err == nil {
// Check ownership
if !v.IsPermitted(ctx, api.Ownership_Admin) {
return "", status.Errorf(codes.PermissionDenied, "Volume %s already exists and is owned by another user", volName)
}

// Wait until ready
v, err = s.waitForVolumeReady(ctx, volName)
if err != nil {
return "", status.Errorf(codes.Internal, "Timed out waiting for volume %s to be in ready state", volName)
}

// Check the requested arguments match that of the existing volume
if v.GetSpec().GetSize() != spec.GetSize() {
return "", status.Errorf(
Expand Down
150 changes: 136 additions & 14 deletions api/server/sdk/volume_ops_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func init() {
policy.Init(kv)
}

func TestSdkVolumeCreateCheckIdempotency(t *testing.T) {
func TestSdkVolumeCreateCheckIdempotencyWaitForRemoved(t *testing.T) {

// Create server and client connection
s := newTestServer(t)
Expand All @@ -59,24 +59,51 @@ func TestSdkVolumeCreateCheckIdempotency(t *testing.T) {
},
}

// Create response
id := "myid"
vol := &api.Volume{
Id: id,
Status: api.VolumeStatus_VOLUME_STATUS_UP,
State: api.VolumeState_VOLUME_STATE_DELETED,
Locator: &api.VolumeLocator{
Name: name,
},
Spec: &api.VolumeSpec{
Size: size,
},
}

gomock.InOrder(
s.MockDriver().
EXPECT().
Inspect([]string{name}).
Return([]*api.Volume{
&api.Volume{
Id: id,
Locator: &api.VolumeLocator{
Name: name,
},
Spec: &api.VolumeSpec{
Size: size,
},
},
}, nil).
Times(1),
Return([]*api.Volume{vol}, nil),

s.MockDriver().
EXPECT().
Inspect([]string{name}).
Return([]*api.Volume{vol}, nil),

s.MockDriver().
EXPECT().
Inspect([]string{name}).
Return([]*api.Volume{vol}, nil),

s.MockDriver().
EXPECT().
Inspect([]string{name}).
Return(nil, fmt.Errorf("MOCK ERROR")),

s.MockDriver().
EXPECT().
Enumerate(&api.VolumeLocator{Name: name}, nil).
Return(nil, fmt.Errorf("not found")),

s.MockDriver().
EXPECT().
Create(&api.VolumeLocator{
Name: name,
}, &api.Source{}, &api.VolumeSpec{Size: size}).
Return(id, nil),
)

// Setup client
Expand All @@ -88,6 +115,101 @@ func TestSdkVolumeCreateCheckIdempotency(t *testing.T) {
assert.Equal(t, r.GetVolumeId(), "myid")
}

func TestSdkVolumeCreateCheckIdempotencyWaitForReady(t *testing.T) {

// Create server and client connection
s := newTestServer(t)
defer s.Stop()

name := "myvol"
size := uint64(1234)
req := &api.SdkVolumeCreateRequest{
Name: name,
Spec: &api.VolumeSpec{
Size: size,
},
}

id := "myid"
count := 0
vol := &api.Volume{
Id: id,
Status: api.VolumeStatus_VOLUME_STATUS_DEGRADED,
Locator: &api.VolumeLocator{
Name: name,
},
Spec: &api.VolumeSpec{
Size: size,
},
}

// This should be called 3 times.
// 1 for the first inspect to check if it is there.
// 3 for waiting until the status is VOLUME UP
// 1 for waiting but getting that the volume is up
s.MockDriver().
EXPECT().
Inspect([]string{name}).
Do(func([]string) {
count++
if count == 4 {
vol.Status = api.VolumeStatus_VOLUME_STATUS_UP
}
}).
Return([]*api.Volume{vol}, nil).
Times(4)

// Setup client
c := api.NewOpenStorageVolumeClient(s.Conn())

// Get info
_, err := c.Create(context.Background(), req)
assert.NoError(t, err)
}

func TestSdkVolumeCreateCheckIdempotency(t *testing.T) {

// Create server and client connection
s := newTestServer(t)
defer s.Stop()

name := "myvol"
size := uint64(1234)
req := &api.SdkVolumeCreateRequest{
Name: name,
Spec: &api.VolumeSpec{
Size: size,
},
}

// Create response
id := "myid"
s.MockDriver().
EXPECT().
Inspect([]string{name}).
Return([]*api.Volume{
&api.Volume{
Id: id,
Status: api.VolumeStatus_VOLUME_STATUS_UP,
Locator: &api.VolumeLocator{
Name: name,
},
Spec: &api.VolumeSpec{
Size: size,
},
},
}, nil).
Times(2)

// Setup client
c := api.NewOpenStorageVolumeClient(s.Conn())

// Get info
r, err := c.Create(context.Background(), req)
assert.NoError(t, err)
assert.Equal(t, r.GetVolumeId(), "myid")
}

func TestSdkVolumeCreate(t *testing.T) {

// Create server and client connection
Expand Down
2 changes: 1 addition & 1 deletion api/server/testutils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func newTestServer(t *testing.T) *testServer {
func (s *testServer) setPorts() {
source := rand.NewSource(time.Now().UnixNano())
r := rand.New(source)
port := r.Intn(2999) + 8000
port := r.Intn(20000) + 10000

s.port = fmt.Sprintf("%d", port)
s.gwport = fmt.Sprintf("%d", port+1)
Expand Down
49 changes: 49 additions & 0 deletions csi/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -859,6 +859,30 @@ func TestControllerCreateVolumeFoundByVolumeFromNameConflict(t *testing.T) {
Locator: &api.VolumeLocator{
Name: "size",
},
Status: api.VolumeStatus_VOLUME_STATUS_UP,
Spec: &api.VolumeSpec{

// Size is different
Size: 10,
},
}}, nil).
Times(1),

s.MockDriver().
EXPECT().
Inspect([]string{"size"}).
Return(nil, fmt.Errorf("not found")).
Times(1),

s.MockDriver().
EXPECT().
Enumerate(&api.VolumeLocator{Name: "size"}, nil).
Return([]*api.Volume{&api.Volume{
Id: "size",
Locator: &api.VolumeLocator{
Name: "size",
},
Status: api.VolumeStatus_VOLUME_STATUS_UP,
Spec: &api.VolumeSpec{

// Size is different
Expand Down Expand Up @@ -991,6 +1015,30 @@ func TestControllerCreateVolumeFoundByVolumeFromName(t *testing.T) {
Locator: &api.VolumeLocator{
Name: name,
},
Status: api.VolumeStatus_VOLUME_STATUS_UP,
Spec: &api.VolumeSpec{
Size: uint64(size),
},
},
}, nil).
Times(1),

s.MockDriver().
EXPECT().
Inspect([]string{name}).
Return(nil, fmt.Errorf("not found")).
Times(1),

s.MockDriver().
EXPECT().
Enumerate(&api.VolumeLocator{Name: name}, nil).
Return([]*api.Volume{
&api.Volume{
Id: name,
Locator: &api.VolumeLocator{
Name: name,
},
Status: api.VolumeStatus_VOLUME_STATUS_UP,
Spec: &api.VolumeSpec{
Size: uint64(size),
},
Expand All @@ -1008,6 +1056,7 @@ func TestControllerCreateVolumeFoundByVolumeFromName(t *testing.T) {
Locator: &api.VolumeLocator{
Name: name,
},
Status: api.VolumeStatus_VOLUME_STATUS_UP,
Spec: &api.VolumeSpec{
Size: uint64(1234),
},
Expand Down
2 changes: 1 addition & 1 deletion csi/csi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func newTestServerWithConfig(t *testing.T, config *OsdCsiServerConfig) *testServ
func (s *testServer) setPorts() {
source := rand.NewSource(time.Now().UnixNano())
r := rand.New(source)
port := r.Intn(2999) + 8000
port := r.Intn(20000) + 10000

s.port = fmt.Sprintf("%d", port)
s.gwport = fmt.Sprintf("%d", port+1)
Expand Down
Loading