diff --git a/command/agent/agent.go b/command/agent/agent.go index 5f4d95be22d..825a8eb6e14 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -314,6 +314,13 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) { } conf.DeploymentGCThreshold = dur } + if gcThreshold := agentConfig.Server.CSIVolumeClaimGCThreshold; gcThreshold != "" { + dur, err := time.ParseDuration(gcThreshold) + if err != nil { + return nil, err + } + conf.CSIVolumeClaimGCThreshold = dur + } if gcThreshold := agentConfig.Server.CSIPluginGCThreshold; gcThreshold != "" { dur, err := time.ParseDuration(gcThreshold) if err != nil { diff --git a/command/agent/config.go b/command/agent/config.go index 799d57eeaf4..8e7c658ba73 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -389,6 +389,11 @@ type ServerConfig struct { // GCed but the threshold can be used to filter by age. DeploymentGCThreshold string `hcl:"deployment_gc_threshold"` + // CSIVolumeClaimGCThreshold controls how "old" a CSI volume must be to + // have its claims collected by GC. Age is not the only requirement for + // a volume to be GCed but the threshold can be used to filter by age. + CSIVolumeClaimGCThreshold string `hcl:"csi_volume_claim_gc_threshold"` + // CSIPluginGCThreshold controls how "old" a CSI plugin must be to be // collected by GC. Age is not the only requirement for a plugin to be // GCed but the threshold can be used to filter by age. @@ -1328,6 +1333,9 @@ func (a *ServerConfig) Merge(b *ServerConfig) *ServerConfig { if b.DeploymentGCThreshold != "" { result.DeploymentGCThreshold = b.DeploymentGCThreshold } + if b.CSIVolumeClaimGCThreshold != "" { + result.CSIVolumeClaimGCThreshold = b.CSIVolumeClaimGCThreshold + } if b.CSIPluginGCThreshold != "" { result.CSIPluginGCThreshold = b.CSIPluginGCThreshold } diff --git a/command/agent/config_parse_test.go b/command/agent/config_parse_test.go index e5523373f0f..b9a06d8c90e 100644 --- a/command/agent/config_parse_test.go +++ b/command/agent/config_parse_test.go @@ -91,35 +91,36 @@ var basicConfig = &Config{ BridgeNetworkSubnet: "custom_bridge_subnet", }, Server: &ServerConfig{ - Enabled: true, - AuthoritativeRegion: "foobar", - BootstrapExpect: 5, - DataDir: "/tmp/data", - ProtocolVersion: 3, - RaftProtocol: 3, - NumSchedulers: helper.IntToPtr(2), - EnabledSchedulers: []string{"test"}, - NodeGCThreshold: "12h", - EvalGCThreshold: "12h", - JobGCInterval: "3m", - JobGCThreshold: "12h", - DeploymentGCThreshold: "12h", - CSIPluginGCThreshold: "12h", - HeartbeatGrace: 30 * time.Second, - HeartbeatGraceHCL: "30s", - MinHeartbeatTTL: 33 * time.Second, - MinHeartbeatTTLHCL: "33s", - MaxHeartbeatsPerSecond: 11.0, - RetryJoin: []string{"1.1.1.1", "2.2.2.2"}, - StartJoin: []string{"1.1.1.1", "2.2.2.2"}, - RetryInterval: 15 * time.Second, - RetryIntervalHCL: "15s", - RejoinAfterLeave: true, - RetryMaxAttempts: 3, - NonVotingServer: true, - RedundancyZone: "foo", - UpgradeVersion: "0.8.0", - EncryptKey: "abc", + Enabled: true, + AuthoritativeRegion: "foobar", + BootstrapExpect: 5, + DataDir: "/tmp/data", + ProtocolVersion: 3, + RaftProtocol: 3, + NumSchedulers: helper.IntToPtr(2), + EnabledSchedulers: []string{"test"}, + NodeGCThreshold: "12h", + EvalGCThreshold: "12h", + JobGCInterval: "3m", + JobGCThreshold: "12h", + DeploymentGCThreshold: "12h", + CSIVolumeClaimGCThreshold: "12h", + CSIPluginGCThreshold: "12h", + HeartbeatGrace: 30 * time.Second, + HeartbeatGraceHCL: "30s", + MinHeartbeatTTL: 33 * time.Second, + MinHeartbeatTTLHCL: "33s", + MaxHeartbeatsPerSecond: 11.0, + RetryJoin: []string{"1.1.1.1", "2.2.2.2"}, + StartJoin: []string{"1.1.1.1", "2.2.2.2"}, + RetryInterval: 15 * time.Second, + RetryIntervalHCL: "15s", + RejoinAfterLeave: true, + RetryMaxAttempts: 3, + NonVotingServer: true, + RedundancyZone: "foo", + UpgradeVersion: "0.8.0", + EncryptKey: "abc", ServerJoin: &ServerJoin{ RetryJoin: []string{"1.1.1.1", "2.2.2.2"}, RetryInterval: time.Duration(15) * time.Second, diff --git a/command/agent/testdata/basic.hcl b/command/agent/testdata/basic.hcl index 69ed6cf007a..cad07e2cc7d 100644 --- a/command/agent/testdata/basic.hcl +++ b/command/agent/testdata/basic.hcl @@ -102,32 +102,33 @@ client { } server { - enabled = true - authoritative_region = "foobar" - bootstrap_expect = 5 - data_dir = "/tmp/data" - protocol_version = 3 - raft_protocol = 3 - num_schedulers = 2 - enabled_schedulers = ["test"] - node_gc_threshold = "12h" - job_gc_interval = "3m" - job_gc_threshold = "12h" - eval_gc_threshold = "12h" - deployment_gc_threshold = "12h" - csi_plugin_gc_threshold = "12h" - heartbeat_grace = "30s" - min_heartbeat_ttl = "33s" - max_heartbeats_per_second = 11.0 - retry_join = ["1.1.1.1", "2.2.2.2"] - start_join = ["1.1.1.1", "2.2.2.2"] - retry_max = 3 - retry_interval = "15s" - rejoin_after_leave = true - non_voting_server = true - redundancy_zone = "foo" - upgrade_version = "0.8.0" - encrypt = "abc" + enabled = true + authoritative_region = "foobar" + bootstrap_expect = 5 + data_dir = "/tmp/data" + protocol_version = 3 + raft_protocol = 3 + num_schedulers = 2 + enabled_schedulers = ["test"] + node_gc_threshold = "12h" + job_gc_interval = "3m" + job_gc_threshold = "12h" + eval_gc_threshold = "12h" + deployment_gc_threshold = "12h" + csi_volume_claim_gc_threshold = "12h" + csi_plugin_gc_threshold = "12h" + heartbeat_grace = "30s" + min_heartbeat_ttl = "33s" + max_heartbeats_per_second = 11.0 + retry_join = ["1.1.1.1", "2.2.2.2"] + start_join = ["1.1.1.1", "2.2.2.2"] + retry_max = 3 + retry_interval = "15s" + rejoin_after_leave = true + non_voting_server = true + redundancy_zone = "foo" + upgrade_version = "0.8.0" + encrypt = "abc" server_join { retry_join = ["1.1.1.1", "2.2.2.2"] diff --git a/command/agent/testdata/basic.json b/command/agent/testdata/basic.json index ab18da9c1c3..2a5edafb018 100644 --- a/command/agent/testdata/basic.json +++ b/command/agent/testdata/basic.json @@ -257,6 +257,7 @@ "authoritative_region": "foobar", "bootstrap_expect": 5, "csi_plugin_gc_threshold": "12h", + "csi_volume_claim_gc_threshold": "12h", "data_dir": "/tmp/data", "deployment_gc_threshold": "12h", "enabled": true, diff --git a/nomad/config.go b/nomad/config.go index e7be14d2c24..c325df5db18 100644 --- a/nomad/config.go +++ b/nomad/config.go @@ -198,6 +198,14 @@ type Config struct { // for GC. This gives users some time to debug plugins. CSIPluginGCThreshold time.Duration + // CSIVolumeClaimGCInterval is how often we dispatch a job to GC + // volume claims. + CSIVolumeClaimGCInterval time.Duration + + // CSIVolumeClaimGCThreshold is how "old" a volume must be to be + // eligible for GC. This gives users some time to debug volumes. + CSIVolumeClaimGCThreshold time.Duration + // EvalNackTimeout controls how long we allow a sub-scheduler to // work on an evaluation before we consider it failed and Nack it. // This allows that evaluation to be handed to another sub-scheduler @@ -386,6 +394,8 @@ func DefaultConfig() *Config { DeploymentGCThreshold: 1 * time.Hour, CSIPluginGCInterval: 5 * time.Minute, CSIPluginGCThreshold: 1 * time.Hour, + CSIVolumeClaimGCInterval: 5 * time.Minute, + CSIVolumeClaimGCThreshold: 1 * time.Hour, EvalNackTimeout: 60 * time.Second, EvalDeliveryLimit: 3, EvalNackInitialReenqueueDelay: 1 * time.Second, diff --git a/nomad/core_sched.go b/nomad/core_sched.go index e3f762f4c38..c2d3a1463d7 100644 --- a/nomad/core_sched.go +++ b/nomad/core_sched.go @@ -77,6 +77,9 @@ func (c *CoreScheduler) forceGC(eval *structs.Evaluation) error { if err := c.csiPluginGC(eval); err != nil { return err } + if err := c.csiVolumeClaimGC(eval); err != nil { + return err + } // Node GC must occur after the others to ensure the allocations are // cleared. @@ -714,32 +717,103 @@ func allocGCEligible(a *structs.Allocation, job *structs.Job, gcTime time.Time, return timeDiff > interval.Nanoseconds() } -// TODO: we need a periodic trigger to iterate over all the volumes and split -// them up into separate work items, same as we do for jobs. - // csiVolumeClaimGC is used to garbage collect CSI volume claims func (c *CoreScheduler) csiVolumeClaimGC(eval *structs.Evaluation) error { + + gcClaims := func(ns, volID string) error { + req := &structs.CSIVolumeClaimRequest{ + VolumeID: volID, + Claim: structs.CSIVolumeClaimRelease, + } + req.Namespace = ns + req.Region = c.srv.config.Region + err := c.srv.RPC("CSIVolume.Claim", req, &structs.CSIVolumeClaimResponse{}) + return err + } + c.logger.Trace("garbage collecting unclaimed CSI volume claims", "eval.JobID", eval.JobID) // Volume ID smuggled in with the eval's own JobID evalVolID := strings.Split(eval.JobID, ":") // COMPAT(1.0): 0.11.0 shipped with 3 fields. tighten this check to len == 2 - if len(evalVolID) < 2 { - c.logger.Error("volume gc called without volID") - return nil + if len(evalVolID) > 1 { + volID := evalVolID[1] + return gcClaims(eval.Namespace, volID) + } + + ws := memdb.NewWatchSet() + + iter, err := c.snap.CSIVolumes(ws) + if err != nil { + return err + } + + // Get the time table to calculate GC cutoffs. + var oldThreshold uint64 + if eval.JobID == structs.CoreJobForceGC { + // The GC was forced, so set the threshold to its maximum so + // everything will GC. + oldThreshold = math.MaxUint64 + c.logger.Debug("forced volume claim GC") + } else { + tt := c.srv.fsm.TimeTable() + cutoff := time.Now().UTC().Add(-1 * c.srv.config.CSIVolumeClaimGCThreshold) + oldThreshold = tt.NearestIndex(cutoff) } - volID := evalVolID[1] - req := &structs.CSIVolumeClaimRequest{ - VolumeID: volID, - Claim: structs.CSIVolumeClaimRelease, + c.logger.Debug("CSI volume claim GC scanning before cutoff index", + "index", oldThreshold, + "csi_volume_claim_gc_threshold", c.srv.config.CSIVolumeClaimGCThreshold) + +NEXT_VOLUME: + for i := iter.Next(); i != nil; i = iter.Next() { + vol := i.(*structs.CSIVolume) + + // Ignore new volumes + if vol.CreateIndex > oldThreshold { + continue + } + + // we only call the claim release RPC if the volume has claims + // that no longer have valid allocations. otherwise we'd send + // out a lot of do-nothing RPCs. + for id := range vol.ReadClaims { + alloc, err := c.snap.AllocByID(ws, id) + if err != nil { + return err + } + if alloc == nil { + err = gcClaims(vol.Namespace, vol.ID) + if err != nil { + return err + } + goto NEXT_VOLUME + } + } + for id := range vol.WriteClaims { + alloc, err := c.snap.AllocByID(ws, id) + if err != nil { + return err + } + if alloc == nil { + err = gcClaims(vol.Namespace, vol.ID) + if err != nil { + return err + } + goto NEXT_VOLUME + } + } + if len(vol.PastClaims) > 0 { + err = gcClaims(vol.Namespace, vol.ID) + if err != nil { + return err + } + } + } - req.Namespace = eval.Namespace - req.Region = c.srv.config.Region + return nil - err := c.srv.RPC("CSIVolume.Claim", req, &structs.CSIVolumeClaimResponse{}) - return err } // csiPluginGC is used to garbage collect unused plugins diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go index bfebd51baad..3d02846a6a5 100644 --- a/nomad/core_sched_test.go +++ b/nomad/core_sched_test.go @@ -6,6 +6,7 @@ import ( "time" memdb "github.com/hashicorp/go-memdb" + msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/state" @@ -2212,7 +2213,7 @@ func TestCoreScheduler_CSIPluginGC(t *testing.T) { // Update the time tables to make this work tt := srv.fsm.TimeTable() index := uint64(2000) - tt.Witness(index, time.Now().UTC().Add(-1*srv.config.DeploymentGCThreshold)) + tt.Witness(index, time.Now().UTC().Add(-1*srv.config.CSIPluginGCThreshold)) // Create a core scheduler snap, err := state.Snapshot() @@ -2248,3 +2249,179 @@ func TestCoreScheduler_CSIPluginGC(t *testing.T) { require.Nil(plug) require.NoError(err) } + +func TestCoreScheduler_CSIVolumeClaimGC(t *testing.T) { + t.Parallel() + require := require.New(t) + + srv, shutdown := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + + defer shutdown() + testutil.WaitForLeader(t, srv.RPC) + codec := rpcClient(t, srv) + + index := uint64(1) + volID := uuid.Generate() + ns := structs.DefaultNamespace + pluginID := "foo" + + state := srv.fsm.State() + ws := memdb.NewWatchSet() + + index, _ = state.LatestIndex() + + // Create client node and plugin + node := mock.Node() + node.Attributes["nomad.version"] = "0.11.0" // needs client RPCs + node.CSINodePlugins = map[string]*structs.CSIInfo{ + pluginID: { + PluginID: pluginID, + Healthy: true, + NodeInfo: &structs.CSINodeInfo{}, + }, + } + index++ + err := state.UpsertNode(index, node) + require.NoError(err) + + // Note that for volume writes in this test we need to use the + // RPCs rather than StateStore methods directly so that the GC + // job's RPC call updates a later index. otherwise the + // volumewatcher won't trigger for the final GC + + // Register a volume + vols := []*structs.CSIVolume{{ + ID: volID, + Namespace: ns, + PluginID: pluginID, + AccessMode: structs.CSIVolumeAccessModeMultiNodeSingleWriter, + AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, + Topologies: []*structs.CSITopology{}, + }} + volReq := &structs.CSIVolumeRegisterRequest{Volumes: vols} + volReq.Namespace = ns + volReq.Region = srv.config.Region + + err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Register", + volReq, &structs.CSIVolumeRegisterResponse{}) + require.NoError(err) + + // Create a job with two allocations that claim the volume. + // We use two allocs here, one of which is not running, so + // that we can assert that the volumewatcher has made one + // complete pass (and removed the 2nd alloc) before running + // the GC. + eval := mock.Eval() + eval.Status = structs.EvalStatusFailed + index++ + state.UpsertJobSummary(index, mock.JobSummary(eval.JobID)) + index++ + err = state.UpsertEvals(index, []*structs.Evaluation{eval}) + require.Nil(err) + + job := mock.Job() + job.ID = eval.JobID + job.Status = structs.JobStatusRunning + index++ + err = state.UpsertJob(index, job) + require.NoError(err) + + alloc1, alloc2 := mock.Alloc(), mock.Alloc() + alloc1.NodeID = node.ID + alloc1.ClientStatus = structs.AllocClientStatusRunning + alloc1.Job = job + alloc1.JobID = job.ID + alloc1.EvalID = eval.ID + + alloc2.NodeID = node.ID + alloc2.ClientStatus = structs.AllocClientStatusComplete + alloc2.Job = job + alloc2.JobID = job.ID + alloc2.EvalID = eval.ID + + summary := mock.JobSummary(alloc1.JobID) + index++ + require.NoError(state.UpsertJobSummary(index, summary)) + summary = mock.JobSummary(alloc2.JobID) + index++ + require.NoError(state.UpsertJobSummary(index, summary)) + index++ + require.NoError(state.UpsertAllocs(index, + []*structs.Allocation{alloc1, alloc2})) + + // Claim the volume for the alloc + req := &structs.CSIVolumeClaimRequest{ + AllocationID: alloc1.ID, + NodeID: node.ID, + VolumeID: volID, + Claim: structs.CSIVolumeClaimWrite, + } + req.Namespace = ns + req.Region = srv.config.Region + err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim", + req, &structs.CSIVolumeClaimResponse{}) + require.NoError(err) + + // ready-to-free claim; once it's gone we know the volumewatcher + // has run once and stopped + req.AllocationID = alloc2.ID + req.Claim = structs.CSIVolumeClaimRelease + req.State = structs.CSIVolumeClaimStateControllerDetached + err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim", + req, &structs.CSIVolumeClaimResponse{}) + require.NoError(err) + + // wait for volumewatcher + var vol *structs.CSIVolume + require.Eventually(func() bool { + vol, _ = state.CSIVolumeByID(ws, ns, volID) + return len(vol.ReadAllocs) == 0 && + len(vol.ReadClaims) == 0 && + len(vol.PastClaims) == 0 + }, time.Second*1, 10*time.Millisecond, "stale claim was not released") + + // Delete allocation and job + index++ + err = state.DeleteJob(index, ns, job.ID) + require.NoError(err) + index++ + err = state.DeleteEval(index, []string{eval.ID}, []string{alloc1.ID, alloc2.ID}) + require.NoError(err) + + // Create a core scheduler and attempt the volume claim GC + snap, err := state.Snapshot() + require.NoError(err) + core := NewCoreScheduler(srv, snap) + + index++ + gc := srv.coreJobEval(structs.CoreJobForceGC, index) + c := core.(*CoreScheduler) + require.NoError(c.csiVolumeClaimGC(gc)) + + // the volumewatcher will hit an error here because there's no + // path to the node. but we can't update the claim to bypass the + // client RPCs without triggering the volumewatcher's normal code + // path. + require.Eventually(func() bool { + vol, _ = state.CSIVolumeByID(ws, ns, volID) + return len(vol.WriteClaims) == 1 && + len(vol.WriteAllocs) == 1 && + len(vol.PastClaims) == 0 + }, time.Second*1, 10*time.Millisecond, "claims were released unexpectedly") + + req.AllocationID = alloc1.ID + err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim", + req, &structs.CSIVolumeClaimResponse{}) + require.NoError(err) + + // wait for volumewatcher + require.Eventually(func() bool { + vol, _ = state.CSIVolumeByID(ws, ns, volID) + return len(vol.WriteClaims) == 0 && + len(vol.WriteAllocs) == 0 && + len(vol.PastClaims) == 0 + }, time.Second*1, 10*time.Millisecond, "claims were not released") + +} diff --git a/nomad/leader.go b/nomad/leader.go index 97a59eb1aaf..18c36ef386e 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -524,6 +524,8 @@ func (s *Server) schedulePeriodic(stopCh chan struct{}) { defer deploymentGC.Stop() csiPluginGC := time.NewTicker(s.config.CSIPluginGCInterval) defer csiPluginGC.Stop() + csiVolumeClaimGC := time.NewTicker(s.config.CSIVolumeClaimGCInterval) + defer csiVolumeClaimGC.Stop() // getLatest grabs the latest index from the state store. It returns true if // the index was retrieved successfully. @@ -560,6 +562,10 @@ func (s *Server) schedulePeriodic(stopCh chan struct{}) { if index, ok := getLatest(); ok { s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobCSIPluginGC, index)) } + case <-csiVolumeClaimGC.C: + if index, ok := getLatest(); ok { + s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobCSIVolumeClaimGC, index)) + } case <-stopCh: return diff --git a/website/pages/docs/configuration/server.mdx b/website/pages/docs/configuration/server.mdx index e14a3fae24f..ec2748f94af 100644 --- a/website/pages/docs/configuration/server.mdx +++ b/website/pages/docs/configuration/server.mdx @@ -1,4 +1,4 @@ ---- +-- layout: docs page_title: server Stanza - Agent Configuration sidebar_title: server @@ -90,7 +90,11 @@ server { deployment must be in the terminal state before it is eligible for garbage collection. This is specified using a label suffix like "30s" or "1h". -- `csi_plugin_gc_threshold` `(string: "1h")` - Specifies the minimum age of +- `csi_volume_claim_gc_threshold` `(string: "1h")` - Specifies the minimum age of + a CSI volume before it is eligible to have its claims garbage collected. + This is specified using a label suffix like "30s" or "1h". + +- `csi_plugin_gc_threshold` `(string: "1h")` - Specifies the minimum age of a CSI plugin before it is eligible for garbage collection if not in use. This is specified using a label suffix like "30s" or "1h".