Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

csi: don't pass volume claim releases thru GC eval #8021

Merged
merged 3 commits into from
May 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions nomad/csi_batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package nomad

import (
log "github.com/hashicorp/go-hclog"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/nomad/structs"
)

// csiBatchRelease is a helper for any time we need to release a bunch
// of volume claims at once. It de-duplicates the volumes and batches
// the raft messages into manageable chunks. Intended for use by RPCs
// that have already been forwarded to the leader.
type csiBatchRelease struct {
srv *Server
logger log.Logger

maxBatchSize int
seen map[string]struct{}
batches []*structs.CSIVolumeClaimBatchRequest
}

func newCSIBatchRelease(srv *Server, logger log.Logger, max int) *csiBatchRelease {
return &csiBatchRelease{
srv: srv,
logger: logger,
maxBatchSize: max,
seen: map[string]struct{}{},
batches: []*structs.CSIVolumeClaimBatchRequest{{}},
}
}

// add the volume ID + namespace to the deduplicated batches
func (c *csiBatchRelease) add(vol, namespace string) {
id := vol + "\x00" + namespace

// ignore duplicates
_, seen := c.seen[id]
if seen {
return
}
c.seen[id] = struct{}{}

req := structs.CSIVolumeClaimRequest{
VolumeID: vol,
Claim: structs.CSIVolumeClaimRelease,
}
req.Namespace = namespace
req.Region = c.srv.config.Region

for _, batch := range c.batches {
// otherwise append to the first non-full batch
if len(batch.Claims) < c.maxBatchSize {
batch.Claims = append(batch.Claims, req)
return
}
}
// no non-full batch found, make a new one
newBatch := &structs.CSIVolumeClaimBatchRequest{
Claims: []structs.CSIVolumeClaimRequest{req}}
c.batches = append(c.batches, newBatch)
}

// apply flushes the batches to raft
func (c *csiBatchRelease) apply() error {
var result *multierror.Error
for _, batch := range c.batches {
if len(batch.Claims) > 0 {
_, _, err := c.srv.raftApply(structs.CSIVolumeClaimBatchRequestType, batch)
if err != nil {
c.logger.Error("csi raft apply failed", "error", err, "method", "claim")
result = multierror.Append(result, err)
}
}
}
return result.ErrorOrNil()
}
34 changes: 34 additions & 0 deletions nomad/csi_batch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package nomad

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestCSI_Batcher(t *testing.T) {
t.Parallel()
srv, shutdown := TestServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer shutdown()

batcher := newCSIBatchRelease(srv, nil, 5)

batcher.add("vol0", "global")
batcher.add("vol", "0global")
batcher.add("vol1", "global")
batcher.add("vol1", "global")
batcher.add("vol2", "global")
batcher.add("vol2", "other")
batcher.add("vol3", "global")
batcher.add("vol4", "global")
batcher.add("vol5", "global")
batcher.add("vol6", "global")

require.Len(t, batcher.batches, 2)
require.Len(t, batcher.batches[0].Claims, 5, "first batch")
require.Equal(t, batcher.batches[0].Claims[4].VolumeID, "vol2")
require.Equal(t, batcher.batches[0].Claims[4].Namespace, "other")
require.Len(t, batcher.batches[1].Claims, 4, "second batch")
}
105 changes: 44 additions & 61 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,19 +707,20 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
return err
}

// For a job with volumes, find its volumes before deleting the job
volumesToGC := make(map[string]*structs.VolumeRequest)
// For a job with volumes, find its volumes before deleting the job.
// Later we'll apply this raft.
volumesToGC := newCSIBatchRelease(j.srv, j.logger, 100)
if job != nil {
for _, tg := range job.TaskGroups {
for _, vol := range tg.Volumes {
if vol.Type == structs.VolumeTypeCSI {
volumesToGC[vol.Source] = vol
volumesToGC.add(vol.Source, job.Namespace)
}
}
}
}

// Commit this update via Raft
// Commit the job update via Raft
_, index, err := j.srv.raftApply(structs.JobDeregisterRequestType, args)
if err != nil {
j.logger.Error("deregister failed", "error", err)
Expand All @@ -729,70 +730,52 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
// Populate the reply with job information
reply.JobModifyIndex = index

evals := []*structs.Evaluation{}
now := time.Now().UTC().UnixNano()

// Add an evaluation for garbage collecting the the CSI volume claims
// of terminal allocs
for _, vol := range volumesToGC {
// we have to build this eval by hand rather than calling srv.CoreJob
// here because we need to use the volume's namespace
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: job.Namespace,
Priority: structs.CoreJobPriority,
Type: structs.JobTypeCore,
TriggeredBy: structs.EvalTriggerAllocStop,
JobID: structs.CoreJobCSIVolumeClaimGC + ":" + vol.Source,
LeaderACL: j.srv.getLeaderAcl(),
Status: structs.EvalStatusPending,
CreateTime: now,
ModifyTime: now,
}
evals = append(evals, eval)
// Make a raft apply to release the CSI volume claims of terminal allocs.
var result *multierror.Error
err = volumesToGC.apply()
if err != nil {
result = multierror.Append(result, err)
}

// If the job is periodic or parameterized, we don't create an eval
// for the job, but might still need one for the volumes
if job == nil || !(job.IsPeriodic() || job.IsParameterized()) {
// Create a new evaluation
// XXX: The job priority / type is strange for this, since it's not a high
// priority even if the job was.
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: args.RequestNamespace(),
Priority: structs.JobDefaultPriority,
Type: structs.JobTypeService,
TriggeredBy: structs.EvalTriggerJobDeregister,
JobID: args.JobID,
JobModifyIndex: index,
Status: structs.EvalStatusPending,
CreateTime: now,
ModifyTime: now,
}
evals = append(evals, eval)
// If the job is periodic or parameterized, we don't create an eval.
if job != nil && (job.IsPeriodic() || job.IsParameterized()) {
return nil
}

if len(evals) > 0 {
update := &structs.EvalUpdateRequest{
Evals: evals,
WriteRequest: structs.WriteRequest{Region: args.Region},
}

// Commit this evaluation via Raft
_, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
j.logger.Error("eval create failed", "error", err, "method", "deregister")
return err
}
// Create a new evaluation
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note for reviewers: from here down the changes in this file are restoring the code that existed pre-0.11.0

// XXX: The job priority / type is strange for this, since it's not a high
// priority even if the job was.
now := time.Now().UTC().UnixNano()
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: args.RequestNamespace(),
Priority: structs.JobDefaultPriority,
Type: structs.JobTypeService,
TriggeredBy: structs.EvalTriggerJobDeregister,
JobID: args.JobID,
JobModifyIndex: index,
Status: structs.EvalStatusPending,
CreateTime: now,
ModifyTime: now,
}
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
WriteRequest: structs.WriteRequest{Region: args.Region},
}

// Populate the reply with eval information
reply.EvalID = evals[len(evals)-1].ID
reply.EvalCreateIndex = evalIndex
reply.Index = evalIndex
// Commit this evaluation via Raft
_, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
result = multierror.Append(result, err)
j.logger.Error("eval create failed", "error", err, "method", "deregister")
return result.ErrorOrNil()
}

return nil
// Populate the reply with eval information
reply.EvalID = eval.ID
reply.EvalCreateIndex = evalIndex
reply.Index = evalIndex
return result.ErrorOrNil()
}

// BatchDeregister is used to remove a set of jobs from the cluster.
Expand Down
40 changes: 14 additions & 26 deletions nomad/node_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1081,9 +1081,9 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
now := time.Now()
var evals []*structs.Evaluation

// A set of de-duplicated volumes that need volume claim GC.
// Later we'll create a gc eval for each volume.
volumesToGC := make(map[string][]string) // ID+namespace -> [id, namespace]
// A set of de-duplicated volumes that need their volume claims released.
// Later we'll apply this raft.
volumesToGC := newCSIBatchRelease(n.srv, n.logger, 100)

for _, allocToUpdate := range args.Alloc {
allocToUpdate.ModifyTime = now.UTC().UnixNano()
Expand Down Expand Up @@ -1113,11 +1113,11 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
continue
}

// If the terminal alloc has CSI volumes, add its job to the list
// of jobs we're going to call volume claim GC on.
// If the terminal alloc has CSI volumes, add the volumes to the batch
// of volumes we'll release the claims of.
for _, vol := range taskGroup.Volumes {
if vol.Type == structs.VolumeTypeCSI {
volumesToGC[vol.Source+alloc.Namespace] = []string{vol.Source, alloc.Namespace}
volumesToGC.add(vol.Source, alloc.Namespace)
}
}

Expand All @@ -1138,24 +1138,11 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
}
}

// Add an evaluation for garbage collecting the the CSI volume claims
// of terminal allocs
for _, volAndNamespace := range volumesToGC {
// we have to build this eval by hand rather than calling srv.CoreJob
// here because we need to use the volume's namespace
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: volAndNamespace[1],
Priority: structs.CoreJobPriority,
Type: structs.JobTypeCore,
TriggeredBy: structs.EvalTriggerAllocStop,
JobID: structs.CoreJobCSIVolumeClaimGC + ":" + volAndNamespace[0],
LeaderACL: n.srv.getLeaderAcl(),
Status: structs.EvalStatusPending,
CreateTime: now.UTC().UnixNano(),
ModifyTime: now.UTC().UnixNano(),
}
evals = append(evals, eval)
// Make a raft apply to release the CSI volume claims of terminal allocs.
var result *multierror.Error
err := volumesToGC.apply()
if err != nil {
result = multierror.Append(result, err)
}

// Add this to the batch
Expand Down Expand Up @@ -1188,12 +1175,13 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene

// Wait for the future
if err := future.Wait(); err != nil {
return err
result = multierror.Append(result, err)
return result.ErrorOrNil()
}

// Setup the response
reply.Index = future.Index()
return nil
return result.ErrorOrNil()
}

// batchUpdate is used to update all the allocations
Expand Down
Loading