csi: volume claim garbage collection
When an alloc is marked terminal (and after node unstage/unpublish
have been called), the client syncs the terminal alloc state with the
server via the `Node.UpdateAlloc` RPC.

For each job that has a terminal alloc, the `Node.UpdateAlloc` RPC
handler on the server will emit an eval for a new core job to garbage
collect CSI volume claims. When this eval is handled by the core
scheduler, it will call a `volumeClaimReap` method to release the
claims for all terminal allocs on the job.
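
A minimal sketch of the eval that handler emits, assuming the shape used by
the new TestCSI_GCVolumeClaims test in this commit; the Node.UpdateAlloc
handler changes are not among the diffs rendered below, so the helper name
and signature here are hypothetical:

// Hypothetical helper in the nomad server package; field values mirror the
// eval constructed in TestCSI_GCVolumeClaims (assumes the time, uuid, and
// structs imports used there).
func csiVolumeClaimGCEval(job *structs.Job, leaderACL string) *structs.Evaluation {
	now := time.Now().UTC().UnixNano()
	return &structs.Evaluation{
		ID:          uuid.Generate(),
		Namespace:   job.Namespace,
		Priority:    structs.CoreJobPriority,
		Type:        structs.JobTypeCore,
		TriggeredBy: structs.EvalTriggerAllocStop,
		// the target job's ID is appended after the core job name so the
		// core scheduler knows which job's claims to reap
		JobID:      structs.CoreJobCSIVolumeClaimGC + ":" + job.ID,
		LeaderACL:  leaderACL,
		Status:     structs.EvalStatusPending,
		CreateTime: now,
		ModifyTime: now,
	}
}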

The volume reap will issue a `ControllerUnpublishVolume` RPC for any
node that has no alloc claiming the volume. Once this returns (or
is skipped), the volume reap will send a new `CSIVolume.Claim` RPC
that releases the volume claim for that allocation in the state store,
making it available for scheduling again.
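
Condensed from the volumeClaimReap implementation added to nomad/core_sched.go
below (error collection and the alloc/node bookkeeping elided), the per-volume
release path looks roughly like this sketch:

// Sketch only: volID, unclaimedNodes, gcAllocs, and leaderACL are built by
// the surrounding volumeClaimReap loop shown in the diff below.
req := &structs.CSIVolumeClaimRequest{
	VolumeID: volID,
	Claim:    structs.CSIVolumeClaimGC,
	WriteRequest: structs.WriteRequest{
		Region:    job.Region,
		Namespace: job.Namespace,
		AuthToken: leaderACL,
	},
}

// controller unpublish at most once per node, and only where no other
// alloc on that node still needs the volume
for node := range unclaimedNodes {
	_ = c.srv.controllerUnpublishVolume(req, node)
}

// then release each terminal alloc's claim in the state store so the
// volume can be scheduled again
for _, alloc := range gcAllocs {
	req.Allocation = alloc
	_ = c.srv.RPC("CSIVolume.Claim", req, &structs.CSIVolumeClaimResponse{})
}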

This same `volumeClaimReap` method will be called from the core job GC,
which gives us a second chance to reclaim volumes during GC if there
were controller RPC failures.
tgross committed Feb 14, 2020
1 parent 44bdb3c commit 5ba637c
Showing 9 changed files with 290 additions and 110 deletions.
nomad/config.go: 5 changes (0 additions, 5 deletions)
@@ -188,10 +188,6 @@ type Config struct {
// for GC. This gives users some time to view terminal deployments.
DeploymentGCThreshold time.Duration

// CSIVolumePublicationGCInterval is how often we dispatch a job to GC
// unclaimed CSI volume publications.
CSIVolumePublicationGCInterval time.Duration

// EvalNackTimeout controls how long we allow a sub-scheduler to
// work on an evaluation before we consider it failed and Nack it.
// This allows that evaluation to be handed to another sub-scheduler
@@ -378,7 +374,6 @@ func DefaultConfig() *Config {
NodeGCThreshold: 24 * time.Hour,
DeploymentGCInterval: 5 * time.Minute,
DeploymentGCThreshold: 1 * time.Hour,
CSIVolumePublicationGCInterval: 60 * time.Second,
EvalNackTimeout: 60 * time.Second,
EvalDeliveryLimit: 3,
EvalNackInitialReenqueueDelay: 1 * time.Second,
nomad/core_sched.go: 130 changes (123 additions, 7 deletions)
@@ -3,10 +3,12 @@ package nomad
import (
"fmt"
"math"
"strings"
"time"

log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
multierror "github.com/hashicorp/go-multierror"
version "github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
@@ -41,7 +43,8 @@ func NewCoreScheduler(srv *Server, snap *state.StateSnapshot) scheduler.Schedule

// Process is used to implement the scheduler.Scheduler interface
func (c *CoreScheduler) Process(eval *structs.Evaluation) error {
switch eval.JobID {
job := strings.Split(eval.JobID, ":") // extra data can be smuggled in w/ JobID
switch job[0] {
case structs.CoreJobEvalGC:
return c.evalGC(eval)
case structs.CoreJobNodeGC:
@@ -50,8 +53,8 @@ func (c *CoreScheduler) Process(eval *structs.Evaluation) error {
return c.jobGC(eval)
case structs.CoreJobDeploymentGC:
return c.deploymentGC(eval)
case structs.CoreJobCSIVolumePublicationGC:
return c.csiVolumePublicationGC(eval)
case structs.CoreJobCSIVolumeClaimGC:
return c.csiVolumeClaimGC(eval)
case structs.CoreJobForceGC:
return c.forceGC(eval)
default:
@@ -143,6 +146,7 @@ OUTER:
gcAlloc = append(gcAlloc, jobAlloc...)
gcEval = append(gcEval, jobEval...)
}

}

// Fast-path the nothing case
@@ -152,6 +156,11 @@ OUTER:
c.logger.Debug("job GC found eligible objects",
"jobs", len(gcJob), "evals", len(gcEval), "allocs", len(gcAlloc))

// Clean up any outstanding volume claims
if err := c.volumeClaimReap(gcJob, eval.LeaderACL); err != nil {
return err
}

// Reap the evals and allocs
if err := c.evalReap(gcEval, gcAlloc); err != nil {
return err
@@ -706,9 +715,116 @@ func allocGCEligible(a *structs.Allocation, job *structs.Job, gcTime time.Time,
return timeDiff > interval.Nanoseconds()
}

// csiVolumeGC is used to garbage collect CSI volume publications
func (c *CoreScheduler) csiVolumePublicationGC(eval *structs.Evaluation) error {
// TODO: implement me!
c.logger.Trace("garbage collecting unclaimed CSI volume publications")
// csiVolumeClaimGC is used to garbage collect CSI volume claims
func (c *CoreScheduler) csiVolumeClaimGC(eval *structs.Evaluation) error {
c.logger.Trace("garbage collecting unclaimed CSI volume claims")

// JobID smuggled in with the eval's own JobID
var jobID string
evalJobID := strings.Split(eval.JobID, ":")
if len(evalJobID) != 2 {
c.logger.Error("volume gc called without jobID")
return nil
}

jobID = evalJobID[1]
job, err := c.srv.State().JobByID(nil, eval.Namespace, jobID)
if err != nil || job == nil {
c.logger.Trace(
"cannot find job to perform volume claim GC. it may have been garbage collected",
"job", jobID)
return nil
}
c.volumeClaimReap([]*structs.Job{job}, eval.LeaderACL)
return nil
}

// volumeClaimReap contacts the leader and releases volume claims from
// terminal allocs
func (c *CoreScheduler) volumeClaimReap(jobs []*structs.Job, leaderACL string) error {
ws := memdb.NewWatchSet()
var result *multierror.Error

for _, job := range jobs {
c.logger.Trace("garbage collecting unclaimed CSI volume claims for job", "job", job.ID)
for _, taskGroup := range job.TaskGroups {
for _, tgVolume := range taskGroup.Volumes {
if tgVolume.Type != structs.VolumeTypeCSI {
continue // filter to just CSI volumes
}
volID := tgVolume.Source
vol, err := c.srv.State().CSIVolumeByID(ws, volID)
if err != nil {
result = multierror.Append(result, err)
continue
}
if vol == nil {
c.logger.Trace("cannot find volume to be GC'd. it may have been deregistered",
"volume", volID)
continue
}
vol, err = c.srv.State().CSIVolumeDenormalize(ws, vol)
if err != nil {
result = multierror.Append(result, err)
continue
}

gcAllocs := []*structs.Allocation{}
unclaimedNodes := map[string]struct{}{}

collectFunc := func(allocs map[string]*structs.Allocation) {
for _, alloc := range allocs {
// we call denormalize on the volume above to make sure the Allocation
// pointer in PastAllocs isn't nil. But the alloc might have been
// garbage collected concurrently, so if the alloc is still nil we can
// safely skip it.
if alloc == nil {
continue
}
if !alloc.Terminated() {
unclaimedNodes[alloc.NodeID] = struct{}{}
continue
}
gcAllocs = append(gcAllocs, alloc)
delete(unclaimedNodes, alloc.NodeID)
}
}

collectFunc(vol.WriteAllocs)
collectFunc(vol.ReadAllocs)

req := &structs.CSIVolumeClaimRequest{
VolumeID: volID,
Allocation: nil, // unpublish never uses this field
Claim: structs.CSIVolumeClaimGC,
WriteRequest: structs.WriteRequest{
Region: job.Region,
Namespace: job.Namespace,
AuthToken: leaderACL,
},
}

// we only emit the controller unpublish if no other allocs
// on the node need it, but we also only want to make this
// call at most once per node
for node := range unclaimedNodes {
err = c.srv.controllerUnpublishVolume(req, node)
if err != nil {
result = multierror.Append(result, err)
continue
}
}

for _, alloc := range gcAllocs {
req.Allocation = alloc
err = c.srv.RPC("CSIVolume.Claim", req, &structs.CSIVolumeClaimResponse{})
if err != nil {
c.logger.Error("volume claim release failed", "error", err)
result = multierror.Append(result, err)
}
}
}
}
}
return result.ErrorOrNil()
}
nomad/core_sched_test.go: 141 changes (110 additions, 31 deletions)
@@ -1836,37 +1836,6 @@ func TestCoreScheduler_DeploymentGC_Force(t *testing.T) {
}
}

// TODO: this is an empty test until CoreScheduler.csiVolumePublicationGC is implemented
func TestCoreScheduler_CSIVolumePublicationGC(t *testing.T) {
t.Parallel()

s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)
assert := assert.New(t)

// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)

// TODO: insert volumes for nodes
state := s1.fsm.State()

// Update the time tables to make this work
tt := s1.fsm.TimeTable()
tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.CSIVolumePublicationGCInterval))

// Create a core scheduler
snap, err := state.Snapshot()
assert.Nil(err, "Snapshot")
core := NewCoreScheduler(s1, snap)

// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobCSIVolumePublicationGC, 2000)
assert.Nil(core.Process(gc), "Process GC")

// TODO: assert state is cleaned up
}

func TestCoreScheduler_PartitionEvalReap(t *testing.T) {
t.Parallel()

@@ -2224,3 +2193,113 @@ func TestAllocation_GCEligible(t *testing.T) {
alloc.ClientStatus = structs.AllocClientStatusComplete
require.True(allocGCEligible(alloc, nil, time.Now(), 1000))
}

func TestCSI_GCVolumeClaims(t *testing.T) {
t.Parallel()
srv, shutdown := TestServer(t, func(c *Config) { c.NumSchedulers = 0 })
defer shutdown()
testutil.WaitForLeader(t, srv.RPC)

// codec := rpcClient(t, srv)
state := srv.fsm.State()
ws := memdb.NewWatchSet()

// Create a client node, plugin, and volume
node := mock.Node()
node.Attributes["nomad.version"] = "0.11.0" // client RPCs not supported on early version
node.CSINodePlugins = map[string]*structs.CSIInfo{
"csi-plugin-example": {PluginID: "csi-plugin-example",
Healthy: true,
NodeInfo: &structs.CSINodeInfo{},
},
}
err := state.UpsertNode(99, node)
require.NoError(t, err)
volId0 := uuid.Generate()
vols := []*structs.CSIVolume{{
ID: volId0,
Namespace: "notTheNamespace",
PluginID: "csi-plugin-example",
AccessMode: structs.CSIVolumeAccessModeMultiNodeSingleWriter,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
}}
err = state.CSIVolumeRegister(100, vols)
require.NoError(t, err)
vol, err := state.CSIVolumeByID(ws, volId0)
require.NoError(t, err)
require.Len(t, vol.ReadAllocs, 0)
require.Len(t, vol.WriteAllocs, 0)

// Create a job with 2 allocations
job := mock.Job()
job.TaskGroups[0].Volumes = map[string]*structs.VolumeRequest{
"_": {
Name: "someVolume",
Type: structs.VolumeTypeCSI,
Source: volId0,
ReadOnly: false,
},
}
err = state.UpsertJob(101, job)
require.NoError(t, err)

alloc1 := mock.Alloc()
alloc1.JobID = job.ID
alloc1.NodeID = node.ID
err = state.UpsertJobSummary(102, mock.JobSummary(alloc1.JobID))
require.NoError(t, err)
alloc1.TaskGroup = job.TaskGroups[0].Name

alloc2 := mock.Alloc()
alloc2.JobID = job.ID
alloc2.NodeID = node.ID
err = state.UpsertJobSummary(103, mock.JobSummary(alloc2.JobID))
require.NoError(t, err)
alloc2.TaskGroup = job.TaskGroups[0].Name

err = state.UpsertAllocs(104, []*structs.Allocation{alloc1, alloc2})
require.NoError(t, err)

// Claim the volumes and verify the claims were set
err = state.CSIVolumeClaim(105, volId0, alloc1, structs.CSIVolumeClaimWrite)
require.NoError(t, err)
err = state.CSIVolumeClaim(106, volId0, alloc2, structs.CSIVolumeClaimRead)
require.NoError(t, err)
vol, err = state.CSIVolumeByID(ws, volId0)
require.NoError(t, err)
require.Len(t, vol.ReadAllocs, 1)
require.Len(t, vol.WriteAllocs, 1)

// Update the 1st alloc as failed/terminated
alloc1.ClientStatus = structs.AllocClientStatusFailed
err = state.UpdateAllocsFromClient(107, []*structs.Allocation{alloc1})
require.NoError(t, err)

// Create the GC eval we'd get from Node.UpdateAlloc
now := time.Now().UTC()
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: job.Namespace,
Priority: structs.CoreJobPriority,
Type: structs.JobTypeCore,
TriggeredBy: structs.EvalTriggerAllocStop,
JobID: structs.CoreJobCSIVolumeClaimGC + ":" + job.ID,
LeaderACL: srv.getLeaderAcl(),
Status: structs.EvalStatusPending,
CreateTime: now.UTC().UnixNano(),
ModifyTime: now.UTC().UnixNano(),
}

// Process the eval
snap, err := state.Snapshot()
require.NoError(t, err)
core := NewCoreScheduler(srv, snap)
err = core.Process(eval)
require.NoError(t, err)

// Verify the claim was released
vol, err = state.CSIVolumeByID(ws, volId0)
require.NoError(t, err)
require.Len(t, vol.ReadAllocs, 1)
require.Len(t, vol.WriteAllocs, 0)
}
nomad/csi_endpoint.go: 1 change (1 addition, 0 deletions)
@@ -451,6 +451,7 @@ func (srv *Server) controllerPublishVolume(req *structs.CSIVolumeClaimRequest, r

// controllerUnpublishVolume sends an unpublish request to the CSI
// controller plugin associated with a volume, if any.
// TODO: the only caller of this won't have an alloc pointer handy, should it be its own request arg type?
func (srv *Server) controllerUnpublishVolume(req *structs.CSIVolumeClaimRequest, nodeID string) error {
plug, vol, err := srv.volAndPluginLookup(req.VolumeID)
if plug == nil || vol == nil || err != nil {
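
A purely hypothetical sketch of what that dedicated request type could look
like; it does not exist in this commit or in the codebase and only
illustrates the design question raised in the TODO above:

// Hypothetical (would live in nomad/structs): a controller-unpublish-specific
// request carrying only what controllerUnpublishVolume needs, instead of
// reusing CSIVolumeClaimRequest with an Allocation pointer it never reads.
type CSIVolumeUnpublishRequest struct {
	VolumeID string
	NodeID   string
	WriteRequest
}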
nomad/leader.go: 6 changes (0 additions, 6 deletions)
@@ -517,8 +517,6 @@ func (s *Server) schedulePeriodic(stopCh chan struct{}) {
defer jobGC.Stop()
deploymentGC := time.NewTicker(s.config.DeploymentGCInterval)
defer deploymentGC.Stop()
csiVolumePublicationGC := time.NewTicker(s.config.CSIVolumePublicationGCInterval)
defer csiVolumePublicationGC.Stop()

// getLatest grabs the latest index from the state store. It returns true if
// the index was retrieved successfully.
@@ -551,10 +549,6 @@ func (s *Server) schedulePeriodic(stopCh chan struct{}) {
if index, ok := getLatest(); ok {
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobDeploymentGC, index))
}
case <-csiVolumePublicationGC.C:
if index, ok := getLatest(); ok {
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobCSIVolumePublicationGC, index))
}
case <-stopCh:
return
}
(4 remaining changed files not rendered)
