csi: don't pass volume claim releases thru GC eval #8021

Merged · merged 3 commits on May 20, 2020
Changes from 1 commit
73 changes: 73 additions & 0 deletions nomad/csi_batch.go
@@ -0,0 +1,73 @@
package nomad

import (
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/nomad/structs"
)

// csiBatchRelease is a helper for any time we need to release a bunch
// of volume claims at once. It de-duplicates the volumes and batches
// the raft messages into manageable chunks. Intended for use by RPCs
// that have already been forwarded to the leader.
type csiBatchRelease struct {
srv *Server
logger log.Logger

maxBatchSize int
seen map[string]struct{}
batches []*structs.CSIVolumeClaimBatchRequest
}

func newCSIBatchRelease(srv *Server, logger log.Logger, max int) *csiBatchRelease {
return &csiBatchRelease{
srv: srv,
logger: logger,
maxBatchSize: max,
seen: map[string]struct{}{},
batches: []*structs.CSIVolumeClaimBatchRequest{{}},
}
}

// add the volume ID + namespace to the deduplicated batches
func (c *csiBatchRelease) add(vol, namespace string) {
id := vol + namespace

// ignore duplicates
_, seen := c.seen[id]
if seen {
return
}
c.seen[id] = struct{}{}

req := structs.CSIVolumeClaimRequest{
VolumeID: vol,
Claim: structs.CSIVolumeClaimRelease,
}
req.Namespace = namespace
req.Region = c.srv.config.Region

for _, batch := range c.batches {
// otherwise append to the first non-full batch
if len(batch.Claims) < c.maxBatchSize {
batch.Claims = append(batch.Claims, req)
return
}
}
// no non-full batch found, make a new one
newBatch := &structs.CSIVolumeClaimBatchRequest{
Claims: []structs.CSIVolumeClaimRequest{req}}
c.batches = append(c.batches, newBatch)
}

// apply flushes the batches to raft
func (c *csiBatchRelease) apply() error {
for _, batch := range c.batches {
if len(batch.Claims) > 0 {
_, _, err := c.srv.raftApply(structs.CSIVolumeClaimBatchRequestType, batch)
if err != nil {
c.logger.Error("csi raft apply failed", "error", err, "method", "claim")
return err
}
}
}
return nil
}
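For orientation, here is a minimal sketch of how a leader-forwarded RPC endpoint might use this helper. The function name, volume IDs, and namespace below are hypothetical stand-ins; `srv` and `logger` correspond to whatever the calling endpoint already has in scope (e.g. `j.srv` and `j.logger`):

```go
// Sketch only: release claims for a set of volumes from an RPC that has
// already been forwarded to the leader.
func releaseClaimsExample(srv *Server, logger log.Logger) error {
	batcher := newCSIBatchRelease(srv, logger, 100)

	// add() de-duplicates on volume ID + namespace, so it is safe to call
	// repeatedly while walking task groups or allocations.
	batcher.add("csi-volume-0", "default") // hypothetical volume and namespace
	batcher.add("csi-volume-0", "default") // duplicate, ignored
	batcher.add("csi-volume-1", "default")

	// apply() issues one CSIVolumeClaimBatchRequest raft apply per batch and
	// returns the first error encountered.
	return batcher.apply()
}
```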
98 changes: 38 additions & 60 deletions nomad/job_endpoint.go
@@ -707,19 +707,20 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
return err
}

// For a job with volumes, find its volumes before deleting the job
volumesToGC := make(map[string]*structs.VolumeRequest)
// For a job with volumes, find its volumes before deleting the job.
// Later we'll apply this via raft.
volumesToGC := newCSIBatchRelease(j.srv, j.logger, 100)
if job != nil {
for _, tg := range job.TaskGroups {
for _, vol := range tg.Volumes {
if vol.Type == structs.VolumeTypeCSI {
volumesToGC[vol.Source] = vol
volumesToGC.add(vol.Source, job.Namespace)
}
}
}
}

// Commit this update via Raft
// Commit the job update via Raft
_, index, err := j.srv.raftApply(structs.JobDeregisterRequestType, args)
if err != nil {
j.logger.Error("deregister failed", "error", err)
@@ -729,69 +730,46 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
// Populate the reply with job information
reply.JobModifyIndex = index

evals := []*structs.Evaluation{}
now := time.Now().UTC().UnixNano()
// Make a raft apply to release the CSI volume claims of terminal allocs.
volumesToGC.apply()

// Add an evaluation for garbage collecting the the CSI volume claims
// of terminal allocs
for _, vol := range volumesToGC {
// we have to build this eval by hand rather than calling srv.CoreJob
// here because we need to use the volume's namespace
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: job.Namespace,
Priority: structs.CoreJobPriority,
Type: structs.JobTypeCore,
TriggeredBy: structs.EvalTriggerAllocStop,
JobID: structs.CoreJobCSIVolumeClaimGC + ":" + vol.Source,
LeaderACL: j.srv.getLeaderAcl(),
Status: structs.EvalStatusPending,
CreateTime: now,
ModifyTime: now,
}
evals = append(evals, eval)
// If the job is periodic or parameterized, we don't create an eval.
if job != nil && (job.IsPeriodic() || job.IsParameterized()) {
return nil
}

// If the job is periodic or parameterized, we don't create an eval
// for the job, but might still need one for the volumes
if job == nil || !(job.IsPeriodic() || job.IsParameterized()) {
// Create a new evaluation
// XXX: The job priority / type is strange for this, since it's not a high
// priority even if the job was.
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: args.RequestNamespace(),
Priority: structs.JobDefaultPriority,
Type: structs.JobTypeService,
TriggeredBy: structs.EvalTriggerJobDeregister,
JobID: args.JobID,
JobModifyIndex: index,
Status: structs.EvalStatusPending,
CreateTime: now,
ModifyTime: now,
}
evals = append(evals, eval)
// Create a new evaluation
[Inline review comment from the PR author] note for reviewers: from here down the changes in this file are restoring the code that existed pre-0.11.0

// XXX: The job priority / type is strange for this, since it's not a high
// priority even if the job was.
now := time.Now().UTC().UnixNano()
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: args.RequestNamespace(),
Priority: structs.JobDefaultPriority,
Type: structs.JobTypeService,
TriggeredBy: structs.EvalTriggerJobDeregister,
JobID: args.JobID,
JobModifyIndex: index,
Status: structs.EvalStatusPending,
CreateTime: now,
ModifyTime: now,
}
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
WriteRequest: structs.WriteRequest{Region: args.Region},
}

if len(evals) > 0 {
update := &structs.EvalUpdateRequest{
Evals: evals,
WriteRequest: structs.WriteRequest{Region: args.Region},
}

// Commit this evaluation via Raft
_, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
j.logger.Error("eval create failed", "error", err, "method", "deregister")
return err
}

// Populate the reply with eval information
reply.EvalID = evals[len(evals)-1].ID
reply.EvalCreateIndex = evalIndex
reply.Index = evalIndex
// Commit this evaluation via Raft
_, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
j.logger.Error("eval create failed", "error", err, "method", "deregister")
return err
}

// Populate the reply with eval information
reply.EvalID = eval.ID
reply.EvalCreateIndex = evalIndex
reply.Index = evalIndex
return nil
}

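For context, each entry in the batch that `volumesToGC.apply()` flushes above is a claim-release request along these lines (a sketch; the volume ID is hypothetical, and Namespace and Region are filled in by `csiBatchRelease.add` rather than by the caller):

```go
req := structs.CSIVolumeClaimRequest{
	VolumeID: "csi-volume-0",                // hypothetical; taken from the task group's volume source
	Claim:    structs.CSIVolumeClaimRelease, // release the claim rather than acquire one
}
req.Namespace = job.Namespace    // the volume's namespace
req.Region = j.srv.config.Region // region of the leader handling the forwarded RPC
```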
33 changes: 8 additions & 25 deletions nomad/node_endpoint.go
@@ -1081,9 +1081,9 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
now := time.Now()
var evals []*structs.Evaluation

// A set of de-duplicated volumes that need volume claim GC.
// Later we'll create a gc eval for each volume.
volumesToGC := make(map[string][]string) // ID+namespace -> [id, namespace]
// A set of de-duplicated volumes that need their volume claims released.
// Later we'll apply this via raft.
volumesToGC := newCSIBatchRelease(n.srv, n.logger, 100)

for _, allocToUpdate := range args.Alloc {
allocToUpdate.ModifyTime = now.UTC().UnixNano()
@@ -1113,11 +1113,11 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
continue
}

// If the terminal alloc has CSI volumes, add its job to the list
// of jobs we're going to call volume claim GC on.
// If the terminal alloc has CSI volumes, add the volumes to the batch
// of volumes we'll release the claims of.
for _, vol := range taskGroup.Volumes {
if vol.Type == structs.VolumeTypeCSI {
volumesToGC[vol.Source+alloc.Namespace] = []string{vol.Source, alloc.Namespace}
volumesToGC.add(vol.Source, alloc.Namespace)
}
}

@@ -1138,25 +1138,8 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
}
}

// Add an evaluation for garbage collecting the the CSI volume claims
// of terminal allocs
for _, volAndNamespace := range volumesToGC {
// we have to build this eval by hand rather than calling srv.CoreJob
// here because we need to use the volume's namespace
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: volAndNamespace[1],
Priority: structs.CoreJobPriority,
Type: structs.JobTypeCore,
TriggeredBy: structs.EvalTriggerAllocStop,
JobID: structs.CoreJobCSIVolumeClaimGC + ":" + volAndNamespace[0],
LeaderACL: n.srv.getLeaderAcl(),
Status: structs.EvalStatusPending,
CreateTime: now.UTC().UnixNano(),
ModifyTime: now.UTC().UnixNano(),
}
evals = append(evals, eval)
}
// Make a raft apply to release the CSI volume claims of terminal allocs.
volumesToGC.apply()

// Add this to the batch
n.updatesLock.Lock()
65 changes: 42 additions & 23 deletions nomad/node_endpoint_test.go
@@ -2321,6 +2321,8 @@ func TestClientEndpoint_UpdateAlloc_UnclaimVolumes(t *testing.T) {

codec := rpcClient(t, srv)
state := srv.fsm.State()

index := uint64(0)
ws := memdb.NewWatchSet()

// Create a client node, plugin, and volume
@@ -2333,7 +2335,8 @@ func TestClientEndpoint_UpdateAlloc_UnclaimVolumes(t *testing.T) {
ControllerInfo: &structs.CSIControllerInfo{},
},
}
err := state.UpsertNode(99, node)
index++
err := state.UpsertNode(index, node)
require.NoError(t, err)
volId0 := uuid.Generate()
ns := structs.DefaultNamespace
@@ -2344,7 +2347,8 @@ func TestClientEndpoint_UpdateAlloc_UnclaimVolumes(t *testing.T) {
AccessMode: structs.CSIVolumeAccessModeMultiNodeSingleWriter,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
}}
err = state.CSIVolumeRegister(100, vols)
index++
err = state.CSIVolumeRegister(index, vols)
require.NoError(t, err)
vol, err := state.CSIVolumeByID(ws, ns, volId0)
require.NoError(t, err)
@@ -2361,39 +2365,51 @@ func TestClientEndpoint_UpdateAlloc_UnclaimVolumes(t *testing.T) {
ReadOnly: false,
},
}
err = state.UpsertJob(101, job)
index++
err = state.UpsertJob(index, job)
require.NoError(t, err)

alloc1 := mock.Alloc()
alloc1.JobID = job.ID
alloc1.NodeID = node.ID
err = state.UpsertJobSummary(102, mock.JobSummary(alloc1.JobID))
index++
err = state.UpsertJobSummary(index, mock.JobSummary(alloc1.JobID))
require.NoError(t, err)
alloc1.TaskGroup = job.TaskGroups[0].Name

alloc2 := mock.Alloc()
alloc2.JobID = job.ID
alloc2.NodeID = node.ID
err = state.UpsertJobSummary(103, mock.JobSummary(alloc2.JobID))
index++
err = state.UpsertJobSummary(index, mock.JobSummary(alloc2.JobID))
require.NoError(t, err)
alloc2.TaskGroup = job.TaskGroups[0].Name

err = state.UpsertAllocs(104, []*structs.Allocation{alloc1, alloc2})
index++
err = state.UpsertAllocs(index, []*structs.Allocation{alloc1, alloc2})
require.NoError(t, err)

// Claim the volumes and verify the claims were set
err = state.CSIVolumeClaim(105, ns, volId0, &structs.CSIVolumeClaim{
AllocationID: alloc1.ID,
NodeID: alloc1.NodeID,
Mode: structs.CSIVolumeClaimWrite,
})
require.NoError(t, err)
err = state.CSIVolumeClaim(106, ns, volId0, &structs.CSIVolumeClaim{
AllocationID: alloc2.ID,
NodeID: alloc2.NodeID,
Mode: structs.CSIVolumeClaimRead,
})
// Claim the volumes and verify the claims were set. We need to
// apply this through the FSM so that we make sure the index is
// properly updated to test later
batch := &structs.CSIVolumeClaimBatchRequest{
Claims: []structs.CSIVolumeClaimRequest{
{
VolumeID: volId0,
AllocationID: alloc1.ID,
NodeID: alloc1.NodeID,
Claim: structs.CSIVolumeClaimWrite,
},
{
VolumeID: volId0,
AllocationID: alloc2.ID,
NodeID: alloc2.NodeID,
Claim: structs.CSIVolumeClaimRead,
},
}}
_, lastIndex, err := srv.raftApply(structs.CSIVolumeClaimBatchRequestType, batch)
require.NoError(t, err)

vol, err = state.CSIVolumeByID(ws, ns, volId0)
require.NoError(t, err)
require.Len(t, vol.ReadAllocs, 1)
@@ -2413,11 +2429,14 @@ func TestClientEndpoint_UpdateAlloc_UnclaimVolumes(t *testing.T) {
require.NoError(t, err)
require.Equal(t, structs.AllocClientStatusFailed, out.ClientStatus)

// Verify the eval for the claim GC was emitted
// Lookup the evaluations
eval, err := state.EvalsByJob(ws, job.Namespace, structs.CoreJobCSIVolumeClaimGC+":"+volId0)
require.NotNil(t, eval)
require.Nil(t, err)
// Verify the index has been updated to trigger a volume claim release

req := &structs.CSIVolumeGetRequest{ID: volId0}
req.Region = "global"
getResp := &structs.CSIVolumeGetResponse{}
err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Get", req, getResp)
require.NoError(t, err)
require.Greater(t, getResp.Volume.ModifyIndex, lastIndex)
}

func TestClientEndpoint_CreateNodeEvals(t *testing.T) {
Expand Down