Skip to content

Commit

Permalink
csi: don't pass volume claim releases thru GC eval (#8021)
Browse files Browse the repository at this point in the history
Following the new volumewatcher in #7794 and performance improvements
to it that landed afterwards, there's no particular reason we should
be threading claim releases through the GC eval rather than writing an
empty `CSIVolumeClaimRequest` with the mode set to
`CSIVolumeClaimRelease`, just as the GC evaluation would do.

Also, by batching up these raft messages, we can reduce the amount of
raft writes by 1 and cross-server RPCs by 1 per volume we release
claims on.
  • Loading branch information
tgross authored May 20, 2020
1 parent db4c88f commit b3a33e0
Show file tree
Hide file tree
Showing 5 changed files with 210 additions and 110 deletions.
76 changes: 76 additions & 0 deletions nomad/csi_batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package nomad

import (
log "github.com/hashicorp/go-hclog"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/nomad/structs"
)

// csiBatchRelease is a helper for any time we need to release a bunch
// of volume claims at once. It de-duplicates the volumes and batches
// the raft messages into manageable chunks. Intended for use by RPCs
// that have already been forwarded to the leader.
type csiBatchRelease struct {
srv *Server
logger log.Logger

maxBatchSize int
seen map[string]struct{}
batches []*structs.CSIVolumeClaimBatchRequest
}

func newCSIBatchRelease(srv *Server, logger log.Logger, max int) *csiBatchRelease {
return &csiBatchRelease{
srv: srv,
logger: logger,
maxBatchSize: max,
seen: map[string]struct{}{},
batches: []*structs.CSIVolumeClaimBatchRequest{{}},
}
}

// add the volume ID + namespace to the deduplicated batches
func (c *csiBatchRelease) add(vol, namespace string) {
id := vol + "\x00" + namespace

// ignore duplicates
_, seen := c.seen[id]
if seen {
return
}
c.seen[id] = struct{}{}

req := structs.CSIVolumeClaimRequest{
VolumeID: vol,
Claim: structs.CSIVolumeClaimRelease,
}
req.Namespace = namespace
req.Region = c.srv.config.Region

for _, batch := range c.batches {
// otherwise append to the first non-full batch
if len(batch.Claims) < c.maxBatchSize {
batch.Claims = append(batch.Claims, req)
return
}
}
// no non-full batch found, make a new one
newBatch := &structs.CSIVolumeClaimBatchRequest{
Claims: []structs.CSIVolumeClaimRequest{req}}
c.batches = append(c.batches, newBatch)
}

// apply flushes the batches to raft
func (c *csiBatchRelease) apply() error {
var result *multierror.Error
for _, batch := range c.batches {
if len(batch.Claims) > 0 {
_, _, err := c.srv.raftApply(structs.CSIVolumeClaimBatchRequestType, batch)
if err != nil {
c.logger.Error("csi raft apply failed", "error", err, "method", "claim")
result = multierror.Append(result, err)
}
}
}
return result.ErrorOrNil()
}
34 changes: 34 additions & 0 deletions nomad/csi_batch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package nomad

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestCSI_Batcher(t *testing.T) {
t.Parallel()
srv, shutdown := TestServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer shutdown()

batcher := newCSIBatchRelease(srv, nil, 5)

batcher.add("vol0", "global")
batcher.add("vol", "0global")
batcher.add("vol1", "global")
batcher.add("vol1", "global")
batcher.add("vol2", "global")
batcher.add("vol2", "other")
batcher.add("vol3", "global")
batcher.add("vol4", "global")
batcher.add("vol5", "global")
batcher.add("vol6", "global")

require.Len(t, batcher.batches, 2)
require.Len(t, batcher.batches[0].Claims, 5, "first batch")
require.Equal(t, batcher.batches[0].Claims[4].VolumeID, "vol2")
require.Equal(t, batcher.batches[0].Claims[4].Namespace, "other")
require.Len(t, batcher.batches[1].Claims, 4, "second batch")
}
105 changes: 44 additions & 61 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,19 +707,20 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
return err
}

// For a job with volumes, find its volumes before deleting the job
volumesToGC := make(map[string]*structs.VolumeRequest)
// For a job with volumes, find its volumes before deleting the job.
// Later we'll apply this raft.
volumesToGC := newCSIBatchRelease(j.srv, j.logger, 100)
if job != nil {
for _, tg := range job.TaskGroups {
for _, vol := range tg.Volumes {
if vol.Type == structs.VolumeTypeCSI {
volumesToGC[vol.Source] = vol
volumesToGC.add(vol.Source, job.Namespace)
}
}
}
}

// Commit this update via Raft
// Commit the job update via Raft
_, index, err := j.srv.raftApply(structs.JobDeregisterRequestType, args)
if err != nil {
j.logger.Error("deregister failed", "error", err)
Expand All @@ -729,70 +730,52 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
// Populate the reply with job information
reply.JobModifyIndex = index

evals := []*structs.Evaluation{}
now := time.Now().UTC().UnixNano()

// Add an evaluation for garbage collecting the the CSI volume claims
// of terminal allocs
for _, vol := range volumesToGC {
// we have to build this eval by hand rather than calling srv.CoreJob
// here because we need to use the volume's namespace
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: job.Namespace,
Priority: structs.CoreJobPriority,
Type: structs.JobTypeCore,
TriggeredBy: structs.EvalTriggerAllocStop,
JobID: structs.CoreJobCSIVolumeClaimGC + ":" + vol.Source,
LeaderACL: j.srv.getLeaderAcl(),
Status: structs.EvalStatusPending,
CreateTime: now,
ModifyTime: now,
}
evals = append(evals, eval)
// Make a raft apply to release the CSI volume claims of terminal allocs.
var result *multierror.Error
err = volumesToGC.apply()
if err != nil {
result = multierror.Append(result, err)
}

// If the job is periodic or parameterized, we don't create an eval
// for the job, but might still need one for the volumes
if job == nil || !(job.IsPeriodic() || job.IsParameterized()) {
// Create a new evaluation
// XXX: The job priority / type is strange for this, since it's not a high
// priority even if the job was.
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: args.RequestNamespace(),
Priority: structs.JobDefaultPriority,
Type: structs.JobTypeService,
TriggeredBy: structs.EvalTriggerJobDeregister,
JobID: args.JobID,
JobModifyIndex: index,
Status: structs.EvalStatusPending,
CreateTime: now,
ModifyTime: now,
}
evals = append(evals, eval)
// If the job is periodic or parameterized, we don't create an eval.
if job != nil && (job.IsPeriodic() || job.IsParameterized()) {
return nil
}

if len(evals) > 0 {
update := &structs.EvalUpdateRequest{
Evals: evals,
WriteRequest: structs.WriteRequest{Region: args.Region},
}

// Commit this evaluation via Raft
_, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
j.logger.Error("eval create failed", "error", err, "method", "deregister")
return err
}
// Create a new evaluation
// XXX: The job priority / type is strange for this, since it's not a high
// priority even if the job was.
now := time.Now().UTC().UnixNano()
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: args.RequestNamespace(),
Priority: structs.JobDefaultPriority,
Type: structs.JobTypeService,
TriggeredBy: structs.EvalTriggerJobDeregister,
JobID: args.JobID,
JobModifyIndex: index,
Status: structs.EvalStatusPending,
CreateTime: now,
ModifyTime: now,
}
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
WriteRequest: structs.WriteRequest{Region: args.Region},
}

// Populate the reply with eval information
reply.EvalID = evals[len(evals)-1].ID
reply.EvalCreateIndex = evalIndex
reply.Index = evalIndex
// Commit this evaluation via Raft
_, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
result = multierror.Append(result, err)
j.logger.Error("eval create failed", "error", err, "method", "deregister")
return result.ErrorOrNil()
}

return nil
// Populate the reply with eval information
reply.EvalID = eval.ID
reply.EvalCreateIndex = evalIndex
reply.Index = evalIndex
return result.ErrorOrNil()
}

// BatchDeregister is used to remove a set of jobs from the cluster.
Expand Down
40 changes: 14 additions & 26 deletions nomad/node_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1081,9 +1081,9 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
now := time.Now()
var evals []*structs.Evaluation

// A set of de-duplicated volumes that need volume claim GC.
// Later we'll create a gc eval for each volume.
volumesToGC := make(map[string][]string) // ID+namespace -> [id, namespace]
// A set of de-duplicated volumes that need their volume claims released.
// Later we'll apply this raft.
volumesToGC := newCSIBatchRelease(n.srv, n.logger, 100)

for _, allocToUpdate := range args.Alloc {
allocToUpdate.ModifyTime = now.UTC().UnixNano()
Expand Down Expand Up @@ -1113,11 +1113,11 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
continue
}

// If the terminal alloc has CSI volumes, add its job to the list
// of jobs we're going to call volume claim GC on.
// If the terminal alloc has CSI volumes, add the volumes to the batch
// of volumes we'll release the claims of.
for _, vol := range taskGroup.Volumes {
if vol.Type == structs.VolumeTypeCSI {
volumesToGC[vol.Source+alloc.Namespace] = []string{vol.Source, alloc.Namespace}
volumesToGC.add(vol.Source, alloc.Namespace)
}
}

Expand All @@ -1138,24 +1138,11 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
}
}

// Add an evaluation for garbage collecting the the CSI volume claims
// of terminal allocs
for _, volAndNamespace := range volumesToGC {
// we have to build this eval by hand rather than calling srv.CoreJob
// here because we need to use the volume's namespace
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: volAndNamespace[1],
Priority: structs.CoreJobPriority,
Type: structs.JobTypeCore,
TriggeredBy: structs.EvalTriggerAllocStop,
JobID: structs.CoreJobCSIVolumeClaimGC + ":" + volAndNamespace[0],
LeaderACL: n.srv.getLeaderAcl(),
Status: structs.EvalStatusPending,
CreateTime: now.UTC().UnixNano(),
ModifyTime: now.UTC().UnixNano(),
}
evals = append(evals, eval)
// Make a raft apply to release the CSI volume claims of terminal allocs.
var result *multierror.Error
err := volumesToGC.apply()
if err != nil {
result = multierror.Append(result, err)
}

// Add this to the batch
Expand Down Expand Up @@ -1188,12 +1175,13 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene

// Wait for the future
if err := future.Wait(); err != nil {
return err
result = multierror.Append(result, err)
return result.ErrorOrNil()
}

// Setup the response
reply.Index = future.Index()
return nil
return result.ErrorOrNil()
}

// batchUpdate is used to update all the allocations
Expand Down
Loading

0 comments on commit b3a33e0

Please sign in to comment.