diff --git a/api/event.go b/api/event.go new file mode 100644 index 00000000000..e9009419e83 --- /dev/null +++ b/api/event.go @@ -0,0 +1,93 @@ +package api + +import ( + "context" + "encoding/json" + "fmt" +) + +type Events struct { + Index uint64 + Events []Event +} + +type Topic string + +type Event struct { + Topic Topic + Type string + Key string + FilterKeys []string + Index uint64 + Payload interface{} +} + +func (e *Events) IsHeartBeat() bool { + return e.Index == 0 && len(e.Events) == 0 +} + +type EventStream struct { + client *Client +} + +func (c *Client) EventStream() *EventStream { + return &EventStream{client: c} +} + +func (e *EventStream) Stream(ctx context.Context, topics map[Topic][]string, index uint64, q *QueryOptions) (<-chan *Events, <-chan error) { + + errCh := make(chan error, 1) + + r, err := e.client.newRequest("GET", "/v1/event/stream") + if err != nil { + errCh <- err + return nil, errCh + } + r.setQueryOptions(q) + + // Build topic query params + for topic, keys := range topics { + for _, k := range keys { + r.params.Add("topic", fmt.Sprintf("%s:%s", topic, k)) + } + } + + _, resp, err := requireOK(e.client.doRequest(r)) + + if err != nil { + errCh <- err + return nil, errCh + } + + eventsCh := make(chan *Events, 10) + go func() { + defer resp.Body.Close() + + dec := json.NewDecoder(resp.Body) + + for { + select { + case <-ctx.Done(): + close(eventsCh) + return + default: + } + + // Decode next newline delimited json of events + var events Events + if err := dec.Decode(&events); err != nil { + close(eventsCh) + errCh <- err + return + } + if events.IsHeartBeat() { + continue + } + + eventsCh <- &events + + } + }() + + return eventsCh, errCh +} diff --git a/api/event_test.go b/api/event_test.go new file mode 100644 index 00000000000..1ea1c6e4274 --- /dev/null +++ b/api/event_test.go @@ -0,0 +1,50 @@ +package api + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestEvent_Stream(t *testing.T) { + t.Parallel() + + c, s := makeClient(t, nil, nil) + defer s.Stop() + + // register job to generate events + jobs := c.Jobs() + job := testJob() + resp2, _, err := jobs.Register(job, nil) + require.Nil(t, err) + require.NotNil(t, resp2) + + // build event stream request + events := c.EventStream() + q := &QueryOptions{} + topics := map[Topic][]string{ + "Eval": {"*"}, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + streamCh, errCh := events.Stream(ctx, topics, 0, q) + +OUTER: + for { + select { + case event := <-streamCh: + require.Equal(t, len(event.Events), 1) + require.Equal(t, "Eval", string(event.Events[0].Topic)) + + break OUTER + case err := <-errCh: + require.Fail(t, err.Error()) + case <-time.After(5 * time.Second): + require.Fail(t, "failed waiting for event stream event") + } + } +} diff --git a/nomad/alloc_endpoint_test.go b/nomad/alloc_endpoint_test.go index d9123b03f56..012e95dfaf4 100644 --- a/nomad/alloc_endpoint_test.go +++ b/nomad/alloc_endpoint_test.go @@ -1,6 +1,7 @@ package nomad import ( + "context" "reflect" "testing" "time" @@ -193,7 +194,7 @@ func TestAllocEndpoint_List_Blocking(t *testing.T) { alloc2.ClientStatus = structs.AllocClientStatusRunning time.AfterFunc(100*time.Millisecond, func() { state.UpsertJobSummary(3, mock.JobSummary(alloc2.JobID)) - if err := state.UpdateAllocsFromClient(4, []*structs.Allocation{alloc2}); err != nil { + if err := state.UpdateAllocsFromClient(context.Background(), 4, []*structs.Allocation{alloc2}); err != nil { t.Fatalf("err: %v", err) } }) diff --git a/nomad/deployment_endpoint_test.go b/nomad/deployment_endpoint_test.go index 88b85620d44..ad7d222dc56 100644 --- a/nomad/deployment_endpoint_test.go +++ b/nomad/deployment_endpoint_test.go @@ -1,6 +1,7 @@ package nomad import ( + "context" "testing" "time" @@ -1300,7 +1301,7 @@ func TestDeploymentEndpoint_Allocations_Blocking(t *testing.T) { a2.ClientStatus = structs.AllocClientStatusRunning time.AfterFunc(100*time.Millisecond, func() { assert.Nil(state.UpsertJobSummary(5, mock.JobSummary(a2.JobID)), "UpsertJobSummary") - assert.Nil(state.UpdateAllocsFromClient(6, []*structs.Allocation{a2}), "updateAllocsFromClient") + assert.Nil(state.UpdateAllocsFromClient(context.Background(), 6, []*structs.Allocation{a2}), "updateAllocsFromClient") }) req.MinQueryIndex = 4 diff --git a/nomad/deploymentwatcher/deployments_watcher_test.go b/nomad/deploymentwatcher/deployments_watcher_test.go index c4c503f35c2..d1285d76986 100644 --- a/nomad/deploymentwatcher/deployments_watcher_test.go +++ b/nomad/deploymentwatcher/deployments_watcher_test.go @@ -1025,7 +1025,7 @@ func TestDeploymentWatcher_Watch_ProgressDeadline(t *testing.T) { Healthy: helper.BoolToPtr(false), Timestamp: now, } - require.Nil(m.state.UpdateAllocsFromClient(100, []*structs.Allocation{a2})) + require.Nil(m.state.UpdateAllocsFromClient(context.Background(), 100, []*structs.Allocation{a2})) // Wait for the deployment to be failed testutil.WaitForResult(func() (bool, error) { @@ -1209,7 +1209,7 @@ func TestDeploymentWatcher_Watch_ProgressDeadline_Canaries(t *testing.T) { Healthy: helper.BoolToPtr(true), Timestamp: now, } - require.Nil(m.state.UpdateAllocsFromClient(m.nextIndex(), []*structs.Allocation{a2})) + require.Nil(m.state.UpdateAllocsFromClient(context.Background(), m.nextIndex(), []*structs.Allocation{a2})) // Wait for the deployment to cross the deadline dout, err := m.state.DeploymentByID(nil, d.ID) @@ -1382,7 +1382,7 @@ func TestDeploymentWatcher_Watch_StartWithoutProgressDeadline(t *testing.T) { Healthy: helper.BoolToPtr(false), Timestamp: time.Now(), } - require.Nil(m.state.UpdateAllocsFromClient(m.nextIndex(), []*structs.Allocation{a2})) + require.Nil(m.state.UpdateAllocsFromClient(context.Background(), m.nextIndex(), []*structs.Allocation{a2})) // Wait for the alloc's DesiredState to set reschedule testutil.WaitForResult(func() (bool, error) { diff --git a/nomad/drainer_int_test.go b/nomad/drainer_int_test.go index ce480bd33ed..49896c7ce96 100644 --- a/nomad/drainer_int_test.go +++ b/nomad/drainer_int_test.go @@ -673,7 +673,7 @@ func TestDrainer_AllTypes_NoDeadline(t *testing.T) { new.ClientStatus = structs.AllocClientStatusComplete updates = append(updates, new) } - require.Nil(state.UpdateAllocsFromClient(1000, updates)) + require.Nil(state.UpdateAllocsFromClient(context.Background(), 1000, updates)) // Check that the node drain is removed testutil.WaitForResult(func() (bool, error) { diff --git a/nomad/fsm.go b/nomad/fsm.go index c4e4aa5bf85..fd483434ac2 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -212,13 +212,13 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} { case structs.JobDeregisterRequestType: return n.applyDeregisterJob(buf[1:], log.Index) case structs.EvalUpdateRequestType: - return n.applyUpdateEval(buf[1:], log.Index) + return n.applyUpdateEval(msgType, buf[1:], log.Index) case structs.EvalDeleteRequestType: return n.applyDeleteEval(buf[1:], log.Index) case structs.AllocUpdateRequestType: return n.applyAllocUpdate(buf[1:], log.Index) case structs.AllocClientUpdateRequestType: - return n.applyAllocClientUpdate(buf[1:], log.Index) + return n.applyAllocClientUpdate(msgType, buf[1:], log.Index) case structs.ReconcileJobSummariesRequestType: return n.applyReconcileSummaries(buf[1:], log.Index) case structs.VaultAccessorRegisterRequestType: @@ -570,7 +570,7 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} { // so this may be nil during server upgrades. if req.Eval != nil { req.Eval.JobModifyIndex = index - if err := n.upsertEvals(index, []*structs.Evaluation{req.Eval}); err != nil { + if err := n.upsertEvals(context.Background(), index, []*structs.Evaluation{req.Eval}); err != nil { return err } } @@ -602,7 +602,7 @@ func (n *nomadFSM) applyDeregisterJob(buf []byte, index uint64) interface{} { // always attempt upsert eval even if job deregister fail if req.Eval != nil { req.Eval.JobModifyIndex = index - if err := n.upsertEvals(index, []*structs.Evaluation{req.Eval}); err != nil { + if err := n.upsertEvals(context.Background(), index, []*structs.Evaluation{req.Eval}); err != nil { return err } } @@ -689,17 +689,20 @@ func (n *nomadFSM) handleJobDeregister(index uint64, jobID, namespace string, pu return nil } -func (n *nomadFSM) applyUpdateEval(buf []byte, index uint64) interface{} { +func (n *nomadFSM) applyUpdateEval(msgType structs.MessageType, buf []byte, index uint64) interface{} { defer metrics.MeasureSince([]string{"nomad", "fsm", "update_eval"}, time.Now()) var req structs.EvalUpdateRequest if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) } - return n.upsertEvals(index, req.Evals) + + ctx := context.WithValue(context.Background(), state.CtxMsgType, msgType) + + return n.upsertEvals(ctx, index, req.Evals) } -func (n *nomadFSM) upsertEvals(index uint64, evals []*structs.Evaluation) error { - if err := n.state.UpsertEvals(index, evals); err != nil { +func (n *nomadFSM) upsertEvals(ctx context.Context, index uint64, evals []*structs.Evaluation) error { + if err := n.state.UpsertEvalsCtx(ctx, index, evals); err != nil { n.logger.Error("UpsertEvals failed", "error", err) return err } @@ -786,7 +789,7 @@ func (n *nomadFSM) applyAllocUpdate(buf []byte, index uint64) interface{} { return nil } -func (n *nomadFSM) applyAllocClientUpdate(buf []byte, index uint64) interface{} { +func (n *nomadFSM) applyAllocClientUpdate(msgType structs.MessageType, buf []byte, index uint64) interface{} { defer metrics.MeasureSince([]string{"nomad", "fsm", "alloc_client_update"}, time.Now()) var req structs.AllocUpdateRequest if err := structs.Decode(buf, &req); err != nil { @@ -807,15 +810,16 @@ func (n *nomadFSM) applyAllocClientUpdate(buf []byte, index uint64) interface{} } } + ctx := context.WithValue(context.Background(), state.CtxMsgType, msgType) // Update all the client allocations - if err := n.state.UpdateAllocsFromClient(index, req.Alloc); err != nil { + if err := n.state.UpdateAllocsFromClient(ctx, index, req.Alloc); err != nil { n.logger.Error("UpdateAllocFromClient failed", "error", err) return err } // Update any evals if len(req.Evals) > 0 { - if err := n.upsertEvals(index, req.Evals); err != nil { + if err := n.upsertEvals(ctx, index, req.Evals); err != nil { n.logger.Error("applyAllocClientUpdate failed to update evaluations", "error", err) return err } diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 1be5728c598..db1b46a5c94 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -1,6 +1,7 @@ package nomad import ( + "context" "errors" "fmt" "net" @@ -2048,7 +2049,7 @@ func TestClientEndpoint_GetAllocs_Blocking(t *testing.T) { allocUpdate.ID = alloc.ID allocUpdate.ClientStatus = structs.AllocClientStatusRunning state.UpsertJobSummary(199, mock.JobSummary(allocUpdate.JobID)) - err := state.UpdateAllocsFromClient(200, []*structs.Allocation{allocUpdate}) + err := state.UpdateAllocsFromClient(context.Background(), 200, []*structs.Allocation{allocUpdate}) if err != nil { t.Fatalf("err: %v", err) } diff --git a/nomad/state/events.go b/nomad/state/events.go index 824623409f6..921d7a8947e 100644 --- a/nomad/state/events.go +++ b/nomad/state/events.go @@ -1,6 +1,8 @@ package state import ( + "fmt" + "github.com/hashicorp/nomad/nomad/stream" "github.com/hashicorp/nomad/nomad/structs" ) @@ -27,6 +29,8 @@ const ( TypeAllocCreated = "AllocCreated" TypeAllocUpdated = "AllocUpdated" + + TypeEvalUpdated = "EvalUpdated" ) type JobEvent struct { @@ -66,3 +70,56 @@ type JobDrainDetails struct { Type string AllocDetails map[string]NodeDrainAllocDetails } + +func GenericEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { + var eventType string + switch changes.MsgType { + case structs.EvalUpdateRequestType: + eventType = TypeEvalUpdated + case structs.AllocClientUpdateRequestType: + eventType = TypeAllocUpdated + } + + var events []stream.Event + for _, change := range changes.Changes { + switch change.Table { + case "evals": + after, ok := change.After.(*structs.Evaluation) + if !ok { + return nil, fmt.Errorf("transaction change was not an Evaluation") + } + + event := stream.Event{ + Topic: TopicEval, + Type: eventType, + Index: changes.Index, + Key: after.ID, + Payload: &EvalEvent{ + Eval: after, + }, + } + + events = append(events, event) + + case "allocs": + after, ok := change.After.(*structs.Allocation) + if !ok { + return nil, fmt.Errorf("transaction change was not an Allocation") + } + + event := stream.Event{ + Topic: TopicAlloc, + Type: eventType, + Index: changes.Index, + Key: after.ID, + Payload: &AllocEvent{ + Alloc: after, + }, + } + + events = append(events, event) + } + } + + return events, nil +} diff --git a/nomad/state/state_changes.go b/nomad/state/state_changes.go index ad5e77e0459..14b7b7cd130 100644 --- a/nomad/state/state_changes.go +++ b/nomad/state/state_changes.go @@ -202,6 +202,10 @@ func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { return DeploymentEventFromChanges(changes.MsgType, tx, changes) case structs.ApplyPlanResultsRequestType: return ApplyPlanResultEventsFromChanges(tx, changes) + case structs.EvalUpdateRequestType: + return GenericEventsFromChanges(tx, changes) + case structs.AllocClientUpdateRequestType: + return GenericEventsFromChanges(tx, changes) } return []stream.Event{}, nil } diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index a16766b34c9..31409ee1175 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -2706,6 +2706,18 @@ func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) erro return err } +// UpsertEvals is used to upsert a set of evaluations +func (s *StateStore) UpsertEvalsCtx(ctx context.Context, index uint64, evals []*structs.Evaluation) error { + txn := s.db.WriteTxnCtx(ctx, index) + defer txn.Abort() + + err := s.UpsertEvalsTxn(index, evals, txn) + if err == nil { + txn.Commit() + } + return err +} + // UpsertEvals is used to upsert a set of evaluations, like UpsertEvals // but in a transaction. Useful for when making multiple modifications atomically func (s *StateStore) UpsertEvalsTxn(index uint64, evals []*structs.Evaluation, txn Txn) error { @@ -3018,8 +3030,8 @@ func (s *StateStore) EvalsByNamespace(ws memdb.WatchSet, namespace string) (memd // most things, some updates are authoritative from the client. Specifically, // the desired state comes from the schedulers, while the actual state comes // from clients. -func (s *StateStore) UpdateAllocsFromClient(index uint64, allocs []*structs.Allocation) error { - txn := s.db.WriteTxn(index) +func (s *StateStore) UpdateAllocsFromClient(ctx context.Context, index uint64, allocs []*structs.Allocation) error { + txn := s.db.WriteTxnCtx(ctx, index) defer txn.Abort() // Handle each of the updated allocations diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index ba0e94b53df..c683606015c 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -4467,7 +4467,7 @@ func TestStateStore_UpdateAllocsFromClient(t *testing.T) { JobID: alloc.JobID, TaskGroup: alloc.TaskGroup, } - err = state.UpdateAllocsFromClient(1001, []*structs.Allocation{update}) + err = state.UpdateAllocsFromClient(context.Background(), 1001, []*structs.Allocation{update}) if err != nil { t.Fatalf("err: %v", err) } @@ -4565,7 +4565,7 @@ func TestStateStore_UpdateAllocsFromClient_ChildJob(t *testing.T) { TaskGroup: alloc2.TaskGroup, } - err = state.UpdateAllocsFromClient(1001, []*structs.Allocation{update, update2}) + err = state.UpdateAllocsFromClient(context.Background(), 1001, []*structs.Allocation{update, update2}) if err != nil { t.Fatalf("err: %v", err) } @@ -4666,7 +4666,7 @@ func TestStateStore_UpdateMultipleAllocsFromClient(t *testing.T) { TaskGroup: alloc.TaskGroup, } - err = state.UpdateAllocsFromClient(1001, []*structs.Allocation{update, update2}) + err = state.UpdateAllocsFromClient(context.Background(), 1001, []*structs.Allocation{update, update2}) if err != nil { t.Fatalf("err: %v", err) } @@ -4735,7 +4735,7 @@ func TestStateStore_UpdateAllocsFromClient_Deployment(t *testing.T) { Timestamp: healthy, }, } - require.Nil(state.UpdateAllocsFromClient(1001, []*structs.Allocation{update})) + require.Nil(state.UpdateAllocsFromClient(context.Background(), 1001, []*structs.Allocation{update})) // Check that the deployment state was updated because the healthy // deployment @@ -4780,7 +4780,7 @@ func TestStateStore_UpdateAllocsFromClient_DeploymentStateMerges(t *testing.T) { Canary: false, }, } - require.Nil(state.UpdateAllocsFromClient(1001, []*structs.Allocation{update})) + require.Nil(state.UpdateAllocsFromClient(context.Background(), 1001, []*structs.Allocation{update})) // Check that the merging of the deployment status was correct out, err := state.AllocByID(nil, alloc.ID) @@ -5161,7 +5161,7 @@ func TestStateStore_UpdateAlloc_NoJob(t *testing.T) { // Update the client state of the allocation to complete allocCopy1 := allocCopy.Copy() allocCopy1.ClientStatus = structs.AllocClientStatusComplete - if err := state.UpdateAllocsFromClient(1003, []*structs.Allocation{allocCopy1}); err != nil { + if err := state.UpdateAllocsFromClient(context.Background(), 1003, []*structs.Allocation{allocCopy1}); err != nil { t.Fatalf("err: %v", err) } @@ -5272,12 +5272,12 @@ func TestStateStore_JobSummary(t *testing.T) { alloc1 := alloc.Copy() alloc1.ClientStatus = structs.AllocClientStatusPending alloc1.DesiredStatus = "" - state.UpdateAllocsFromClient(920, []*structs.Allocation{alloc}) + state.UpdateAllocsFromClient(context.Background(), 920, []*structs.Allocation{alloc}) alloc3 := alloc.Copy() alloc3.ClientStatus = structs.AllocClientStatusRunning alloc3.DesiredStatus = "" - state.UpdateAllocsFromClient(930, []*structs.Allocation{alloc3}) + state.UpdateAllocsFromClient(context.Background(), 930, []*structs.Allocation{alloc3}) // Upsert the alloc alloc4 := alloc.Copy() @@ -5320,7 +5320,7 @@ func TestStateStore_JobSummary(t *testing.T) { alloc6 := alloc.Copy() alloc6.ClientStatus = structs.AllocClientStatusRunning alloc6.DesiredStatus = "" - state.UpdateAllocsFromClient(990, []*structs.Allocation{alloc6}) + state.UpdateAllocsFromClient(context.Background(), 990, []*structs.Allocation{alloc6}) // We shouldn't have any summary at this point summary, _ = state.JobSummaryByID(ws, job.Namespace, job.ID) @@ -5347,7 +5347,7 @@ func TestStateStore_JobSummary(t *testing.T) { alloc7.Job = outJob alloc7.ClientStatus = structs.AllocClientStatusComplete alloc7.DesiredStatus = structs.AllocDesiredStatusRun - state.UpdateAllocsFromClient(1020, []*structs.Allocation{alloc7}) + state.UpdateAllocsFromClient(context.Background(), 1020, []*structs.Allocation{alloc7}) expectedSummary = structs.JobSummary{ JobID: job.ID, @@ -5392,7 +5392,7 @@ func TestStateStore_ReconcileJobSummary(t *testing.T) { // Change the state of the first alloc to running alloc3 := alloc.Copy() alloc3.ClientStatus = structs.AllocClientStatusRunning - state.UpdateAllocsFromClient(120, []*structs.Allocation{alloc3}) + state.UpdateAllocsFromClient(context.Background(), 120, []*structs.Allocation{alloc3}) //Add some more allocs to the second tg alloc4 := mock.Alloc() @@ -5425,7 +5425,7 @@ func TestStateStore_ReconcileJobSummary(t *testing.T) { state.UpsertAllocs(130, []*structs.Allocation{alloc4, alloc6, alloc8, alloc10}) - state.UpdateAllocsFromClient(150, []*structs.Allocation{alloc5, alloc7, alloc9, alloc11}) + state.UpdateAllocsFromClient(context.Background(), 150, []*structs.Allocation{alloc5, alloc7, alloc9, alloc11}) // DeleteJobSummary is a helper method and doesn't modify the indexes table state.DeleteJobSummary(130, alloc.Namespace, alloc.Job.ID) @@ -5564,7 +5564,7 @@ func TestStateStore_UpdateAlloc_JobNotPresent(t *testing.T) { alloc1.ClientStatus = structs.AllocClientStatusRunning // Updating allocation should not throw any error - if err := state.UpdateAllocsFromClient(400, []*structs.Allocation{alloc1}); err != nil { + if err := state.UpdateAllocsFromClient(context.Background(), 400, []*structs.Allocation{alloc1}); err != nil { t.Fatalf("expect err: %v", err) } @@ -5574,7 +5574,7 @@ func TestStateStore_UpdateAlloc_JobNotPresent(t *testing.T) { // Update the alloc again alloc2 := alloc.Copy() alloc2.ClientStatus = structs.AllocClientStatusComplete - if err := state.UpdateAllocsFromClient(400, []*structs.Allocation{alloc1}); err != nil { + if err := state.UpdateAllocsFromClient(context.Background(), 400, []*structs.Allocation{alloc1}); err != nil { t.Fatalf("expect err: %v", err) } @@ -6484,7 +6484,7 @@ func TestStateJobSummary_UpdateJobCount(t *testing.T) { alloc5.JobID = alloc3.JobID alloc5.ClientStatus = structs.AllocClientStatusComplete - if err := state.UpdateAllocsFromClient(1004, []*structs.Allocation{alloc4, alloc5}); err != nil { + if err := state.UpdateAllocsFromClient(context.Background(), 1004, []*structs.Allocation{alloc4, alloc5}); err != nil { t.Fatalf("err: %v", err) } @@ -6561,7 +6561,7 @@ func TestJobSummary_UpdateClientStatus(t *testing.T) { alloc6.JobID = alloc.JobID alloc6.ClientStatus = structs.AllocClientStatusRunning - if err := state.UpdateAllocsFromClient(1002, []*structs.Allocation{alloc4, alloc5, alloc6}); err != nil { + if err := state.UpdateAllocsFromClient(context.Background(), 1002, []*structs.Allocation{alloc4, alloc5, alloc6}); err != nil { t.Fatalf("err: %v", err) } diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index 7861cc82b30..61a16326c68 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -1,6 +1,7 @@ package scheduler import ( + "context" "fmt" "reflect" "sort" @@ -2957,7 +2958,7 @@ func TestServiceSched_NodeUpdate(t *testing.T) { for i := 0; i < 4; i++ { out, _ := h.State.AllocByID(ws, allocs[i].ID) out.ClientStatus = structs.AllocClientStatusRunning - require.NoError(t, h.State.UpdateAllocsFromClient(h.NextIndex(), []*structs.Allocation{out})) + require.NoError(t, h.State.UpdateAllocsFromClient(context.Background(), h.NextIndex(), []*structs.Allocation{out})) } // Create a mock evaluation which won't trigger any new placements @@ -3107,7 +3108,7 @@ func TestServiceSched_NodeDrain_Down(t *testing.T) { newAlloc.ClientStatus = structs.AllocClientStatusRunning running = append(running, newAlloc) } - require.NoError(t, h.State.UpdateAllocsFromClient(h.NextIndex(), running)) + require.NoError(t, h.State.UpdateAllocsFromClient(context.Background(), h.NextIndex(), running)) // Mark some of the allocations as complete var complete []*structs.Allocation @@ -3126,7 +3127,7 @@ func TestServiceSched_NodeDrain_Down(t *testing.T) { newAlloc.ClientStatus = structs.AllocClientStatusComplete complete = append(complete, newAlloc) } - require.NoError(t, h.State.UpdateAllocsFromClient(h.NextIndex(), complete)) + require.NoError(t, h.State.UpdateAllocsFromClient(context.Background(), h.NextIndex(), complete)) // Create a mock evaluation to deal with the node update eval := &structs.Evaluation{ diff --git a/scheduler/system_sched_test.go b/scheduler/system_sched_test.go index cb964989f15..3ba722e1385 100644 --- a/scheduler/system_sched_test.go +++ b/scheduler/system_sched_test.go @@ -1,6 +1,7 @@ package scheduler import ( + "context" "fmt" "reflect" "sort" @@ -132,7 +133,7 @@ func TestSystemSched_JobRegister_StickyAllocs(t *testing.T) { // Get an allocation and mark it as failed alloc := planned[4].Copy() alloc.ClientStatus = structs.AllocClientStatusFailed - require.NoError(t, h.State.UpdateAllocsFromClient(h.NextIndex(), []*structs.Allocation{alloc})) + require.NoError(t, h.State.UpdateAllocsFromClient(context.Background(), h.NextIndex(), []*structs.Allocation{alloc})) // Create a mock evaluation to handle the update eval = &structs.Evaluation{