Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CSI: failed allocation should not block its own controller unpublish #14484

Merged
merged 4 commits into from
Sep 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .changelog/14484.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
```release-note:bug
csi: Fixed a bug where the server would not send controller unpublish for a failed allocation.
```

```release-note:bug
csi: Fixed a data race in the volume unpublish endpoint that could result in claims being incorrectly marked as freed before being persisted to raft.
```

```release-note:bug
api: Fixed a bug where the List Volume API did not include the `ControllerRequired` and `ResourceExhausted` fields.
```
2 changes: 2 additions & 0 deletions api/csi.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,8 @@ type CSIVolumeListStub struct {
Topologies []*CSITopology
AccessMode CSIVolumeAccessMode
AttachmentMode CSIVolumeAttachmentMode
CurrentReaders int
CurrentWriters int
Schedulable bool
PluginID string
Provider string
Expand Down
39 changes: 32 additions & 7 deletions nomad/csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,12 +659,14 @@ func (v *CSIVolume) Unpublish(args *structs.CSIVolumeUnpublishRequest, reply *st
case structs.CSIVolumeClaimStateReadyToFree:
goto RELEASE_CLAIM
}
vol = vol.Copy()
err = v.nodeUnpublishVolume(vol, claim)
if err != nil {
return err
}

NODE_DETACHED:
vol = vol.Copy()
Comment on lines +662 to +669
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just checking my understanding: are the two Copy calls required (instead of, for example, copying it once before the switch statement) because nodeUnpublishVolume will eventually call CSIVolumeDenormalize which will read the volume from the state store again?

Copy link
Member Author

@tgross tgross Sep 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. That isn't guaranteed because nodeUnpublishVolume might return before that point if the node has been GC'd, so we can end up copying one extra time uselessly. That's unfortunate but doesn't feel like a big deal as it's a bit of a corner case. The other option would be to try to make it really precise about when we need to copy, but I think we've found that to be really error-prone. (And maybe something we could solve for in the state store itself at some point.)

err = v.controllerUnpublishVolume(vol, claim)
if err != nil {
return err
Expand All @@ -684,6 +686,10 @@ RELEASE_CLAIM:
return nil
}

// nodeUnpublishVolume handles the sending RPCs to the Node plugin to unmount
// it. Typically this task is already completed on the client, but we need to
// have this here so that GC can re-send it in case of client-side
// problems. This function should only be called on a copy of the volume.
func (v *CSIVolume) nodeUnpublishVolume(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error {
v.logger.Trace("node unpublish", "vol", vol.ID)

Expand Down Expand Up @@ -776,8 +782,12 @@ func (v *CSIVolume) nodeUnpublishVolumeImpl(vol *structs.CSIVolume, claim *struc
return nil
}

// controllerUnpublishVolume handles the sending RPCs to the Controller plugin
// to unpublish the volume (detach it from its host). This function should only
// be called on a copy of the volume.
func (v *CSIVolume) controllerUnpublishVolume(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error {
v.logger.Trace("controller unpublish", "vol", vol.ID)

if !vol.ControllerRequired {
claim.State = structs.CSIVolumeClaimStateReadyToFree
return nil
Expand All @@ -792,26 +802,39 @@ func (v *CSIVolume) controllerUnpublishVolume(vol *structs.CSIVolume, claim *str
} else if plugin == nil {
return fmt.Errorf("no such plugin: %q", vol.PluginID)
}

if !plugin.HasControllerCapability(structs.CSIControllerSupportsAttachDetach) {
claim.State = structs.CSIVolumeClaimStateReadyToFree
return nil
Comment on lines +807 to 808
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we skip the call to checkpointClaim does that mean that this claim state change is lost if a leadership transition happens?

Copy link
Member Author

@tgross tgross Sep 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assignment is really just for helping out testing. If we return nil from controllerUnpublishVolume the next step in the caller is to set claim.State = structs.CSIVolumeClaimStateReadyToFree and checkpoint.

(Same applies to the one below)

}

// we only send a controller detach if a Nomad client no longer has
// any claim to the volume, so we need to check the status of claimed
// allocations
vol, err = state.CSIVolumeDenormalize(ws, vol)
if err != nil {
return err
}
for _, alloc := range vol.ReadAllocs {
if alloc != nil && alloc.NodeID == claim.NodeID && !alloc.TerminalStatus() {

// we only send a controller detach if a Nomad client no longer has any
// claim to the volume, so we need to check the status of any other claimed
// allocations
shouldCancel := func(alloc *structs.Allocation) bool {
if alloc != nil && alloc.ID != claim.AllocationID &&
alloc.NodeID == claim.NodeID && !alloc.TerminalStatus() {
claim.State = structs.CSIVolumeClaimStateReadyToFree
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm probably missing something, so just to understand this better, is the claim state updated here because it always needs to be set to CSIVolumeClaimStateReadyToFree before returning?

v.logger.Debug(
"controller unpublish canceled: another non-terminal alloc is on this node",
"vol", vol.ID, "alloc", alloc.ID)
return true
}
return false
}

for _, alloc := range vol.ReadAllocs {
if shouldCancel(alloc) {
return nil
}
}
for _, alloc := range vol.WriteAllocs {
if alloc != nil && alloc.NodeID == claim.NodeID && !alloc.TerminalStatus() {
claim.State = structs.CSIVolumeClaimStateReadyToFree
if shouldCancel(alloc) {
return nil
}
}
Expand All @@ -837,6 +860,8 @@ func (v *CSIVolume) controllerUnpublishVolume(vol *structs.CSIVolume, claim *str
if err != nil {
return fmt.Errorf("could not detach from controller: %v", err)
}

v.logger.Trace("controller detach complete", "vol", vol.ID)
claim.State = structs.CSIVolumeClaimStateReadyToFree
return v.checkpointClaim(vol, claim)
}
Expand Down
69 changes: 56 additions & 13 deletions nomad/csi_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ import (
"time"

msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/shoenig/test"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/client"
Expand All @@ -17,7 +22,6 @@ import (
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
)

func TestCSIVolumeEndpoint_Get(t *testing.T) {
Expand Down Expand Up @@ -499,37 +503,52 @@ func TestCSIVolumeEndpoint_Unpublish(t *testing.T) {
},
}
index++
require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, index, node))
must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, index, node))

type tc struct {
name string
startingState structs.CSIVolumeClaimState
endState structs.CSIVolumeClaimState
nodeID string
otherNodeID string
expectedErrMsg string
}
testCases := []tc{
{
name: "success",
startingState: structs.CSIVolumeClaimStateControllerDetached,
nodeID: node.ID,
otherNodeID: uuid.Generate(),
},
{
name: "non-terminal allocation on same node",
startingState: structs.CSIVolumeClaimStateNodeDetached,
nodeID: node.ID,
otherNodeID: node.ID,
},
{
name: "unpublish previously detached node",
startingState: structs.CSIVolumeClaimStateNodeDetached,
endState: structs.CSIVolumeClaimStateNodeDetached,
expectedErrMsg: "could not detach from controller: controller detach volume: No path to node",
nodeID: node.ID,
otherNodeID: uuid.Generate(),
},
{
name: "unpublish claim on garbage collected node",
startingState: structs.CSIVolumeClaimStateTaken,
endState: structs.CSIVolumeClaimStateNodeDetached,
expectedErrMsg: "could not detach from controller: controller detach volume: No path to node",
nodeID: uuid.Generate(),
otherNodeID: uuid.Generate(),
},
{
name: "first unpublish",
startingState: structs.CSIVolumeClaimStateTaken,
endState: structs.CSIVolumeClaimStateNodeDetached,
expectedErrMsg: "could not detach from controller: controller detach volume: No path to node",
nodeID: node.ID,
otherNodeID: uuid.Generate(),
},
}

Expand All @@ -551,15 +570,20 @@ func TestCSIVolumeEndpoint_Unpublish(t *testing.T) {

index++
err = state.UpsertCSIVolume(index, []*structs.CSIVolume{vol})
require.NoError(t, err)
must.NoError(t, err)

// setup: create an alloc that will claim our volume
alloc := mock.BatchAlloc()
alloc.NodeID = tc.nodeID
alloc.ClientStatus = structs.AllocClientStatusFailed

otherAlloc := mock.BatchAlloc()
otherAlloc.NodeID = tc.otherNodeID
otherAlloc.ClientStatus = structs.AllocClientStatusRunning

index++
require.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, index, []*structs.Allocation{alloc}))
must.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, index,
[]*structs.Allocation{alloc, otherAlloc}))

// setup: claim the volume for our alloc
claim := &structs.CSIVolumeClaim{
Expand All @@ -572,7 +596,20 @@ func TestCSIVolumeEndpoint_Unpublish(t *testing.T) {
index++
claim.State = structs.CSIVolumeClaimStateTaken
err = state.CSIVolumeClaim(index, ns, volID, claim)
require.NoError(t, err)
must.NoError(t, err)

// setup: claim the volume for our other alloc
otherClaim := &structs.CSIVolumeClaim{
AllocationID: otherAlloc.ID,
NodeID: tc.otherNodeID,
ExternalNodeID: "i-example",
Mode: structs.CSIVolumeClaimRead,
}

index++
otherClaim.State = structs.CSIVolumeClaimStateTaken
err = state.CSIVolumeClaim(index, ns, volID, otherClaim)
must.NoError(t, err)

// test: unpublish and check the results
claim.State = tc.startingState
Expand All @@ -589,17 +626,23 @@ func TestCSIVolumeEndpoint_Unpublish(t *testing.T) {
err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Unpublish", req,
&structs.CSIVolumeUnpublishResponse{})

vol, volErr := state.CSIVolumeByID(nil, ns, volID)
must.NoError(t, volErr)
must.NotNil(t, vol)

if tc.expectedErrMsg == "" {
require.NoError(t, err)
vol, err = state.CSIVolumeByID(nil, ns, volID)
require.NoError(t, err)
require.NotNil(t, vol)
require.Len(t, vol.ReadAllocs, 0)
must.NoError(t, err)
assert.Len(t, vol.ReadAllocs, 1)
} else {
require.Error(t, err)
require.True(t, strings.Contains(err.Error(), tc.expectedErrMsg),
"error message %q did not contain %q", err.Error(), tc.expectedErrMsg)
must.Error(t, err)
assert.Len(t, vol.ReadAllocs, 2)
test.True(t, strings.Contains(err.Error(), tc.expectedErrMsg),
test.Sprintf("error %v did not contain %q", err, tc.expectedErrMsg))
claim = vol.PastClaims[alloc.ID]
must.NotNil(t, claim)
test.Eq(t, tc.endState, claim.State)
}

})
}

Expand Down
13 changes: 8 additions & 5 deletions nomad/structs/csi.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,12 +367,15 @@ type CSIVolListStub struct {
Schedulable bool
PluginID string
Provider string
ControllerRequired bool
ControllersHealthy int
ControllersExpected int
NodesHealthy int
NodesExpected int
CreateIndex uint64
ModifyIndex uint64
ResourceExhausted time.Time

CreateIndex uint64
ModifyIndex uint64
}

// NewCSIVolume creates the volume struct. No side-effects
Expand Down Expand Up @@ -409,7 +412,7 @@ func (v *CSIVolume) RemoteID() string {
}

func (v *CSIVolume) Stub() *CSIVolListStub {
stub := CSIVolListStub{
return &CSIVolListStub{
ID: v.ID,
Namespace: v.Namespace,
Name: v.Name,
Expand All @@ -422,15 +425,15 @@ func (v *CSIVolume) Stub() *CSIVolListStub {
Schedulable: v.Schedulable,
PluginID: v.PluginID,
Provider: v.Provider,
ControllerRequired: v.ControllerRequired,
ControllersHealthy: v.ControllersHealthy,
ControllersExpected: v.ControllersExpected,
NodesHealthy: v.NodesHealthy,
NodesExpected: v.NodesExpected,
ResourceExhausted: v.ResourceExhausted,
CreateIndex: v.CreateIndex,
ModifyIndex: v.ModifyIndex,
}

return &stub
}

// ReadSchedulable determines if the volume is potentially schedulable
Expand Down
2 changes: 2 additions & 0 deletions website/content/api-docs/volumes.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ $ curl \
],
"AccessMode": "multi-node-single-writer",
"AttachmentMode": "file-system",
"CurrentReaders": 2,
"CurrentWriters": 1,
"Schedulable": true,
"PluginID": "plugin-id1",
"Provider": "ebs",
Expand Down