Skip to content

Commit

Permalink
csi: read-repair CSI volume claims (#7824)
Browse files Browse the repository at this point in the history
The `CSIVolumeClaim` fields were added after 0.11.1, so volumes claimed
before that may have no claim entries recorded for their allocations.
Repair this by synthesizing the missing claims when we read the
volume out of the state store.

The `NodeID` field was added after 0.11.0, so we need to ensure it's
been populated during upgrades from 0.11.0.
  • Loading branch information
tgross authored Apr 29, 2020
1 parent d913f05 commit 36e3d13
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 15 deletions.
22 changes: 19 additions & 3 deletions nomad/csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,15 +348,31 @@ func (v *CSIVolume) Claim(args *structs.CSIVolumeClaimRequest, reply *structs.CS
return structs.ErrPermissionDenied
}

// if this is a new claim, add a Volume and PublishContext from the
// controller (if any) to the reply
// COMPAT(1.0): the NodeID field was added after 0.11.0 and so we
// need to ensure it's been populated during upgrades from 0.11.0
// to later patch versions. Remove this block in 1.0
if args.Claim != structs.CSIVolumeClaimRelease && args.NodeID == "" {
state := v.srv.fsm.State()
ws := memdb.NewWatchSet()
alloc, err := state.AllocByID(ws, args.AllocationID)
if err != nil {
return err
}
if alloc == nil {
return fmt.Errorf("%s: %s",
structs.ErrUnknownAllocationPrefix, args.AllocationID)
}
args.NodeID = alloc.NodeID
}

if args.Claim != structs.CSIVolumeClaimRelease {
// if this is a new claim, add a Volume and PublishContext from the
// controller (if any) to the reply
err = v.controllerPublishVolume(args, reply)
if err != nil {
return fmt.Errorf("controller publish: %v", err)
}
}

resp, index, err := v.srv.raftApply(structs.CSIVolumeClaimRequestType, args)
if err != nil {
v.logger.Error("csi raft apply failed", "error", err, "method", "claim")
Expand Down
33 changes: 21 additions & 12 deletions nomad/csi_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,11 +201,22 @@ func TestCSIVolumeEndpoint_Claim(t *testing.T) {
defer shutdown()
testutil.WaitForLeader(t, srv.RPC)

index := uint64(1000)

state := srv.fsm.State()
codec := rpcClient(t, srv)
id0 := uuid.Generate()
alloc := mock.BatchAlloc()

// Create a client node and alloc
node := mock.Node()
alloc.NodeID = node.ID
summary := mock.JobSummary(alloc.JobID)
index++
require.NoError(t, state.UpsertJobSummary(index, summary))
index++
require.NoError(t, state.UpsertAllocs(index, []*structs.Allocation{alloc}))

// Create an initial volume claim request; we expect it to fail
// because there's no such volume yet.
claimReq := &structs.CSIVolumeClaimRequest{
Expand All @@ -222,16 +233,17 @@ func TestCSIVolumeEndpoint_Claim(t *testing.T) {
require.EqualError(t, err, fmt.Sprintf("controller publish: volume not found: %s", id0),
"expected 'volume not found' error because volume hasn't yet been created")

// Create a client node, plugin, alloc, and volume
node := mock.Node()
// Create a plugin and volume

node.CSINodePlugins = map[string]*structs.CSIInfo{
"minnie": {
PluginID: "minnie",
Healthy: true,
NodeInfo: &structs.CSINodeInfo{},
},
}
err = state.UpsertNode(1002, node)
index++
err = state.UpsertNode(index, node)
require.NoError(t, err)

vols := []*structs.CSIVolume{{
Expand All @@ -244,7 +256,8 @@ func TestCSIVolumeEndpoint_Claim(t *testing.T) {
Segments: map[string]string{"foo": "bar"},
}},
}}
err = state.CSIVolumeRegister(1003, vols)
index++
err = state.CSIVolumeRegister(index, vols)
require.NoError(t, err)

// Verify that the volume exists, and is healthy
Expand All @@ -263,12 +276,6 @@ func TestCSIVolumeEndpoint_Claim(t *testing.T) {
require.Len(t, volGetResp.Volume.ReadAllocs, 0)
require.Len(t, volGetResp.Volume.WriteAllocs, 0)

// Upsert the job and alloc
alloc.NodeID = node.ID
summary := mock.JobSummary(alloc.JobID)
require.NoError(t, state.UpsertJobSummary(1004, summary))
require.NoError(t, state.UpsertAllocs(1005, []*structs.Allocation{alloc}))

// Now our claim should succeed
err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim", claimReq, claimResp)
require.NoError(t, err)
Expand All @@ -284,8 +291,10 @@ func TestCSIVolumeEndpoint_Claim(t *testing.T) {
alloc2 := mock.Alloc()
alloc2.JobID = uuid.Generate()
summary = mock.JobSummary(alloc2.JobID)
require.NoError(t, state.UpsertJobSummary(1005, summary))
require.NoError(t, state.UpsertAllocs(1006, []*structs.Allocation{alloc2}))
index++
require.NoError(t, state.UpsertJobSummary(index, summary))
index++
require.NoError(t, state.UpsertAllocs(index, []*structs.Allocation{alloc2}))
claimReq.AllocationID = alloc2.ID
err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim", claimReq, claimResp)
require.EqualError(t, err, "volume max claim reached",
Expand Down
19 changes: 19 additions & 0 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2175,6 +2175,17 @@ func (s *StateStore) CSIVolumeDenormalize(ws memdb.WatchSet, vol *structs.CSIVol
}
if a != nil {
vol.ReadAllocs[id] = a
// COMPAT(1.0): the CSIVolumeClaim fields were added
// after 0.11.1, so claims made before that may be
// missing from ReadClaims; rebuild the claim from the
// allocation. (same for WriteClaims below)
if _, ok := vol.ReadClaims[id]; !ok {
vol.ReadClaims[id] = &structs.CSIVolumeClaim{
AllocationID: a.ID,
NodeID: a.NodeID,
Mode: structs.CSIVolumeClaimRead,
State: structs.CSIVolumeClaimStateTaken,
}
}
}
}

Expand All @@ -2185,6 +2196,14 @@ func (s *StateStore) CSIVolumeDenormalize(ws memdb.WatchSet, vol *structs.CSIVol
}
if a != nil {
vol.WriteAllocs[id] = a
if _, ok := vol.WriteClaims[id]; !ok {
vol.WriteClaims[id] = &structs.CSIVolumeClaim{
AllocationID: a.ID,
NodeID: a.NodeID,
Mode: structs.CSIVolumeClaimWrite,
State: structs.CSIVolumeClaimStateTaken,
}
}
}
}

Expand Down

0 comments on commit 36e3d13

Please sign in to comment.