Skip to content

Commit

Permalink
eval broker: use write lock when reaping cancelable evals (#16112)
Browse files Browse the repository at this point in the history
The eval broker's `Cancelable` method used by the cancelable eval reaper mutates
the slice of cancelable evals by removing a batch at a time from the slice. But
this method unsafely uses a read lock despite this mutation. Under normal
workloads this is likely to be safe but when the eval broker is under the heavy
load this feature is intended to fix, we're likely to have a race
condition. Switch this to a write lock, like the other locks that mutate the
eval broker state.

This changeset also adjusts the timeout to allow poorly-sized Actions runners
more time to schedule the appropriate goroutines. The test has also been updated
to use `shoenig/test/wait` so we can have sensible reporting of the results
rather than just a timeout error when things go wrong.
  • Loading branch information
tgross authored Feb 10, 2023
1 parent ce614bf commit 1eabc36
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 12 deletions.
3 changes: 3 additions & 0 deletions .changelog/16112.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
eval broker: Fixed a bug where the cancelable eval reaper used an incorrect lock when getting the set of cancelable evals from the broker
```
4 changes: 2 additions & 2 deletions nomad/eval_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -868,8 +868,8 @@ func (b *EvalBroker) Stats() *BrokerStats {
// stale and ready to mark for canceling. The eval RPC will call this with a
// batch size set to avoid sending overly large raft messages.
func (b *EvalBroker) Cancelable(batchSize int) []*structs.Evaluation {
b.l.RLock()
defer b.l.RUnlock()
b.l.Lock()
defer b.l.Unlock()

if batchSize > len(b.cancelable) {
batchSize = len(b.cancelable)
Expand Down
55 changes: 46 additions & 9 deletions nomad/eval_broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,16 @@ import (
"time"

msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/shoenig/test"
"github.com/shoenig/test/must"
"github.com/shoenig/test/wait"
"github.com/stretchr/testify/require"

"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/shoenig/test"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require"
)

var (
Expand Down Expand Up @@ -1475,6 +1477,31 @@ func TestEvalBroker_PendingEvals_MarkForCancel(t *testing.T) {
must.Eq(t, 100, eval.ModifyIndex)
}

func TestEvalBroker_Cancelable(t *testing.T) {
ci.Parallel(t)

b := testBroker(t, time.Minute)

evals := []*structs.Evaluation{}
for i := 0; i < 20; i++ {
eval := mock.Eval()
evals = append(evals, eval)
}
b.cancelable = evals
b.stats.TotalCancelable = len(b.cancelable)

must.Len(t, 20, b.cancelable)
cancelable := b.Cancelable(10)
must.Len(t, 10, cancelable)
must.Len(t, 10, b.cancelable)
must.Eq(t, 10, b.stats.TotalCancelable)

cancelable = b.Cancelable(20)
must.Len(t, 10, cancelable)
must.Len(t, 0, b.cancelable)
must.Eq(t, 0, b.stats.TotalCancelable)
}

// TestEvalBroker_IntegrationTest exercises the eval broker with realistic
// workflows
func TestEvalBroker_IntegrationTest(t *testing.T) {
Expand Down Expand Up @@ -1567,16 +1594,26 @@ func TestEvalBroker_IntegrationTest(t *testing.T) {

config := DefaultConfig()
config.NumSchedulers = 4
config.EvalReapCancelableInterval = time.Minute * 10
require.NoError(t, srv.Reload(config))
must.NoError(t, srv.Reload(config))

// assert that all but 2 evals were canceled and that the eval broker state
// has been cleared

require.Eventually(t, func() bool {
got := getEvalStatuses()
return got[structs.EvalStatusComplete] == 2 && got[structs.EvalStatusCancelled] == 9
}, 2*time.Second, time.Millisecond*100)
var got map[string]int

must.Wait(t, wait.InitialSuccess(
wait.Timeout(5*time.Second),
wait.Gap(100*time.Millisecond),
wait.BoolFunc(func() bool {
got = getEvalStatuses()
return got[structs.EvalStatusComplete] == 2 &&
got[structs.EvalStatusCancelled] == 9
}),
),
must.Func(func() string {
return fmt.Sprintf("expected map[complete:2 canceled:9] within timeout, got: %v with broker status=%#v", got, getStats())
}),
)

must.Eq(t, BrokerStats{TotalReady: 0, TotalUnacked: 0,
TotalPending: 0, TotalCancelable: 0}, getStats())
Expand Down
4 changes: 3 additions & 1 deletion nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1035,6 +1035,8 @@ func (s *Server) reapCancelableEvaluations(stopCh chan struct{}) chan struct{} {
return wakeCh
}

const cancelableEvalsBatchSize = 728 // structs.MaxUUIDsPerWriteRequest / 10

// cancelCancelableEvals pulls a batch of cancelable evaluations from the eval
// broker and updates their status to canceled.
func cancelCancelableEvals(srv *Server) error {
Expand All @@ -1044,7 +1046,7 @@ func cancelCancelableEvals(srv *Server) error {
// We *can* send larger raft logs but rough benchmarks show that a smaller
// page size strikes a balance between throughput and time we block the FSM
// apply for other operations
cancelable := srv.evalBroker.Cancelable(structs.MaxUUIDsPerWriteRequest / 10)
cancelable := srv.evalBroker.Cancelable(cancelableEvalsBatchSize)
if len(cancelable) > 0 {
for i, eval := range cancelable {
eval = eval.Copy()
Expand Down

0 comments on commit 1eabc36

Please sign in to comment.