Backport of CSI: failed allocation should not block its own controller unpublish into release/1.3.x (#14508)

This pull request was automerged via backport-assistant
hc-github-team-nomad-core authored Sep 8, 2022
1 parent 5f2f6f5 commit a7ce11c
Showing 6 changed files with 111 additions and 25 deletions.
11 changes: 11 additions & 0 deletions .changelog/14484.txt
@@ -0,0 +1,11 @@
```release-note:bug
csi: Fixed a bug where the server would not send controller unpublish for a failed allocation.
```

```release-note:bug
csi: Fixed a data race in the volume unpublish endpoint that could result in claims being incorrectly marked as freed before being persisted to raft.
```

```release-note:bug
api: Fixed a bug where the List Volume API did not include the `ControllerRequired` and `ResourceExhausted` fields.
```
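The second note above concerns the copy-before-write discipline restored in `nomad/csi_endpoint.go` below: objects read from the state store are shared, so the endpoint must mutate a private copy and let raft persist it. A minimal, self-contained sketch of the pattern, assuming a simplified `Volume` type as a stand-in for `structs.CSIVolume` (not the Nomad source):

```go
package main

import "fmt"

// Volume is a hypothetical stand-in for structs.CSIVolume.
type Volume struct {
	ID         string
	ReadClaims map[string]bool
}

// Copy returns a deep copy, mirroring the role of structs.CSIVolume.Copy.
func (v *Volume) Copy() *Volume {
	nv := *v
	nv.ReadClaims = make(map[string]bool, len(v.ReadClaims))
	for id, ok := range v.ReadClaims {
		nv.ReadClaims[id] = ok
	}
	return &nv
}

func main() {
	// Pretend this pointer came straight out of the state store and is
	// visible to concurrent readers.
	shared := &Volume{ID: "vol-1", ReadClaims: map[string]bool{"alloc-1": true}}

	// Racy: deleting on the shared object makes the claim look freed
	// before the change has been committed through raft.
	// delete(shared.ReadClaims, "alloc-1")

	// Safe: mutate a private copy, then submit that copy to raft.
	vol := shared.Copy()
	delete(vol.ReadClaims, "alloc-1")

	fmt.Println(len(shared.ReadClaims), len(vol.ReadClaims)) // 1 0
}
```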
2 changes: 2 additions & 0 deletions api/csi.go
@@ -384,6 +384,8 @@ type CSIVolumeListStub struct {
Topologies []*CSITopology
AccessMode CSIVolumeAccessMode
AttachmentMode CSIVolumeAttachmentMode
CurrentReaders int
CurrentWriters int
Schedulable bool
PluginID string
Provider string
39 changes: 32 additions & 7 deletions nomad/csi_endpoint.go
@@ -659,12 +659,14 @@ func (v *CSIVolume) Unpublish(args *structs.CSIVolumeUnpublishRequest, reply *st
case structs.CSIVolumeClaimStateReadyToFree:
goto RELEASE_CLAIM
}
vol = vol.Copy()
err = v.nodeUnpublishVolume(vol, claim)
if err != nil {
return err
}

NODE_DETACHED:
vol = vol.Copy()
err = v.controllerUnpublishVolume(vol, claim)
if err != nil {
return err
@@ -684,6 +686,10 @@ RELEASE_CLAIM:
return nil
}

// nodeUnpublishVolume handles sending RPCs to the Node plugin to unmount the
// volume. Typically this task is already completed on the client, but we need
// to have it here so that GC can re-send it in case of client-side problems.
// This function should only be called on a copy of the volume.
func (v *CSIVolume) nodeUnpublishVolume(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error {
v.logger.Trace("node unpublish", "vol", vol.ID)

@@ -776,8 +782,12 @@ func (v *CSIVolume) nodeUnpublishVolumeImpl(vol *structs.CSIVolume, claim *struc
return nil
}

// controllerUnpublishVolume handles sending RPCs to the Controller plugin to
// unpublish the volume (detach it from its host). This function should only
// be called on a copy of the volume.
func (v *CSIVolume) controllerUnpublishVolume(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error {
v.logger.Trace("controller unpublish", "vol", vol.ID)

if !vol.ControllerRequired {
claim.State = structs.CSIVolumeClaimStateReadyToFree
return nil
@@ -792,26 +802,39 @@ func (v *CSIVolume) controllerUnpublishVolume(vol *structs.CSIVolume, claim *str
} else if plugin == nil {
return fmt.Errorf("no such plugin: %q", vol.PluginID)
}

if !plugin.HasControllerCapability(structs.CSIControllerSupportsAttachDetach) {
claim.State = structs.CSIVolumeClaimStateReadyToFree
return nil
}

// we only send a controller detach if a Nomad client no longer has
// any claim to the volume, so we need to check the status of claimed
// allocations
vol, err = state.CSIVolumeDenormalize(ws, vol)
if err != nil {
return err
}
for _, alloc := range vol.ReadAllocs {
if alloc != nil && alloc.NodeID == claim.NodeID && !alloc.TerminalStatus() {

// we only send a controller detach if a Nomad client no longer has any
// claim to the volume, so we need to check the status of any other claimed
// allocations
shouldCancel := func(alloc *structs.Allocation) bool {
if alloc != nil && alloc.ID != claim.AllocationID &&
alloc.NodeID == claim.NodeID && !alloc.TerminalStatus() {
claim.State = structs.CSIVolumeClaimStateReadyToFree
v.logger.Debug(
"controller unpublish canceled: another non-terminal alloc is on this node",
"vol", vol.ID, "alloc", alloc.ID)
return true
}
return false
}

for _, alloc := range vol.ReadAllocs {
if shouldCancel(alloc) {
return nil
}
}
for _, alloc := range vol.WriteAllocs {
if alloc != nil && alloc.NodeID == claim.NodeID && !alloc.TerminalStatus() {
claim.State = structs.CSIVolumeClaimStateReadyToFree
if shouldCancel(alloc) {
return nil
}
}
@@ -837,6 +860,8 @@ func (v *CSIVolume) controllerUnpublishVolume(vol *structs.CSIVolume, claim *str
if err != nil {
return fmt.Errorf("could not detach from controller: %v", err)
}

v.logger.Trace("controller detach complete", "vol", vol.ID)
claim.State = structs.CSIVolumeClaimStateReadyToFree
return v.checkpointClaim(vol, claim)
}
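The `goto` targets above (`NODE_DETACHED`, `RELEASE_CLAIM`) make `Unpublish` a small resumable state machine: a claim already checkpointed past a stage skips straight to the next one, which is how a failed allocation no longer blocks its own controller unpublish. A hedged sketch of that control flow, with the RPC helpers stubbed out as prints (the names below are illustrative stand-ins, not the Nomad functions):

```go
package main

import "fmt"

// ClaimState is a simplified stand-in for structs.CSIVolumeClaimState.
type ClaimState int

const (
	Taken ClaimState = iota
	NodeDetached
	ControllerDetached
	ReadyToFree
)

func unpublish(state ClaimState) {
	// Resume from whatever stage was last checkpointed to raft.
	switch state {
	case NodeDetached:
		goto CONTROLLER
	case ControllerDetached, ReadyToFree:
		goto RELEASE
	}

	fmt.Println("node unpublish") // stand-in for nodeUnpublishVolume
CONTROLLER:
	fmt.Println("controller unpublish") // stand-in for controllerUnpublishVolume
RELEASE:
	fmt.Println("release claim") // stand-in for the raft claim update
}

func main() {
	// A claim left at NodeDetached (e.g. by a failed allocation) resumes at
	// the controller step rather than repeating the node unpublish.
	unpublish(NodeDetached)
}
```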
69 changes: 56 additions & 13 deletions nomad/csi_endpoint_test.go
@@ -7,6 +7,11 @@ import (
"time"

msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/shoenig/test"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/client"
@@ -17,7 +22,6 @@ import (
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
)

func TestCSIVolumeEndpoint_Get(t *testing.T) {
@@ -499,37 +503,52 @@ func TestCSIVolumeEndpoint_Unpublish(t *testing.T) {
},
}
index++
require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, index, node))
must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, index, node))

type tc struct {
name string
startingState structs.CSIVolumeClaimState
endState structs.CSIVolumeClaimState
nodeID string
otherNodeID string
expectedErrMsg string
}
testCases := []tc{
{
name: "success",
startingState: structs.CSIVolumeClaimStateControllerDetached,
nodeID: node.ID,
otherNodeID: uuid.Generate(),
},
{
name: "non-terminal allocation on same node",
startingState: structs.CSIVolumeClaimStateNodeDetached,
nodeID: node.ID,
otherNodeID: node.ID,
},
{
name: "unpublish previously detached node",
startingState: structs.CSIVolumeClaimStateNodeDetached,
endState: structs.CSIVolumeClaimStateNodeDetached,
expectedErrMsg: "could not detach from controller: controller detach volume: No path to node",
nodeID: node.ID,
otherNodeID: uuid.Generate(),
},
{
name: "unpublish claim on garbage collected node",
startingState: structs.CSIVolumeClaimStateTaken,
endState: structs.CSIVolumeClaimStateNodeDetached,
expectedErrMsg: "could not detach from controller: controller detach volume: No path to node",
nodeID: uuid.Generate(),
otherNodeID: uuid.Generate(),
},
{
name: "first unpublish",
startingState: structs.CSIVolumeClaimStateTaken,
endState: structs.CSIVolumeClaimStateNodeDetached,
expectedErrMsg: "could not detach from controller: controller detach volume: No path to node",
nodeID: node.ID,
otherNodeID: uuid.Generate(),
},
}

@@ -551,15 +570,20 @@ func TestCSIVolumeEndpoint_Unpublish(t *testing.T) {

index++
err = state.UpsertCSIVolume(index, []*structs.CSIVolume{vol})
require.NoError(t, err)
must.NoError(t, err)

// setup: create an alloc that will claim our volume
alloc := mock.BatchAlloc()
alloc.NodeID = tc.nodeID
alloc.ClientStatus = structs.AllocClientStatusFailed

otherAlloc := mock.BatchAlloc()
otherAlloc.NodeID = tc.otherNodeID
otherAlloc.ClientStatus = structs.AllocClientStatusRunning

index++
require.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, index, []*structs.Allocation{alloc}))
must.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, index,
[]*structs.Allocation{alloc, otherAlloc}))

// setup: claim the volume for our alloc
claim := &structs.CSIVolumeClaim{
@@ -572,7 +596,20 @@ func TestCSIVolumeEndpoint_Unpublish(t *testing.T) {
index++
claim.State = structs.CSIVolumeClaimStateTaken
err = state.CSIVolumeClaim(index, ns, volID, claim)
require.NoError(t, err)
must.NoError(t, err)

// setup: claim the volume for our other alloc
otherClaim := &structs.CSIVolumeClaim{
AllocationID: otherAlloc.ID,
NodeID: tc.otherNodeID,
ExternalNodeID: "i-example",
Mode: structs.CSIVolumeClaimRead,
}

index++
otherClaim.State = structs.CSIVolumeClaimStateTaken
err = state.CSIVolumeClaim(index, ns, volID, otherClaim)
must.NoError(t, err)

// test: unpublish and check the results
claim.State = tc.startingState
@@ -589,17 +626,23 @@ func TestCSIVolumeEndpoint_Unpublish(t *testing.T) {
err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Unpublish", req,
&structs.CSIVolumeUnpublishResponse{})

vol, volErr := state.CSIVolumeByID(nil, ns, volID)
must.NoError(t, volErr)
must.NotNil(t, vol)

if tc.expectedErrMsg == "" {
require.NoError(t, err)
vol, err = state.CSIVolumeByID(nil, ns, volID)
require.NoError(t, err)
require.NotNil(t, vol)
require.Len(t, vol.ReadAllocs, 0)
must.NoError(t, err)
assert.Len(t, vol.ReadAllocs, 1)
} else {
require.Error(t, err)
require.True(t, strings.Contains(err.Error(), tc.expectedErrMsg),
"error message %q did not contain %q", err.Error(), tc.expectedErrMsg)
must.Error(t, err)
assert.Len(t, vol.ReadAllocs, 2)
test.True(t, strings.Contains(err.Error(), tc.expectedErrMsg),
test.Sprintf("error %v did not contain %q", err, tc.expectedErrMsg))
claim = vol.PastClaims[alloc.ID]
must.NotNil(t, claim)
test.Eq(t, tc.endState, claim.State)
}

})
}

13 changes: 8 additions & 5 deletions nomad/structs/csi.go
@@ -367,12 +367,15 @@ type CSIVolListStub struct {
Schedulable bool
PluginID string
Provider string
ControllerRequired bool
ControllersHealthy int
ControllersExpected int
NodesHealthy int
NodesExpected int
CreateIndex uint64
ModifyIndex uint64
ResourceExhausted time.Time

CreateIndex uint64
ModifyIndex uint64
}

// NewCSIVolume creates the volume struct. No side-effects
@@ -409,7 +412,7 @@ func (v *CSIVolume) RemoteID() string {
}

func (v *CSIVolume) Stub() *CSIVolListStub {
stub := CSIVolListStub{
return &CSIVolListStub{
ID: v.ID,
Namespace: v.Namespace,
Name: v.Name,
@@ -422,15 +425,15 @@ func (v *CSIVolume) Stub() *CSIVolListStub {
Schedulable: v.Schedulable,
PluginID: v.PluginID,
Provider: v.Provider,
ControllerRequired: v.ControllerRequired,
ControllersHealthy: v.ControllersHealthy,
ControllersExpected: v.ControllersExpected,
NodesHealthy: v.NodesHealthy,
NodesExpected: v.NodesExpected,
ResourceExhausted: v.ResourceExhausted,
CreateIndex: v.CreateIndex,
ModifyIndex: v.ModifyIndex,
}

return &stub
}

// ReadSchedulable determines if the volume is potentially schedulable
2 changes: 2 additions & 0 deletions website/content/api-docs/volumes.mdx
@@ -78,6 +78,8 @@ $ curl \
],
"AccessMode": "multi-node-single-writer",
"AttachmentMode": "file-system",
"CurrentReaders": 2,
"CurrentWriters": 1,
"Schedulable": true,
"PluginID": "plugin-id1",
"Provider": "ebs",
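The same two counters surface through the Go API client's list stub (see `api/csi.go` above). A short usage sketch, assuming a reachable Nomad agent at the default address; error handling is reduced to panics for brevity:

```go
package main

import (
	"fmt"

	"github.com/hashicorp/nomad/api"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		panic(err)
	}

	// List CSI volumes; each stub now carries the claim counters added above.
	vols, _, err := client.CSIVolumes().List(nil)
	if err != nil {
		panic(err)
	}
	for _, v := range vols {
		fmt.Printf("%s readers=%d writers=%d\n", v.ID, v.CurrentReaders, v.CurrentWriters)
	}
}
```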
