From 24f1365c6a0b517187c9a8341fa15b93a6997e73 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20Pab=C3=B3n?= Date: Fri, 16 Aug 2019 15:08:18 -0700 Subject: [PATCH] SDK will now return a volume only if it is ready MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit With this fix, any call creating a volume and noticing it already exists now checks to make sure that the volume is UP and ready before returning. Closes #1180 Signed-off-by: Luis Pabón --- api/server/sdk/sdk_test.go | 2 +- api/server/sdk/volume_ops.go | 79 +++++++++++++++- api/server/sdk/volume_ops_test.go | 150 +++++++++++++++++++++++++++--- api/server/testutils_test.go | 2 +- csi/controller_test.go | 49 ++++++++++ csi/csi_test.go | 2 +- pkg/util/wait.go | 50 +++++++++- 7 files changed, 314 insertions(+), 20 deletions(-) diff --git a/api/server/sdk/sdk_test.go b/api/server/sdk/sdk_test.go index 75e7164c9..34f1fc67a 100644 --- a/api/server/sdk/sdk_test.go +++ b/api/server/sdk/sdk_test.go @@ -147,7 +147,7 @@ func newTestServer(t *testing.T) *testServer { func (s *testServer) setPorts() { source := rand.NewSource(time.Now().UnixNano()) r := rand.New(source) - port := r.Intn(2999) + 8000 + port := r.Intn(20000) + 10000 s.port = fmt.Sprintf("%d", port) s.gwport = fmt.Sprintf("%d", port+1) diff --git a/api/server/sdk/volume_ops.go b/api/server/sdk/volume_ops.go index abe12c597..c1b9ce57c 100644 --- a/api/server/sdk/volume_ops.go +++ b/api/server/sdk/volume_ops.go @@ -19,6 +19,7 @@ package sdk import ( "context" "fmt" + "time" "github.com/sirupsen/logrus" @@ -32,6 +33,68 @@ import ( "google.golang.org/grpc/status" ) +// When create is called for an existing volume, this function is called to make sure +// the SDK only returns that the volume is ready when the status is UP +func (s *VolumeServer) waitForVolumeReady(ctx context.Context, id string) (*api.Volume, error) { + var v *api.Volume + + minTimeout := 1 * time.Second + maxTimeout := 60 * time.Minute + defaultTimeout := 10 * time.Minute + + logrus.Infof("Waiting for volume %s to become available", id) + + e := util.WaitForWithContext( + ctx, + minTimeout, maxTimeout, defaultTimeout, // timeouts + 250*time.Millisecond, // period + func() (bool, error) { + var err error + // Get the latest status from the volume + v, err = util.VolumeFromName(s.driver(ctx), id) + if err != nil { + return false, status.Errorf(codes.Internal, err.Error()) + } + + // Check if the volume is ready + if v.GetStatus() == api.VolumeStatus_VOLUME_STATUS_UP { + return false, nil + } + + // Continue waiting + return true, nil + }) + + return v, e +} + +func (s *VolumeServer) waitForVolumeRemoved(ctx context.Context, id string) error { + var v *api.Volume + + minTimeout := 1 * time.Second + maxTimeout := 10 * time.Minute + defaultTimeout := 5 * time.Minute + + logrus.Infof("Waiting for volume %s to be removed", id) + + return util.WaitForWithContext( + ctx, + minTimeout, maxTimeout, defaultTimeout, // timeouts + 250*time.Millisecond, // period + func() (bool, error) { + var err error + // Get the latest status from the volume + v, err = util.VolumeFromName(s.driver(ctx), id) + if err != nil { + // Removed + return false, nil + } + + // Continue waiting + return true, nil + }) +} + func (s *VolumeServer) create( ctx context.Context, locator *api.VolumeLocator, @@ -42,12 +105,26 @@ func (s *VolumeServer) create( // Check if the volume has already been created or is in process of creation volName := locator.GetName() v, err := util.VolumeFromName(s.driver(ctx), volName) - if err == nil { + + // If the volume is still there but it is being delete, then wait until it is removed + if err == nil && v.GetState() == api.VolumeState_VOLUME_STATE_DELETED { + if err = s.waitForVolumeRemoved(ctx, volName); err != nil { + return "", err + } + + // If the volume is there but it is not being deleted then just return the current id + } else if err == nil { // Check ownership if !v.IsPermitted(ctx, api.Ownership_Admin) { return "", status.Errorf(codes.PermissionDenied, "Volume %s already exists and is owned by another user", volName) } + // Wait until ready + v, err = s.waitForVolumeReady(ctx, volName) + if err != nil { + return "", err + } + // Check the requested arguments match that of the existing volume if v.GetSpec().GetSize() != spec.GetSize() { return "", status.Errorf( diff --git a/api/server/sdk/volume_ops_test.go b/api/server/sdk/volume_ops_test.go index 044a96b4a..f310c9ffb 100644 --- a/api/server/sdk/volume_ops_test.go +++ b/api/server/sdk/volume_ops_test.go @@ -44,7 +44,7 @@ func init() { policy.Init(kv) } -func TestSdkVolumeCreateCheckIdempotency(t *testing.T) { +func TestSdkVolumeCreateCheckIdempotencyWaitForRemoved(t *testing.T) { // Create server and client connection s := newTestServer(t) @@ -59,24 +59,51 @@ func TestSdkVolumeCreateCheckIdempotency(t *testing.T) { }, } - // Create response id := "myid" + vol := &api.Volume{ + Id: id, + Status: api.VolumeStatus_VOLUME_STATUS_UP, + State: api.VolumeState_VOLUME_STATE_DELETED, + Locator: &api.VolumeLocator{ + Name: name, + }, + Spec: &api.VolumeSpec{ + Size: size, + }, + } + gomock.InOrder( s.MockDriver(). EXPECT(). Inspect([]string{name}). - Return([]*api.Volume{ - &api.Volume{ - Id: id, - Locator: &api.VolumeLocator{ - Name: name, - }, - Spec: &api.VolumeSpec{ - Size: size, - }, - }, - }, nil). - Times(1), + Return([]*api.Volume{vol}, nil), + + s.MockDriver(). + EXPECT(). + Inspect([]string{name}). + Return([]*api.Volume{vol}, nil), + + s.MockDriver(). + EXPECT(). + Inspect([]string{name}). + Return([]*api.Volume{vol}, nil), + + s.MockDriver(). + EXPECT(). + Inspect([]string{name}). + Return(nil, fmt.Errorf("MOCK ERROR")), + + s.MockDriver(). + EXPECT(). + Enumerate(&api.VolumeLocator{Name: name}, nil). + Return(nil, fmt.Errorf("not found")), + + s.MockDriver(). + EXPECT(). + Create(&api.VolumeLocator{ + Name: name, + }, &api.Source{}, &api.VolumeSpec{Size: size}). + Return(id, nil), ) // Setup client @@ -88,6 +115,101 @@ func TestSdkVolumeCreateCheckIdempotency(t *testing.T) { assert.Equal(t, r.GetVolumeId(), "myid") } +func TestSdkVolumeCreateCheckIdempotencyWaitForReady(t *testing.T) { + + // Create server and client connection + s := newTestServer(t) + defer s.Stop() + + name := "myvol" + size := uint64(1234) + req := &api.SdkVolumeCreateRequest{ + Name: name, + Spec: &api.VolumeSpec{ + Size: size, + }, + } + + id := "myid" + count := 0 + vol := &api.Volume{ + Id: id, + Status: api.VolumeStatus_VOLUME_STATUS_DEGRADED, + Locator: &api.VolumeLocator{ + Name: name, + }, + Spec: &api.VolumeSpec{ + Size: size, + }, + } + + // This should be called 3 times. + // 1 for the first inspect to check if it is there. + // 3 for waiting until the status is VOLUME UP + // 1 for waiting but getting that the volume is up + s.MockDriver(). + EXPECT(). + Inspect([]string{name}). + Do(func([]string) { + count++ + if count == 4 { + vol.Status = api.VolumeStatus_VOLUME_STATUS_UP + } + }). + Return([]*api.Volume{vol}, nil). + Times(4) + + // Setup client + c := api.NewOpenStorageVolumeClient(s.Conn()) + + // Get info + _, err := c.Create(context.Background(), req) + assert.NoError(t, err) +} + +func TestSdkVolumeCreateCheckIdempotency(t *testing.T) { + + // Create server and client connection + s := newTestServer(t) + defer s.Stop() + + name := "myvol" + size := uint64(1234) + req := &api.SdkVolumeCreateRequest{ + Name: name, + Spec: &api.VolumeSpec{ + Size: size, + }, + } + + // Create response + id := "myid" + s.MockDriver(). + EXPECT(). + Inspect([]string{name}). + Return([]*api.Volume{ + &api.Volume{ + Id: id, + Status: api.VolumeStatus_VOLUME_STATUS_UP, + Locator: &api.VolumeLocator{ + Name: name, + }, + Spec: &api.VolumeSpec{ + Size: size, + }, + }, + }, nil). + Times(2) + + // Setup client + c := api.NewOpenStorageVolumeClient(s.Conn()) + + // Get info + r, err := c.Create(context.Background(), req) + assert.NoError(t, err) + assert.Equal(t, r.GetVolumeId(), "myid") +} + func TestSdkVolumeCreate(t *testing.T) { // Create server and client connection diff --git a/api/server/testutils_test.go b/api/server/testutils_test.go index 4f03a7671..77c0595e5 100644 --- a/api/server/testutils_test.go +++ b/api/server/testutils_test.go @@ -291,7 +291,7 @@ func newTestServer(t *testing.T) *testServer { func (s *testServer) setPorts() { source := rand.NewSource(time.Now().UnixNano()) r := rand.New(source) - port := r.Intn(2999) + 8000 + port := r.Intn(20000) + 10000 s.port = fmt.Sprintf("%d", port) s.gwport = fmt.Sprintf("%d", port+1) diff --git a/csi/controller_test.go b/csi/controller_test.go index 3385cf216..de59d180d 100644 --- a/csi/controller_test.go +++ b/csi/controller_test.go @@ -859,6 +859,30 @@ func TestControllerCreateVolumeFoundByVolumeFromNameConflict(t *testing.T) { Locator: &api.VolumeLocator{ Name: "size", }, + Status: api.VolumeStatus_VOLUME_STATUS_UP, + Spec: &api.VolumeSpec{ + + // Size is different + Size: 10, + }, + }}, nil). + Times(1), + + s.MockDriver(). + EXPECT(). + Inspect([]string{"size"}). + Return(nil, fmt.Errorf("not found")). + Times(1), + + s.MockDriver(). + EXPECT(). + Enumerate(&api.VolumeLocator{Name: "size"}, nil). + Return([]*api.Volume{&api.Volume{ + Id: "size", + Locator: &api.VolumeLocator{ + Name: "size", + }, + Status: api.VolumeStatus_VOLUME_STATUS_UP, Spec: &api.VolumeSpec{ // Size is different @@ -991,6 +1015,30 @@ func TestControllerCreateVolumeFoundByVolumeFromName(t *testing.T) { Locator: &api.VolumeLocator{ Name: name, }, + Status: api.VolumeStatus_VOLUME_STATUS_UP, + Spec: &api.VolumeSpec{ + Size: uint64(size), + }, + }, + }, nil). + Times(1), + + s.MockDriver(). + EXPECT(). + Inspect([]string{name}). + Return(nil, fmt.Errorf("not found")). + Times(1), + + s.MockDriver(). + EXPECT(). + Enumerate(&api.VolumeLocator{Name: name}, nil). + Return([]*api.Volume{ + &api.Volume{ + Id: name, + Locator: &api.VolumeLocator{ + Name: name, + }, + Status: api.VolumeStatus_VOLUME_STATUS_UP, Spec: &api.VolumeSpec{ Size: uint64(size), }, @@ -1008,6 +1056,7 @@ func TestControllerCreateVolumeFoundByVolumeFromName(t *testing.T) { Locator: &api.VolumeLocator{ Name: name, }, + Status: api.VolumeStatus_VOLUME_STATUS_UP, Spec: &api.VolumeSpec{ Size: uint64(1234), }, diff --git a/csi/csi_test.go b/csi/csi_test.go index 3bf0869ab..837072f66 100644 --- a/csi/csi_test.go +++ b/csi/csi_test.go @@ -232,7 +232,7 @@ func newTestServerWithConfig(t *testing.T, config *OsdCsiServerConfig) *testServ func (s *testServer) setPorts() { source := rand.NewSource(time.Now().UnixNano()) r := rand.New(source) - port := r.Intn(2999) + 8000 + port := r.Intn(20000) + 10000 s.port = fmt.Sprintf("%d", port) s.gwport = fmt.Sprintf("%d", port+1) diff --git a/pkg/util/wait.go b/pkg/util/wait.go index 81826a95f..b275d6fbf 100644 --- a/pkg/util/wait.go +++ b/pkg/util/wait.go @@ -1,10 +1,56 @@ +/* +Package sdk is the gRPC implementation of the SDK gRPC server +Copyright 2018 Portworx + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ package util import ( - "fmt" + "context" "time" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) +// WaitForWithContext waits for a function to complete work with a default timeout or +// a deadline from the context if provided +func WaitForWithContext( + ctx context.Context, + minTimeout, maxTimeout, defaultTimeout time.Duration, + period time.Duration, + f func() (bool, error), +) error { + var timeout time.Duration + + // Check if the caller provided a deadline + d, ok := ctx.Deadline() + if !ok { + timeout = defaultTimeout + } else { + timeout = d.Sub(time.Now()) + + // Determine if it is too short or too long + if timeout < minTimeout || timeout > minTimeout { + return status.Errorf(codes.InvalidArgument, + "Deadline must be between %v and %v; was: %v", minTimeout, maxTimeout, timeout) + } + } + + return WaitFor(timeout, period, f) +} + // WaitFor() waits until f() returns false or err != nil // f() returns . func WaitFor(timeout time.Duration, period time.Duration, f func() (bool, error)) error { @@ -16,7 +62,7 @@ func WaitFor(timeout time.Duration, period time.Duration, f func() (bool, error) for wait { select { case <-timeoutChan: - return fmt.Errorf("Timed out") + return status.Errorf(codes.DeadlineExceeded, "Timed out") default: wait, err = f() if err != nil {