-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Periodic GC for volume claims #7881
Changes from 4 commits
47b71cb
a5170ed
b0e8c44
ab464e4
95c2029
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -102,32 +102,33 @@ client { | |
} | ||
|
||
server { | ||
enabled = true | ||
authoritative_region = "foobar" | ||
bootstrap_expect = 5 | ||
data_dir = "/tmp/data" | ||
protocol_version = 3 | ||
raft_protocol = 3 | ||
num_schedulers = 2 | ||
enabled_schedulers = ["test"] | ||
node_gc_threshold = "12h" | ||
job_gc_interval = "3m" | ||
job_gc_threshold = "12h" | ||
eval_gc_threshold = "12h" | ||
deployment_gc_threshold = "12h" | ||
csi_plugin_gc_threshold = "12h" | ||
heartbeat_grace = "30s" | ||
min_heartbeat_ttl = "33s" | ||
max_heartbeats_per_second = 11.0 | ||
retry_join = ["1.1.1.1", "2.2.2.2"] | ||
start_join = ["1.1.1.1", "2.2.2.2"] | ||
retry_max = 3 | ||
retry_interval = "15s" | ||
rejoin_after_leave = true | ||
non_voting_server = true | ||
redundancy_zone = "foo" | ||
upgrade_version = "0.8.0" | ||
encrypt = "abc" | ||
enabled = true | ||
authoritative_region = "foobar" | ||
bootstrap_expect = 5 | ||
data_dir = "/tmp/data" | ||
protocol_version = 3 | ||
raft_protocol = 3 | ||
num_schedulers = 2 | ||
enabled_schedulers = ["test"] | ||
node_gc_threshold = "12h" | ||
job_gc_interval = "3m" | ||
job_gc_threshold = "12h" | ||
eval_gc_threshold = "12h" | ||
deployment_gc_threshold = "12h" | ||
csi_volume_claim_gc_threshold = "12h" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the only new line. Curse you golang diffs! 😀 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. but bless you github option to temporarily hide whitespace diffs |
||
csi_plugin_gc_threshold = "12h" | ||
heartbeat_grace = "30s" | ||
min_heartbeat_ttl = "33s" | ||
max_heartbeats_per_second = 11.0 | ||
retry_join = ["1.1.1.1", "2.2.2.2"] | ||
start_join = ["1.1.1.1", "2.2.2.2"] | ||
retry_max = 3 | ||
retry_interval = "15s" | ||
rejoin_after_leave = true | ||
non_voting_server = true | ||
redundancy_zone = "foo" | ||
upgrade_version = "0.8.0" | ||
encrypt = "abc" | ||
|
||
server_join { | ||
retry_join = ["1.1.1.1", "2.2.2.2"] | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -77,6 +77,9 @@ func (c *CoreScheduler) forceGC(eval *structs.Evaluation) error { | |
if err := c.csiPluginGC(eval); err != nil { | ||
return err | ||
} | ||
if err := c.csiVolumeClaimGC(eval); err != nil { | ||
return err | ||
} | ||
|
||
// Node GC must occur after the others to ensure the allocations are | ||
// cleared. | ||
|
@@ -714,32 +717,103 @@ func allocGCEligible(a *structs.Allocation, job *structs.Job, gcTime time.Time, | |
return timeDiff > interval.Nanoseconds() | ||
} | ||
|
||
// TODO: we need a periodic trigger to iterate over all the volumes and split | ||
// them up into separate work items, same as we do for jobs. | ||
|
||
// csiVolumeClaimGC is used to garbage collect CSI volume claims | ||
func (c *CoreScheduler) csiVolumeClaimGC(eval *structs.Evaluation) error { | ||
|
||
gcClaims := func(ns, volID string) error { | ||
req := &structs.CSIVolumeClaimRequest{ | ||
VolumeID: volID, | ||
Claim: structs.CSIVolumeClaimRelease, | ||
} | ||
req.Namespace = ns | ||
req.Region = c.srv.config.Region | ||
err := c.srv.RPC("CSIVolume.Claim", req, &structs.CSIVolumeClaimResponse{}) | ||
return err | ||
} | ||
|
||
c.logger.Trace("garbage collecting unclaimed CSI volume claims", "eval.JobID", eval.JobID) | ||
|
||
// Volume ID smuggled in with the eval's own JobID | ||
evalVolID := strings.Split(eval.JobID, ":") | ||
|
||
// COMPAT(1.0): 0.11.0 shipped with 3 fields. tighten this check to len == 2 | ||
if len(evalVolID) < 2 { | ||
c.logger.Error("volume gc called without volID") | ||
return nil | ||
if len(evalVolID) > 1 { | ||
volID := evalVolID[1] | ||
return gcClaims(eval.Namespace, volID) | ||
} | ||
|
||
ws := memdb.NewWatchSet() | ||
|
||
iter, err := c.snap.CSIVolumes(ws) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
// Get the time table to calculate GC cutoffs. | ||
var oldThreshold uint64 | ||
if eval.JobID == structs.CoreJobForceGC { | ||
// The GC was forced, so set the threshold to its maximum so | ||
// everything will GC. | ||
oldThreshold = math.MaxUint64 | ||
c.logger.Debug("forced volume claim GC") | ||
} else { | ||
tt := c.srv.fsm.TimeTable() | ||
cutoff := time.Now().UTC().Add(-1 * c.srv.config.CSIVolumeClaimGCThreshold) | ||
oldThreshold = tt.NearestIndex(cutoff) | ||
} | ||
|
||
volID := evalVolID[1] | ||
req := &structs.CSIVolumeClaimRequest{ | ||
VolumeID: volID, | ||
Claim: structs.CSIVolumeClaimRelease, | ||
c.logger.Debug("CSI volume claim GC scanning before cutoff index", | ||
"index", oldThreshold, | ||
"csi_volume_claim_gc_threshold", c.srv.config.CSIVolumeClaimGCThreshold) | ||
|
||
NEXT_VOLUME: | ||
for i := iter.Next(); i != nil; i = iter.Next() { | ||
vol := i.(*structs.CSIVolume) | ||
|
||
// Ignore new volumes | ||
if vol.CreateIndex > oldThreshold { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is the result of testing, or just a good idea? It seems like a pretty good idea either way. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Inspired by the way we do it for job GC. |
||
continue | ||
} | ||
|
||
// we only call the claim release RPC if the volume has claims | ||
// that no longer have valid allocations. otherwise we'd send | ||
// out a lot of do-nothing RPCs. | ||
for id := range vol.ReadClaims { | ||
alloc, err := c.snap.AllocByID(ws, id) | ||
if err != nil { | ||
return err | ||
} | ||
if alloc == nil { | ||
err = gcClaims(vol.Namespace, vol.ID) | ||
if err != nil { | ||
return err | ||
} | ||
goto NEXT_VOLUME | ||
} | ||
} | ||
for id := range vol.WriteClaims { | ||
alloc, err := c.snap.AllocByID(ws, id) | ||
if err != nil { | ||
return err | ||
} | ||
if alloc == nil { | ||
err = gcClaims(vol.Namespace, vol.ID) | ||
if err != nil { | ||
return err | ||
} | ||
goto NEXT_VOLUME | ||
} | ||
} | ||
if len(vol.PastClaims) > 0 { | ||
err = gcClaims(vol.Namespace, vol.ID) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
|
||
} | ||
req.Namespace = eval.Namespace | ||
req.Region = c.srv.config.Region | ||
return nil | ||
|
||
err := c.srv.RPC("CSIVolume.Claim", req, &structs.CSIVolumeClaimResponse{}) | ||
return err | ||
} | ||
|
||
// csiPluginGC is used to garbage collect unused plugins | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the only new line. Thanks, gofmt!