diff --git a/nomad/csi_batch.go b/nomad/csi_batch.go new file mode 100644 index 00000000000..50e36e7a6c4 --- /dev/null +++ b/nomad/csi_batch.go @@ -0,0 +1,76 @@ +package nomad + +import ( + log "github.com/hashicorp/go-hclog" + multierror "github.com/hashicorp/go-multierror" + "github.com/hashicorp/nomad/nomad/structs" +) + +// csiBatchRelease is a helper for any time we need to release a bunch +// of volume claims at once. It de-duplicates the volumes and batches +// the raft messages into manageable chunks. Intended for use by RPCs +// that have already been forwarded to the leader. +type csiBatchRelease struct { + srv *Server + logger log.Logger + + maxBatchSize int + seen map[string]struct{} + batches []*structs.CSIVolumeClaimBatchRequest +} + +func newCSIBatchRelease(srv *Server, logger log.Logger, max int) *csiBatchRelease { + return &csiBatchRelease{ + srv: srv, + logger: logger, + maxBatchSize: max, + seen: map[string]struct{}{}, + batches: []*structs.CSIVolumeClaimBatchRequest{{}}, + } +} + +// add the volume ID + namespace to the deduplicated batches +func (c *csiBatchRelease) add(vol, namespace string) { + id := vol + "\x00" + namespace + + // ignore duplicates + _, seen := c.seen[id] + if seen { + return + } + c.seen[id] = struct{}{} + + req := structs.CSIVolumeClaimRequest{ + VolumeID: vol, + Claim: structs.CSIVolumeClaimRelease, + } + req.Namespace = namespace + req.Region = c.srv.config.Region + + for _, batch := range c.batches { + // otherwise append to the first non-full batch + if len(batch.Claims) < c.maxBatchSize { + batch.Claims = append(batch.Claims, req) + return + } + } + // no non-full batch found, make a new one + newBatch := &structs.CSIVolumeClaimBatchRequest{ + Claims: []structs.CSIVolumeClaimRequest{req}} + c.batches = append(c.batches, newBatch) +} + +// apply flushes the batches to raft +func (c *csiBatchRelease) apply() error { + var result *multierror.Error + for _, batch := range c.batches { + if len(batch.Claims) > 0 { + _, _, err := c.srv.raftApply(structs.CSIVolumeClaimBatchRequestType, batch) + if err != nil { + c.logger.Error("csi raft apply failed", "error", err, "method", "claim") + result = multierror.Append(result, err) + } + } + } + return result.ErrorOrNil() +} diff --git a/nomad/csi_batch_test.go b/nomad/csi_batch_test.go new file mode 100644 index 00000000000..565fef591e3 --- /dev/null +++ b/nomad/csi_batch_test.go @@ -0,0 +1,34 @@ +package nomad + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestCSI_Batcher(t *testing.T) { + t.Parallel() + srv, shutdown := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer shutdown() + + batcher := newCSIBatchRelease(srv, nil, 5) + + batcher.add("vol0", "global") + batcher.add("vol", "0global") + batcher.add("vol1", "global") + batcher.add("vol1", "global") + batcher.add("vol2", "global") + batcher.add("vol2", "other") + batcher.add("vol3", "global") + batcher.add("vol4", "global") + batcher.add("vol5", "global") + batcher.add("vol6", "global") + + require.Len(t, batcher.batches, 2) + require.Len(t, batcher.batches[0].Claims, 5, "first batch") + require.Equal(t, batcher.batches[0].Claims[4].VolumeID, "vol2") + require.Equal(t, batcher.batches[0].Claims[4].Namespace, "other") + require.Len(t, batcher.batches[1].Claims, 4, "second batch") +} diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 536a051ee05..ce18222b69c 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -707,19 +707,20 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD return err } - // For a job with volumes, find its volumes before deleting the job - volumesToGC := make(map[string]*structs.VolumeRequest) + // For a job with volumes, find its volumes before deleting the job. + // Later we'll apply this raft. + volumesToGC := newCSIBatchRelease(j.srv, j.logger, 100) if job != nil { for _, tg := range job.TaskGroups { for _, vol := range tg.Volumes { if vol.Type == structs.VolumeTypeCSI { - volumesToGC[vol.Source] = vol + volumesToGC.add(vol.Source, job.Namespace) } } } } - // Commit this update via Raft + // Commit the job update via Raft _, index, err := j.srv.raftApply(structs.JobDeregisterRequestType, args) if err != nil { j.logger.Error("deregister failed", "error", err) @@ -729,70 +730,52 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD // Populate the reply with job information reply.JobModifyIndex = index - evals := []*structs.Evaluation{} - now := time.Now().UTC().UnixNano() - - // Add an evaluation for garbage collecting the the CSI volume claims - // of terminal allocs - for _, vol := range volumesToGC { - // we have to build this eval by hand rather than calling srv.CoreJob - // here because we need to use the volume's namespace - eval := &structs.Evaluation{ - ID: uuid.Generate(), - Namespace: job.Namespace, - Priority: structs.CoreJobPriority, - Type: structs.JobTypeCore, - TriggeredBy: structs.EvalTriggerAllocStop, - JobID: structs.CoreJobCSIVolumeClaimGC + ":" + vol.Source, - LeaderACL: j.srv.getLeaderAcl(), - Status: structs.EvalStatusPending, - CreateTime: now, - ModifyTime: now, - } - evals = append(evals, eval) + // Make a raft apply to release the CSI volume claims of terminal allocs. + var result *multierror.Error + err = volumesToGC.apply() + if err != nil { + result = multierror.Append(result, err) } - // If the job is periodic or parameterized, we don't create an eval - // for the job, but might still need one for the volumes - if job == nil || !(job.IsPeriodic() || job.IsParameterized()) { - // Create a new evaluation - // XXX: The job priority / type is strange for this, since it's not a high - // priority even if the job was. - eval := &structs.Evaluation{ - ID: uuid.Generate(), - Namespace: args.RequestNamespace(), - Priority: structs.JobDefaultPriority, - Type: structs.JobTypeService, - TriggeredBy: structs.EvalTriggerJobDeregister, - JobID: args.JobID, - JobModifyIndex: index, - Status: structs.EvalStatusPending, - CreateTime: now, - ModifyTime: now, - } - evals = append(evals, eval) + // If the job is periodic or parameterized, we don't create an eval. + if job != nil && (job.IsPeriodic() || job.IsParameterized()) { + return nil } - if len(evals) > 0 { - update := &structs.EvalUpdateRequest{ - Evals: evals, - WriteRequest: structs.WriteRequest{Region: args.Region}, - } - - // Commit this evaluation via Raft - _, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update) - if err != nil { - j.logger.Error("eval create failed", "error", err, "method", "deregister") - return err - } + // Create a new evaluation + // XXX: The job priority / type is strange for this, since it's not a high + // priority even if the job was. + now := time.Now().UTC().UnixNano() + eval := &structs.Evaluation{ + ID: uuid.Generate(), + Namespace: args.RequestNamespace(), + Priority: structs.JobDefaultPriority, + Type: structs.JobTypeService, + TriggeredBy: structs.EvalTriggerJobDeregister, + JobID: args.JobID, + JobModifyIndex: index, + Status: structs.EvalStatusPending, + CreateTime: now, + ModifyTime: now, + } + update := &structs.EvalUpdateRequest{ + Evals: []*structs.Evaluation{eval}, + WriteRequest: structs.WriteRequest{Region: args.Region}, + } - // Populate the reply with eval information - reply.EvalID = evals[len(evals)-1].ID - reply.EvalCreateIndex = evalIndex - reply.Index = evalIndex + // Commit this evaluation via Raft + _, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update) + if err != nil { + result = multierror.Append(result, err) + j.logger.Error("eval create failed", "error", err, "method", "deregister") + return result.ErrorOrNil() } - return nil + // Populate the reply with eval information + reply.EvalID = eval.ID + reply.EvalCreateIndex = evalIndex + reply.Index = evalIndex + return result.ErrorOrNil() } // BatchDeregister is used to remove a set of jobs from the cluster. diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index e2b4d6940cc..d613f62b002 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -1081,9 +1081,9 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene now := time.Now() var evals []*structs.Evaluation - // A set of de-duplicated volumes that need volume claim GC. - // Later we'll create a gc eval for each volume. - volumesToGC := make(map[string][]string) // ID+namespace -> [id, namespace] + // A set of de-duplicated volumes that need their volume claims released. + // Later we'll apply this raft. + volumesToGC := newCSIBatchRelease(n.srv, n.logger, 100) for _, allocToUpdate := range args.Alloc { allocToUpdate.ModifyTime = now.UTC().UnixNano() @@ -1113,11 +1113,11 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene continue } - // If the terminal alloc has CSI volumes, add its job to the list - // of jobs we're going to call volume claim GC on. + // If the terminal alloc has CSI volumes, add the volumes to the batch + // of volumes we'll release the claims of. for _, vol := range taskGroup.Volumes { if vol.Type == structs.VolumeTypeCSI { - volumesToGC[vol.Source+alloc.Namespace] = []string{vol.Source, alloc.Namespace} + volumesToGC.add(vol.Source, alloc.Namespace) } } @@ -1138,24 +1138,11 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene } } - // Add an evaluation for garbage collecting the the CSI volume claims - // of terminal allocs - for _, volAndNamespace := range volumesToGC { - // we have to build this eval by hand rather than calling srv.CoreJob - // here because we need to use the volume's namespace - eval := &structs.Evaluation{ - ID: uuid.Generate(), - Namespace: volAndNamespace[1], - Priority: structs.CoreJobPriority, - Type: structs.JobTypeCore, - TriggeredBy: structs.EvalTriggerAllocStop, - JobID: structs.CoreJobCSIVolumeClaimGC + ":" + volAndNamespace[0], - LeaderACL: n.srv.getLeaderAcl(), - Status: structs.EvalStatusPending, - CreateTime: now.UTC().UnixNano(), - ModifyTime: now.UTC().UnixNano(), - } - evals = append(evals, eval) + // Make a raft apply to release the CSI volume claims of terminal allocs. + var result *multierror.Error + err := volumesToGC.apply() + if err != nil { + result = multierror.Append(result, err) } // Add this to the batch @@ -1188,12 +1175,13 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene // Wait for the future if err := future.Wait(); err != nil { - return err + result = multierror.Append(result, err) + return result.ErrorOrNil() } // Setup the response reply.Index = future.Index() - return nil + return result.ErrorOrNil() } // batchUpdate is used to update all the allocations diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 76f8ebcf324..8d56bf2c95e 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -2321,6 +2321,8 @@ func TestClientEndpoint_UpdateAlloc_UnclaimVolumes(t *testing.T) { codec := rpcClient(t, srv) state := srv.fsm.State() + + index := uint64(0) ws := memdb.NewWatchSet() // Create a client node, plugin, and volume @@ -2333,7 +2335,8 @@ func TestClientEndpoint_UpdateAlloc_UnclaimVolumes(t *testing.T) { ControllerInfo: &structs.CSIControllerInfo{}, }, } - err := state.UpsertNode(99, node) + index++ + err := state.UpsertNode(index, node) require.NoError(t, err) volId0 := uuid.Generate() ns := structs.DefaultNamespace @@ -2344,7 +2347,8 @@ func TestClientEndpoint_UpdateAlloc_UnclaimVolumes(t *testing.T) { AccessMode: structs.CSIVolumeAccessModeMultiNodeSingleWriter, AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, }} - err = state.CSIVolumeRegister(100, vols) + index++ + err = state.CSIVolumeRegister(index, vols) require.NoError(t, err) vol, err := state.CSIVolumeByID(ws, ns, volId0) require.NoError(t, err) @@ -2361,39 +2365,51 @@ func TestClientEndpoint_UpdateAlloc_UnclaimVolumes(t *testing.T) { ReadOnly: false, }, } - err = state.UpsertJob(101, job) + index++ + err = state.UpsertJob(index, job) require.NoError(t, err) alloc1 := mock.Alloc() alloc1.JobID = job.ID alloc1.NodeID = node.ID - err = state.UpsertJobSummary(102, mock.JobSummary(alloc1.JobID)) + index++ + err = state.UpsertJobSummary(index, mock.JobSummary(alloc1.JobID)) require.NoError(t, err) alloc1.TaskGroup = job.TaskGroups[0].Name alloc2 := mock.Alloc() alloc2.JobID = job.ID alloc2.NodeID = node.ID - err = state.UpsertJobSummary(103, mock.JobSummary(alloc2.JobID)) + index++ + err = state.UpsertJobSummary(index, mock.JobSummary(alloc2.JobID)) require.NoError(t, err) alloc2.TaskGroup = job.TaskGroups[0].Name - err = state.UpsertAllocs(104, []*structs.Allocation{alloc1, alloc2}) + index++ + err = state.UpsertAllocs(index, []*structs.Allocation{alloc1, alloc2}) require.NoError(t, err) - // Claim the volumes and verify the claims were set - err = state.CSIVolumeClaim(105, ns, volId0, &structs.CSIVolumeClaim{ - AllocationID: alloc1.ID, - NodeID: alloc1.NodeID, - Mode: structs.CSIVolumeClaimWrite, - }) - require.NoError(t, err) - err = state.CSIVolumeClaim(106, ns, volId0, &structs.CSIVolumeClaim{ - AllocationID: alloc2.ID, - NodeID: alloc2.NodeID, - Mode: structs.CSIVolumeClaimRead, - }) + // Claim the volumes and verify the claims were set. We need to + // apply this through the FSM so that we make sure the index is + // properly updated to test later + batch := &structs.CSIVolumeClaimBatchRequest{ + Claims: []structs.CSIVolumeClaimRequest{ + { + VolumeID: volId0, + AllocationID: alloc1.ID, + NodeID: alloc1.NodeID, + Claim: structs.CSIVolumeClaimWrite, + }, + { + VolumeID: volId0, + AllocationID: alloc2.ID, + NodeID: alloc2.NodeID, + Claim: structs.CSIVolumeClaimRead, + }, + }} + _, lastIndex, err := srv.raftApply(structs.CSIVolumeClaimBatchRequestType, batch) require.NoError(t, err) + vol, err = state.CSIVolumeByID(ws, ns, volId0) require.NoError(t, err) require.Len(t, vol.ReadAllocs, 1) @@ -2413,11 +2429,14 @@ func TestClientEndpoint_UpdateAlloc_UnclaimVolumes(t *testing.T) { require.NoError(t, err) require.Equal(t, structs.AllocClientStatusFailed, out.ClientStatus) - // Verify the eval for the claim GC was emitted - // Lookup the evaluations - eval, err := state.EvalsByJob(ws, job.Namespace, structs.CoreJobCSIVolumeClaimGC+":"+volId0) - require.NotNil(t, eval) - require.Nil(t, err) + // Verify the index has been updated to trigger a volume claim release + + req := &structs.CSIVolumeGetRequest{ID: volId0} + req.Region = "global" + getResp := &structs.CSIVolumeGetResponse{} + err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Get", req, getResp) + require.NoError(t, err) + require.Greater(t, getResp.Volume.ModifyIndex, lastIndex) } func TestClientEndpoint_CreateNodeEvals(t *testing.T) {