Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

state: remove TimeTable and rely on objects' modify times instead #24112

Merged
merged 40 commits into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
5741bc7
fsm: adjust timeTableLimit according to longest GC threshold
pkazmierczak Oct 2, 2024
807efc5
simplify
pkazmierczak Oct 2, 2024
5e79452
remove timeTable from fsm
pkazmierczak Oct 14, 2024
acd05b0
remove timetable
pkazmierczak Oct 24, 2024
811cde5
remove tt completely
pkazmierczak Oct 24, 2024
9763110
adjust core sched for jobs, nodes and deployments
pkazmierczak Oct 24, 2024
e79d38f
add create and modify time to deployments
pkazmierczak Oct 24, 2024
065f03e
remove threshold index from other objects in the core scheduler
pkazmierczak Oct 24, 2024
fdead8e
i love that we mix unix and unixnano
pkazmierczak Oct 24, 2024
4583cb6
csi volumes create/modify time
pkazmierczak Oct 24, 2024
11e29a0
deployment create/modify times
pkazmierczak Oct 24, 2024
a97b72b
csi plugin create/modify time on upsert
pkazmierczak Oct 24, 2024
b3fc1cb
oh this is tedious
pkazmierczak Oct 28, 2024
a3b1538
i am miserable now
pkazmierczak Oct 28, 2024
e92767d
removed time.Now from fsm and state store methods
pkazmierczak Oct 30, 2024
d5c378c
signatures change
pkazmierczak Oct 30, 2024
0d25ee1
revert more tests
pkazmierczak Oct 30, 2024
2e722c6
state store and test fixes
pkazmierczak Oct 30, 2024
18166f4
fsm_test fixes
pkazmierczak Oct 30, 2024
3a0f746
fixed some csi tests
pkazmierczak Oct 30, 2024
5e13ba8
fixes
pkazmierczak Oct 30, 2024
53ecd98
TestAllocation_GCEligible
pkazmierczak Oct 30, 2024
46a2aca
wip on TestCoreScheduler_EvalGC, there's something 🐟-y going on here
pkazmierczak Oct 30, 2024
021b862
api update
pkazmierczak Oct 31, 2024
409b985
TestPlanApply_applyPlan
pkazmierczak Oct 31, 2024
6cbb9ee
TestPlanApply_applyPlanWithNormalizedAllocs
pkazmierczak Oct 31, 2024
d357903
TestSystemEndpoint_GarbageCollect
pkazmierczak Oct 31, 2024
589a8ae
first chunk of core_sched_test fixes
pkazmierczak Oct 31, 2024
5e8a31e
setting create/modify times in the mock causes issues
pkazmierczak Oct 31, 2024
d92418f
more core_sched tests fixed
pkazmierczak Oct 31, 2024
25769ab
reconciler test fixes
pkazmierczak Oct 31, 2024
6a2b464
oof, must.SliceContainsAll
pkazmierczak Oct 31, 2024
3e1feee
TestCoreScheduler_CSIPluginGC fix
pkazmierczak Oct 31, 2024
d88e6b9
pruneUnblockIndexes
pkazmierczak Oct 31, 2024
6f60a45
review comments
pkazmierczak Nov 1, 2024
c31a097
Tim's comment about API package UnixNano explanations
pkazmierczak Nov 1, 2024
a3ff3a2
cl
pkazmierczak Nov 1, 2024
3e43b56
review comment
pkazmierczak Nov 1, 2024
da2d741
TestCoreScheduler_EvalGC_Batch fix
pkazmierczak Nov 1, 2024
752d927
upgrade guide entry
pkazmierczak Nov 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/24112.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
state: Fixed setting GC threshold to more than 72hrs being ignored
```
15 changes: 15 additions & 0 deletions api/csi.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,11 @@ type CSIVolume struct {
CreateIndex uint64
ModifyIndex uint64

// CreateTime stored as UnixNano
CreateTime int64
// ModifyTime stored as UnixNano
ModifyTime int64

// ExtraKeysHCL is used by the hcl parser to report unexpected keys
ExtraKeysHCL []string `hcl1:",unusedKeys" json:"-"`
}
Expand Down Expand Up @@ -401,6 +406,11 @@ type CSIVolumeListStub struct {

CreateIndex uint64
ModifyIndex uint64

// CreateTime stored as UnixNano
CreateTime int64
// ModifyTime stored as UnixNano
ModifyTime int64
}

type CSIVolumeListExternalResponse struct {
Expand Down Expand Up @@ -543,6 +553,11 @@ type CSIPlugin struct {
NodesExpected int
CreateIndex uint64
ModifyIndex uint64

// CreateTime stored as UnixNano
CreateTime int64
// ModifyTime stored as UnixNano
ModifyTime int64
}

type CSIPluginListStub struct {
Expand Down
7 changes: 7 additions & 0 deletions api/deployments.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@ type Deployment struct {

CreateIndex uint64
ModifyIndex uint64

// Creation and modification times, stored as UnixNano
CreateTime int64
ModifyTime int64
}

// DeploymentState tracks the state of a deployment for a given task group.
Expand Down Expand Up @@ -261,6 +265,9 @@ type DeploymentPromoteRequest struct {
// Groups is used to set the promotion status per task group
Groups []string

// PromotedAt is the timestamp stored as Unix nano
PromotedAt int64

WriteRequest
}

Expand Down
63 changes: 27 additions & 36 deletions nomad/blocked_evals.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,11 @@ type BlockedEvals struct {
// blocked eval exists for each job. The value is the blocked evaluation ID.
jobs map[structs.NamespacedID]string

// unblockIndexes maps computed node classes or quota name to the index in
// which they were unblocked. This is used to check if an evaluation could
// have been unblocked between the time they were in the scheduler and the
// time they are being blocked.
unblockIndexes map[string]uint64
// unblockIndexes maps computed node classes or quota name to the index and
// time at which they were unblocked. This is used to check if an
// evaluation could have been unblocked between the time they were in the
// scheduler and the time they are being blocked.
unblockIndexes map[string]unblockEvent

// duplicates is the set of evaluations for jobs that had pre-existing
// blocked evaluations. These should be marked as cancelled since only one
Expand All @@ -76,14 +76,16 @@ type BlockedEvals struct {
// duplicates.
duplicateCh chan struct{}

// timetable is used to correlate indexes with their insertion time. This
// allows us to prune based on time.
timetable *TimeTable

// stopCh is used to stop any created goroutines.
stopCh chan struct{}
}

// unblockEvent keeps a record of the index and time of the unblock
type unblockEvent struct {
index uint64
timestamp time.Time
}

// capacityUpdate stores unblock data.
type capacityUpdate struct {
computedClass string
Expand All @@ -107,7 +109,7 @@ func NewBlockedEvals(evalBroker *EvalBroker, logger hclog.Logger) *BlockedEvals
escaped: make(map[string]wrappedEval),
system: newSystemEvals(),
jobs: make(map[structs.NamespacedID]string),
unblockIndexes: make(map[string]uint64),
unblockIndexes: make(map[string]unblockEvent),
capacityChangeCh: make(chan *capacityUpdate, unblockBuffer),
duplicateCh: make(chan struct{}, 1),
stopCh: make(chan struct{}),
Expand Down Expand Up @@ -143,12 +145,6 @@ func (b *BlockedEvals) SetEnabled(enabled bool) {
}
}

func (b *BlockedEvals) SetTimetable(timetable *TimeTable) {
b.l.Lock()
b.timetable = timetable
b.l.Unlock()
}

// Block tracks the passed evaluation and enqueues it into the eval broker when
// a suitable node calls unblock.
func (b *BlockedEvals) Block(eval *structs.Evaluation) {
Expand Down Expand Up @@ -303,10 +299,10 @@ func latestEvalIndex(eval *structs.Evaluation) uint64 {
// the lock held.
func (b *BlockedEvals) missedUnblock(eval *structs.Evaluation) bool {
var max uint64 = 0
for id, index := range b.unblockIndexes {
for id, u := range b.unblockIndexes {
// Calculate the max unblock index
if max < index {
max = index
if max < u.index {
max = u.index
}

// The evaluation is blocked because it has hit a quota limit not class
Expand All @@ -315,7 +311,7 @@ func (b *BlockedEvals) missedUnblock(eval *structs.Evaluation) bool {
if eval.QuotaLimitReached != id {
// Not a match
continue
} else if eval.SnapshotIndex < index {
} else if eval.SnapshotIndex < u.index {
// The evaluation was processed before the quota specification was
// updated, so unblock the evaluation.
return true
Expand All @@ -326,7 +322,7 @@ func (b *BlockedEvals) missedUnblock(eval *structs.Evaluation) bool {
}

elig, ok := eval.ClassEligibility[id]
if !ok && eval.SnapshotIndex < index {
if !ok && eval.SnapshotIndex < u.index {
// The evaluation was processed and did not encounter this class
// because it was added after it was processed. Thus for correctness
// we need to unblock it.
Expand All @@ -335,7 +331,7 @@ func (b *BlockedEvals) missedUnblock(eval *structs.Evaluation) bool {

// The evaluation could use the computed node class and the eval was
// processed before the last unblock.
if elig && eval.SnapshotIndex < index {
if elig && eval.SnapshotIndex < u.index {
return true
}
}
Expand Down Expand Up @@ -415,7 +411,7 @@ func (b *BlockedEvals) Unblock(computedClass string, index uint64) {
// Store the index in which the unblock happened. We use this on subsequent
// block calls in case the evaluation was in the scheduler when a trigger
// occurred.
b.unblockIndexes[computedClass] = index
b.unblockIndexes[computedClass] = unblockEvent{index, time.Now()}

// Capture chan in lock as Flush overwrites it
ch := b.capacityChangeCh
Expand Down Expand Up @@ -450,7 +446,7 @@ func (b *BlockedEvals) UnblockQuota(quota string, index uint64) {
// Store the index in which the unblock happened. We use this on subsequent
// block calls in case the evaluation was in the scheduler when a trigger
// occurred.
b.unblockIndexes[quota] = index
b.unblockIndexes[quota] = unblockEvent{index, time.Now()}
ch := b.capacityChangeCh
done := b.stopCh
b.l.Unlock()
Expand Down Expand Up @@ -479,10 +475,11 @@ func (b *BlockedEvals) UnblockClassAndQuota(class, quota string, index uint64) {
// Store the index in which the unblock happened. We use this on subsequent
// block calls in case the evaluation was in the scheduler when a trigger
// occurred.
now := time.Now()
if quota != "" {
b.unblockIndexes[quota] = index
b.unblockIndexes[quota] = unblockEvent{index, now}
}
b.unblockIndexes[class] = index
b.unblockIndexes[class] = unblockEvent{index, now}

// Capture chan inside the lock to prevent a race with it getting reset
// in Flush.
Expand Down Expand Up @@ -699,8 +696,7 @@ func (b *BlockedEvals) Flush() {
b.captured = make(map[string]wrappedEval)
b.escaped = make(map[string]wrappedEval)
b.jobs = make(map[structs.NamespacedID]string)
b.unblockIndexes = make(map[string]uint64)
b.timetable = nil
b.unblockIndexes = make(map[string]unblockEvent)
b.duplicates = nil
b.capacityChangeCh = make(chan *capacityUpdate, unblockBuffer)
b.stopCh = make(chan struct{})
Expand Down Expand Up @@ -781,18 +777,13 @@ func (b *BlockedEvals) prune(stopCh <-chan struct{}) {
}

// pruneUnblockIndexes is used to prune any tracked entry that is excessively
// old. This protects againsts unbounded growth of the map.
// old. This protects against unbounded growth of the map.
func (b *BlockedEvals) pruneUnblockIndexes(cutoff time.Time) {
b.l.Lock()
defer b.l.Unlock()

if b.timetable == nil {
return
}

oldThreshold := b.timetable.NearestIndex(cutoff)
for key, index := range b.unblockIndexes {
if index < oldThreshold {
for key, u := range b.unblockIndexes {
if u.timestamp.Before(cutoff) {
delete(b.unblockIndexes, key)
}
}
Expand Down
Loading