csi: read-repair CSI volume claims #7824

Merged 2 commits on Apr 29, 2020
22 changes: 19 additions & 3 deletions nomad/csi_endpoint.go
@@ -348,15 +348,31 @@ func (v *CSIVolume) Claim(args *structs.CSIVolumeClaimRequest, reply *structs.CS
 		return structs.ErrPermissionDenied
 	}
 
-	// if this is a new claim, add a Volume and PublishContext from the
-	// controller (if any) to the reply
+	// COMPAT(1.0): the NodeID field was added after 0.11.0 and so we
+	// need to ensure it's been populated during upgrades from 0.11.0
+	// to later patch versions. Remove this block in 1.0
+	if args.Claim != structs.CSIVolumeClaimRelease && args.NodeID == "" {
+		state := v.srv.fsm.State()
+		ws := memdb.NewWatchSet()
+		alloc, err := state.AllocByID(ws, args.AllocationID)
+		if err != nil {
+			return err
+		}
+		if alloc == nil {
+			return fmt.Errorf("%s: %s",
+				structs.ErrUnknownAllocationPrefix, args.AllocationID)
+		}
+		args.NodeID = alloc.NodeID
+	}
+
 	if args.Claim != structs.CSIVolumeClaimRelease {
+		// if this is a new claim, add a Volume and PublishContext from the
+		// controller (if any) to the reply
 		err = v.controllerPublishVolume(args, reply)
 		if err != nil {
 			return fmt.Errorf("controller publish: %v", err)
 		}
 	}
 
 	resp, index, err := v.srv.raftApply(structs.CSIVolumeClaimRequestType, args)
 	if err != nil {
 		v.logger.Error("csi raft apply failed", "error", err, "method", "claim")
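For readers following the upgrade path: the new block repairs claim requests sent by 0.11.0 clients, which predate the NodeID field. A minimal sketch of the request shape involved, assuming the field names used elsewhere in this PR (the helper name and the VolumeID field are illustrative):

```go
// buildLegacyClaim sketches (name hypothetical) the kind of request a
// 0.11.0 client sends: NodeID is absent, so the endpoint above looks up
// the allocation by AllocationID and back-fills NodeID before the claim
// is applied through Raft.
func buildLegacyClaim(volID, allocID string) *structs.CSIVolumeClaimRequest {
	return &structs.CSIVolumeClaimRequest{
		VolumeID:     volID,
		AllocationID: allocID,
		Claim:        structs.CSIVolumeClaimWrite,
		// NodeID intentionally left empty, as a 0.11.0 client would
	}
}
```

If the allocation cannot be found, the endpoint returns an error prefixed with structs.ErrUnknownAllocationPrefix rather than writing a claim with an empty node.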
33 changes: 21 additions & 12 deletions nomad/csi_endpoint_test.go
@@ -201,11 +201,22 @@ func TestCSIVolumeEndpoint_Claim(t *testing.T) {
 	defer shutdown()
 	testutil.WaitForLeader(t, srv.RPC)
 
+	index := uint64(1000)
+
 	state := srv.fsm.State()
 	codec := rpcClient(t, srv)
 	id0 := uuid.Generate()
 	alloc := mock.BatchAlloc()
 
+	// Create a client node and alloc
+	node := mock.Node()
+	alloc.NodeID = node.ID
+	summary := mock.JobSummary(alloc.JobID)
+	index++
+	require.NoError(t, state.UpsertJobSummary(index, summary))
+	index++
+	require.NoError(t, state.UpsertAllocs(index, []*structs.Allocation{alloc}))
+
 	// Create an initial volume claim request; we expect it to fail
 	// because there's no such volume yet.
 	claimReq := &structs.CSIVolumeClaimRequest{
@@ -222,16 +233,17 @@ func TestCSIVolumeEndpoint_Claim(t *testing.T) {
 	require.EqualError(t, err, fmt.Sprintf("controller publish: volume not found: %s", id0),
 		"expected 'volume not found' error because volume hasn't yet been created")
 
-	// Create a client node, plugin, alloc, and volume
-	node := mock.Node()
+	// Create a plugin and volume
+
 	node.CSINodePlugins = map[string]*structs.CSIInfo{
 		"minnie": {
 			PluginID: "minnie",
 			Healthy:  true,
 			NodeInfo: &structs.CSINodeInfo{},
 		},
 	}
-	err = state.UpsertNode(1002, node)
+	index++
+	err = state.UpsertNode(index, node)
 	require.NoError(t, err)
 
 	vols := []*structs.CSIVolume{{
@@ -244,7 +256,8 @@ func TestCSIVolumeEndpoint_Claim(t *testing.T) {
 			Segments: map[string]string{"foo": "bar"},
 		}},
 	}}
-	err = state.CSIVolumeRegister(1003, vols)
+	index++
+	err = state.CSIVolumeRegister(index, vols)
 	require.NoError(t, err)
 
 	// Verify that the volume exists, and is healthy
@@ -263,12 +276,6 @@ func TestCSIVolumeEndpoint_Claim(t *testing.T) {
 	require.Len(t, volGetResp.Volume.ReadAllocs, 0)
 	require.Len(t, volGetResp.Volume.WriteAllocs, 0)
 
-	// Upsert the job and alloc
-	alloc.NodeID = node.ID
-	summary := mock.JobSummary(alloc.JobID)
-	require.NoError(t, state.UpsertJobSummary(1004, summary))
-	require.NoError(t, state.UpsertAllocs(1005, []*structs.Allocation{alloc}))
-
 	// Now our claim should succeed
 	err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim", claimReq, claimResp)
 	require.NoError(t, err)
@@ -284,8 +291,10 @@ func TestCSIVolumeEndpoint_Claim(t *testing.T) {
 	alloc2 := mock.Alloc()
 	alloc2.JobID = uuid.Generate()
 	summary = mock.JobSummary(alloc2.JobID)
-	require.NoError(t, state.UpsertJobSummary(1005, summary))
-	require.NoError(t, state.UpsertAllocs(1006, []*structs.Allocation{alloc2}))
+	index++
+	require.NoError(t, state.UpsertJobSummary(index, summary))
+	index++
+	require.NoError(t, state.UpsertAllocs(index, []*structs.Allocation{alloc2}))
 	claimReq.AllocationID = alloc2.ID
 	err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim", claimReq, claimResp)
 	require.EqualError(t, err, "volume max claim reached",
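One mechanical but useful refactor in this test: the hard-coded Raft indexes (1002, 1003, 1004, ...) are replaced by a single counter that is bumped before every state-store write, so a new write can be inserted without renumbering every later call. The pattern, as it appears in the diff:

```go
// One monotonically increasing Raft index for all state-store writes;
// bump it before each call instead of hard-coding literals.
index := uint64(1000)

index++
require.NoError(t, state.UpsertJobSummary(index, summary))
index++
require.NoError(t, state.UpsertAllocs(index, []*structs.Allocation{alloc}))
```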
19 changes: 19 additions & 0 deletions nomad/state/state_store.go
@@ -2175,6 +2175,17 @@ func (s *StateStore) CSIVolumeDenormalize(ws memdb.WatchSet, vol *structs.CSIVol
 		}
 		if a != nil {
 			vol.ReadAllocs[id] = a
+			// COMPAT(1.0): the CSIVolumeClaim fields were added
+			// after 0.11.1, so claims made before that may be
+			// missing this value. (same for WriteAlloc below)
+			if _, ok := vol.ReadClaims[id]; !ok {
+				vol.ReadClaims[id] = &structs.CSIVolumeClaim{
+					AllocationID: a.ID,
+					NodeID:       a.NodeID,
+					Mode:         structs.CSIVolumeClaimRead,
+					State:        structs.CSIVolumeClaimStateTaken,
+				}
+			}
 		}
 	}
 
@@ -2185,6 +2196,14 @@ func (s *StateStore) CSIVolumeDenormalize(ws memdb.WatchSet, vol *structs.CSIVol
 		}
 		if a != nil {
 			vol.WriteAllocs[id] = a
+			if _, ok := vol.WriteClaims[id]; !ok {
+				vol.WriteClaims[id] = &structs.CSIVolumeClaim{
+					AllocationID: a.ID,
+					NodeID:       a.NodeID,
+					Mode:         structs.CSIVolumeClaimWrite,
+					State:        structs.CSIVolumeClaimStateTaken,
+				}
+			}
 		}
 	}
 
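Both branches of CSIVolumeDenormalize apply the same repair: an allocation that appears in ReadAllocs or WriteAllocs but has no matching claim entry (a volume claimed before 0.11.1) gets a claim synthesized from the allocation in the taken state. A sketch of the shared shape; the helper and its signature are illustrative, since the PR inlines this logic in each branch:

```go
// repairClaim synthesizes a missing CSIVolumeClaim from its allocation.
// The helper name and the structs.CSIVolumeClaimMode parameter type are
// assumptions; the PR writes this inline for both read and write claims.
func repairClaim(claims map[string]*structs.CSIVolumeClaim,
	a *structs.Allocation, mode structs.CSIVolumeClaimMode) {
	if _, ok := claims[a.ID]; !ok {
		claims[a.ID] = &structs.CSIVolumeClaim{
			AllocationID: a.ID,
			NodeID:       a.NodeID,
			Mode:         mode,
			State:        structs.CSIVolumeClaimStateTaken,
		}
	}
}
```

Since denormalization runs when a volume is fetched, claims written by older versions are repaired lazily on the next read, which is presumably the "read-repair" the PR title refers to, and no migration is needed.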