diff --git a/.changelog/16112.txt b/.changelog/16112.txt new file mode 100644 index 00000000000..c87e506483d --- /dev/null +++ b/.changelog/16112.txt @@ -0,0 +1,3 @@ +```release-note:bug +eval broker: Fixed a bug where the cancelable eval reaper used an incorrect lock when getting the set of cancelable evals from the broker +``` diff --git a/nomad/eval_broker.go b/nomad/eval_broker.go index 0bfcdd9804b..ca7907a1faa 100644 --- a/nomad/eval_broker.go +++ b/nomad/eval_broker.go @@ -868,8 +868,8 @@ func (b *EvalBroker) Stats() *BrokerStats { // stale and ready to mark for canceling. The eval RPC will call this with a // batch size set to avoid sending overly large raft messages. func (b *EvalBroker) Cancelable(batchSize int) []*structs.Evaluation { - b.l.RLock() - defer b.l.RUnlock() + b.l.Lock() + defer b.l.Unlock() if batchSize > len(b.cancelable) { batchSize = len(b.cancelable) diff --git a/nomad/eval_broker_test.go b/nomad/eval_broker_test.go index 897eee379f9..2b93e498dd2 100644 --- a/nomad/eval_broker_test.go +++ b/nomad/eval_broker_test.go @@ -9,14 +9,16 @@ import ( "time" msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" + "github.com/shoenig/test" + "github.com/shoenig/test/must" + "github.com/shoenig/test/wait" + "github.com/stretchr/testify/require" + "github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" - "github.com/shoenig/test" - "github.com/shoenig/test/must" - "github.com/stretchr/testify/require" ) var ( @@ -1475,6 +1477,31 @@ func TestEvalBroker_PendingEvals_MarkForCancel(t *testing.T) { must.Eq(t, 100, eval.ModifyIndex) } +func TestEvalBroker_Cancelable(t *testing.T) { + ci.Parallel(t) + + b := testBroker(t, time.Minute) + + evals := []*structs.Evaluation{} + for i := 0; i < 20; i++ { + eval := mock.Eval() + evals = append(evals, eval) + } + b.cancelable = evals + b.stats.TotalCancelable = len(b.cancelable) + + must.Len(t, 20, b.cancelable) + cancelable := b.Cancelable(10) + must.Len(t, 10, cancelable) + must.Len(t, 10, b.cancelable) + must.Eq(t, 10, b.stats.TotalCancelable) + + cancelable = b.Cancelable(20) + must.Len(t, 10, cancelable) + must.Len(t, 0, b.cancelable) + must.Eq(t, 0, b.stats.TotalCancelable) +} + // TestEvalBroker_IntegrationTest exercises the eval broker with realistic // workflows func TestEvalBroker_IntegrationTest(t *testing.T) { @@ -1567,16 +1594,26 @@ func TestEvalBroker_IntegrationTest(t *testing.T) { config := DefaultConfig() config.NumSchedulers = 4 - config.EvalReapCancelableInterval = time.Minute * 10 - require.NoError(t, srv.Reload(config)) + must.NoError(t, srv.Reload(config)) // assert that all but 2 evals were canceled and that the eval broker state // has been cleared - require.Eventually(t, func() bool { - got := getEvalStatuses() - return got[structs.EvalStatusComplete] == 2 && got[structs.EvalStatusCancelled] == 9 - }, 2*time.Second, time.Millisecond*100) + var got map[string]int + + must.Wait(t, wait.InitialSuccess( + wait.Timeout(5*time.Second), + wait.Gap(100*time.Millisecond), + wait.BoolFunc(func() bool { + got = getEvalStatuses() + return got[structs.EvalStatusComplete] == 2 && + got[structs.EvalStatusCancelled] == 9 + }), + ), + must.Func(func() string { + return fmt.Sprintf("expected map[complete:2 canceled:9] within timeout, got: %v with broker status=%#v", got, getStats()) + }), + ) must.Eq(t, BrokerStats{TotalReady: 0, TotalUnacked: 0, TotalPending: 0, TotalCancelable: 0}, getStats()) diff --git a/nomad/leader.go b/nomad/leader.go index 08c658a3245..df501be435f 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -1035,6 +1035,8 @@ func (s *Server) reapCancelableEvaluations(stopCh chan struct{}) chan struct{} { return wakeCh } +const cancelableEvalsBatchSize = 728 // structs.MaxUUIDsPerWriteRequest / 10 + // cancelCancelableEvals pulls a batch of cancelable evaluations from the eval // broker and updates their status to canceled. func cancelCancelableEvals(srv *Server) error { @@ -1044,7 +1046,7 @@ func cancelCancelableEvals(srv *Server) error { // We *can* send larger raft logs but rough benchmarks show that a smaller // page size strikes a balance between throughput and time we block the FSM // apply for other operations - cancelable := srv.evalBroker.Cancelable(structs.MaxUUIDsPerWriteRequest / 10) + cancelable := srv.evalBroker.Cancelable(cancelableEvalsBatchSize) if len(cancelable) > 0 { for i, eval := range cancelable { eval = eval.Copy()