Skip to content

Commit

Permalink
CSI: retry claims from client when max claims are reached
Browse files Browse the repository at this point in the history
When the alloc runner claims a volume, an allocation for a previous
version of the job may still have the volume claimed because it's
still shutting down. In this case we'll receive an error from the
server. Retry this error until we succeed or until a very long timeout
expires, to give operators a chance to recover broken plugins.

Make the alloc runner hook tolerant of temporary RPC failures.
  • Loading branch information
tgross committed Feb 24, 2022
1 parent 6b6b827 commit 4fb2347
Show file tree
Hide file tree
Showing 3 changed files with 203 additions and 31 deletions.
7 changes: 7 additions & 0 deletions .changelog/12113.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
```release-note:bug
csi: Fixed a bug where allocations with volume claims would fail their first placement after a reschedule
```

```release-note:bug
csi: Fixed a bug where allocations with volume claims would fail to restore after a client restart
```
80 changes: 76 additions & 4 deletions client/allocrunner/csi_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package allocrunner
import (
"context"
"fmt"
"strings"
"sync"
"time"

Expand All @@ -28,6 +29,7 @@ type csiHook struct {
nodeSecret string

volumeRequests map[string]*volumeAndRequest
minBackoffInterval time.Duration
maxBackoffInterval time.Duration
maxBackoffDuration time.Duration
}
Expand All @@ -47,6 +49,7 @@ func newCSIHook(alloc *structs.Allocation, logger hclog.Logger, csi csimanager.M
updater: updater,
nodeSecret: nodeSecret,
volumeRequests: map[string]*volumeAndRequest{},
minBackoffInterval: time.Second,
maxBackoffInterval: time.Minute,
maxBackoffDuration: time.Hour * 24,
}
Expand Down Expand Up @@ -213,11 +216,10 @@ func (c *csiHook) claimVolumesFromAlloc() (map[string]*volumeAndRequest, error)
},
}

var resp structs.CSIVolumeClaimResponse
if err := c.rpcClient.RPC("CSIVolume.Claim", req, &resp); err != nil {
resp, err := c.claimWithRetry(req)
if err != nil {
return nil, fmt.Errorf("could not claim volume %s: %w", req.VolumeID, err)
}

if resp.Volume == nil {
return nil, fmt.Errorf("Unexpected nil volume returned for ID: %v", pair.request.Source)
}
Expand All @@ -230,6 +232,74 @@ func (c *csiHook) claimVolumesFromAlloc() (map[string]*volumeAndRequest, error)
return result, nil
}

// claimWithRetry tries to claim the volume on the server, retrying
// with exponential backoff capped to a maximum interval
func (c *csiHook) claimWithRetry(req *structs.CSIVolumeClaimRequest) (*structs.CSIVolumeClaimResponse, error) {

// note: allocrunner hooks don't have access to the client's
// shutdown context, just the allocrunner's shutdown; if we make
// it available in the future we should thread it through here so
// that retry can exit gracefully instead of dropping the
// in-flight goroutine
ctx, cancel := context.WithTimeout(context.TODO(), c.maxBackoffDuration)
defer cancel()

var resp structs.CSIVolumeClaimResponse
var err error
backoff := c.minBackoffInterval
t, stop := helper.NewSafeTimer(0)
defer stop()
for {
select {
case <-ctx.Done():
return nil, err
case <-t.C:
}

err = c.rpcClient.RPC("CSIVolume.Claim", req, &resp)
if err == nil {
break
}

if !isRetryableClaimRPCError(err) {
break
}

if backoff < c.maxBackoffInterval {
backoff = backoff * 2
if backoff > c.maxBackoffInterval {
backoff = c.maxBackoffInterval
}
}
c.logger.Debug(
"volume could not be claimed because it is in use, retrying in %v", backoff)
t.Reset(backoff)
}
return &resp, err
}

// isRetryableClaimRPCError looks for errors where we need to retry
// with backoff because we expect them to be eventually resolved.
func isRetryableClaimRPCError(err error) bool {

// note: because these errors are returned via RPC which breaks error
// wrapping, we can't check with errors.Is and need to read the string
errMsg := err.Error()
if strings.Contains(errMsg, structs.ErrCSIVolumeMaxClaims.Error()) {
return true
}
if strings.Contains(errMsg, structs.ErrCSIClientRPCRetryable.Error()) {
return true
}
if strings.Contains(errMsg, "no servers") {
return true
}
if strings.Contains(errMsg, structs.ErrNoLeader.Error()) {
return true
}
return false
}

func (c *csiHook) shouldRun() bool {
tg := c.alloc.Job.LookupTaskGroup(c.alloc.TaskGroup)
for _, vol := range tg.Volumes {
Expand Down Expand Up @@ -286,7 +356,7 @@ func (c *csiHook) unmountWithRetry(pair *volumeAndRequest) error {
ctx, cancel := context.WithTimeout(context.TODO(), c.maxBackoffDuration)
defer cancel()
var err error
backoff := time.Second
backoff := c.minBackoffInterval
t, stop := helper.NewSafeTimer(0)
defer stop()
for {
Expand All @@ -307,6 +377,8 @@ func (c *csiHook) unmountWithRetry(pair *volumeAndRequest) error {
backoff = c.maxBackoffInterval
}
}
c.logger.Debug(
"volume could not be unmounted, retrying in %v", backoff)
t.Reset(backoff)
}
return nil
Expand Down
147 changes: 120 additions & 27 deletions client/allocrunner/csi_hook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ import (
"testing"
"time"

"github.com/pkg/errors"
"github.com/stretchr/testify/require"

"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/pluginmanager"
"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
Expand All @@ -34,6 +36,9 @@ func TestCSIHook(t *testing.T) {
testcases := []struct {
name string
volumeRequests map[string]*structs.VolumeRequest
startsUnschedulable bool
startsWithClaims bool
expectedClaimErr error
expectedMounts map[string]*csimanager.MountInfo
expectedMountCalls int
expectedUnmountCalls int
Expand Down Expand Up @@ -89,6 +94,58 @@ func TestCSIHook(t *testing.T) {
expectedUnpublishCalls: 1,
},

{
name: "fatal error on claim",
volumeRequests: map[string]*structs.VolumeRequest{
"vol0": {
Name: "vol0",
Type: structs.VolumeTypeCSI,
Source: "testvolume0",
ReadOnly: true,
AccessMode: structs.CSIVolumeAccessModeSingleNodeReader,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
MountOptions: &structs.CSIMountOptions{},
PerAlloc: false,
},
},
startsUnschedulable: true,
expectedMounts: map[string]*csimanager.MountInfo{
"vol0": &csimanager.MountInfo{Source: fmt.Sprintf(
"test-alloc-dir/%s/testvolume0/ro-file-system-single-node-reader-only", alloc.ID)},
},
expectedMountCalls: 0,
expectedUnmountCalls: 0,
expectedClaimCalls: 1,
expectedUnpublishCalls: 0,
expectedClaimErr: errors.New(
"claim volumes: could not claim volume testvolume0: volume is currently unschedulable"),
},

{
name: "retryable error on claim",
volumeRequests: map[string]*structs.VolumeRequest{
"vol0": {
Name: "vol0",
Type: structs.VolumeTypeCSI,
Source: "testvolume0",
ReadOnly: true,
AccessMode: structs.CSIVolumeAccessModeSingleNodeReader,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
MountOptions: &structs.CSIMountOptions{},
PerAlloc: false,
},
},
startsWithClaims: true,
expectedMounts: map[string]*csimanager.MountInfo{
"vol0": &csimanager.MountInfo{Source: fmt.Sprintf(
"test-alloc-dir/%s/testvolume0/ro-file-system-single-node-reader-only", alloc.ID)},
},
expectedMountCalls: 1,
expectedUnmountCalls: 1,
expectedClaimCalls: 2,
expectedUnpublishCalls: 1,
},

// TODO: this won't actually work on the client.
// https://github.com/hashicorp/nomad/issues/11798
//
Expand Down Expand Up @@ -136,7 +193,12 @@ func TestCSIHook(t *testing.T) {

callCounts := map[string]int{}
mgr := mockPluginManager{mounter: mockVolumeMounter{callCounts: callCounts}}
rpcer := mockRPCer{alloc: alloc, callCounts: callCounts}
rpcer := mockRPCer{
alloc: alloc,
callCounts: callCounts,
hasExistingClaim: helper.BoolToPtr(tc.startsWithClaims),
schedulable: helper.BoolToPtr(!tc.startsUnschedulable),
}
ar := mockAllocRunner{
res: &cstructs.AllocHookResources{},
caps: &drivers.Capabilities{
Expand All @@ -145,17 +207,24 @@ func TestCSIHook(t *testing.T) {
},
}
hook := newCSIHook(alloc, logger, mgr, rpcer, ar, ar, "secret")
hook.maxBackoffInterval = 100 * time.Millisecond
hook.maxBackoffDuration = 2 * time.Second
hook.minBackoffInterval = 1 * time.Millisecond
hook.maxBackoffInterval = 10 * time.Millisecond
hook.maxBackoffDuration = 500 * time.Millisecond

require.NotNil(t, hook)

require.NoError(t, hook.Prerun())
mounts := ar.GetAllocHookResources().GetCSIMounts()
require.NotNil(t, mounts)
require.Equal(t, tc.expectedMounts, mounts)
if tc.expectedClaimErr != nil {
require.EqualError(t, hook.Prerun(), tc.expectedClaimErr.Error())
mounts := ar.GetAllocHookResources().GetCSIMounts()
require.Nil(t, mounts)
} else {
require.NoError(t, hook.Prerun())
mounts := ar.GetAllocHookResources().GetCSIMounts()
require.NotNil(t, mounts)
require.Equal(t, tc.expectedMounts, mounts)
require.NoError(t, hook.Postrun())
}

require.NoError(t, hook.Postrun())
require.Equal(t, tc.expectedMountCalls, callCounts["mount"])
require.Equal(t, tc.expectedUnmountCalls, callCounts["unmount"])
require.Equal(t, tc.expectedClaimCalls, callCounts["claim"])
Expand All @@ -168,25 +237,11 @@ func TestCSIHook(t *testing.T) {

// HELPERS AND MOCKS

func testVolume(id string) *structs.CSIVolume {
vol := structs.NewCSIVolume(id, 0)
vol.Schedulable = true
vol.RequestedCapabilities = []*structs.CSIVolumeCapability{
{
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
AccessMode: structs.CSIVolumeAccessModeSingleNodeReader,
},
{
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
AccessMode: structs.CSIVolumeAccessModeSingleNodeWriter,
},
}
return vol
}

type mockRPCer struct {
alloc *structs.Allocation
callCounts map[string]int
alloc *structs.Allocation
callCounts map[string]int
hasExistingClaim *bool
schedulable *bool
}

// RPC mocks the server RPCs, acting as though any request succeeds
Expand All @@ -195,7 +250,7 @@ func (r mockRPCer) RPC(method string, args interface{}, reply interface{}) error
case "CSIVolume.Claim":
r.callCounts["claim"]++
req := args.(*structs.CSIVolumeClaimRequest)
vol := testVolume(req.VolumeID)
vol := r.testVolume(req.VolumeID)
err := vol.Claim(req.ToClaim(), r.alloc)
if err != nil {
return err
Expand All @@ -215,6 +270,44 @@ func (r mockRPCer) RPC(method string, args interface{}, reply interface{}) error
return nil
}

// testVolume is a helper that optionally starts as unschedulable /
// claimed until after the first claim RPC is made, so that we can
// test retryable vs non-retryable failures
func (r mockRPCer) testVolume(id string) *structs.CSIVolume {
vol := structs.NewCSIVolume(id, 0)
vol.Schedulable = *r.schedulable
vol.RequestedCapabilities = []*structs.CSIVolumeCapability{
{
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
AccessMode: structs.CSIVolumeAccessModeSingleNodeReader,
},
{
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
AccessMode: structs.CSIVolumeAccessModeSingleNodeWriter,
},
}

if *r.hasExistingClaim {
vol.AccessMode = structs.CSIVolumeAccessModeSingleNodeReader
vol.AttachmentMode = structs.CSIVolumeAttachmentModeFilesystem
vol.ReadClaims["another-alloc-id"] = &structs.CSIVolumeClaim{
AllocationID: "another-alloc-id",
NodeID: "another-node-id",
Mode: structs.CSIVolumeClaimRead,
AccessMode: structs.CSIVolumeAccessModeSingleNodeReader,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
State: structs.CSIVolumeClaimStateTaken,
}
}

if r.callCounts["claim"] >= 0 {
*r.hasExistingClaim = false
*r.schedulable = true
}

return vol
}

type mockVolumeMounter struct {
callCounts map[string]int
}
Expand Down

0 comments on commit 4fb2347

Please sign in to comment.