diff --git a/command/agent/event_endpoint_test.go b/command/agent/event_endpoint_test.go index 88987e3d9de..e97b4a30d36 100644 --- a/command/agent/event_endpoint_test.go +++ b/command/agent/event_endpoint_test.go @@ -79,14 +79,14 @@ func TestEventStream_QueryParse(t *testing.T) { desc: "all topics and keys specified", query: "?topic=*:*", want: map[stream.Topic][]string{ - "*": []string{"*"}, + "*": {"*"}, }, }, { desc: "all topics and keys inferred", query: "", want: map[stream.Topic][]string{ - "*": []string{"*"}, + "*": {"*"}, }, }, { @@ -103,14 +103,14 @@ func TestEventStream_QueryParse(t *testing.T) { desc: "single topic and key", query: "?topic=NodeDrain:*", want: map[stream.Topic][]string{ - "NodeDrain": []string{"*"}, + "NodeDrain": {"*"}, }, }, { desc: "single topic multiple keys", query: "?topic=NodeDrain:*&topic=NodeDrain:3caace09-f1f4-4d23-b37a-9ab5eb75069d", want: map[stream.Topic][]string{ - "NodeDrain": []string{ + "NodeDrain": { "*", "3caace09-f1f4-4d23-b37a-9ab5eb75069d", }, @@ -120,10 +120,10 @@ func TestEventStream_QueryParse(t *testing.T) { desc: "multiple topics", query: "?topic=NodeRegister:*&topic=NodeDrain:3caace09-f1f4-4d23-b37a-9ab5eb75069d", want: map[stream.Topic][]string{ - "NodeDrain": []string{ + "NodeDrain": { "3caace09-f1f4-4d23-b37a-9ab5eb75069d", }, - "NodeRegister": []string{ + "NodeRegister": { "*", }, }, diff --git a/nomad/deploymentwatcher/deployments_watcher_test.go b/nomad/deploymentwatcher/deployments_watcher_test.go index 8373a900697..c4c503f35c2 100644 --- a/nomad/deploymentwatcher/deployments_watcher_test.go +++ b/nomad/deploymentwatcher/deployments_watcher_test.go @@ -1,6 +1,7 @@ package deploymentwatcher import ( + "context" "fmt" "testing" "time" @@ -917,7 +918,7 @@ func TestDeploymentWatcher_Watch_NoProgressDeadline(t *testing.T) { HealthyAllocationIDs: []string{a.ID}, }, } - require.Nil(m.state.UpdateDeploymentAllocHealth(m.nextIndex(), req), "UpsertDeploymentAllocHealth") + require.Nil(m.state.UpdateDeploymentAllocHealth(context.Background(), m.nextIndex(), req), "UpsertDeploymentAllocHealth") } // Wait for there to be one eval @@ -945,7 +946,7 @@ func TestDeploymentWatcher_Watch_NoProgressDeadline(t *testing.T) { UnhealthyAllocationIDs: []string{a.ID}, }, } - require.Nil(m.state.UpdateDeploymentAllocHealth(m.nextIndex(), req2), "UpsertDeploymentAllocHealth") + require.Nil(m.state.UpdateDeploymentAllocHealth(context.Background(), m.nextIndex(), req2), "UpsertDeploymentAllocHealth") // Wait for there to be one eval testutil.WaitForResult(func() (bool, error) { @@ -1453,7 +1454,7 @@ func TestDeploymentWatcher_RollbackFailed(t *testing.T) { HealthyAllocationIDs: []string{a.ID}, }, } - require.Nil(m.state.UpdateDeploymentAllocHealth(m.nextIndex(), req), "UpsertDeploymentAllocHealth") + require.Nil(m.state.UpdateDeploymentAllocHealth(context.Background(), m.nextIndex(), req), "UpsertDeploymentAllocHealth") } // Wait for there to be one eval @@ -1481,7 +1482,7 @@ func TestDeploymentWatcher_RollbackFailed(t *testing.T) { UnhealthyAllocationIDs: []string{a.ID}, }, } - require.Nil(m.state.UpdateDeploymentAllocHealth(m.nextIndex(), req2), "UpsertDeploymentAllocHealth") + require.Nil(m.state.UpdateDeploymentAllocHealth(context.Background(), m.nextIndex(), req2), "UpsertDeploymentAllocHealth") // Wait for there to be one eval testutil.WaitForResult(func() (bool, error) { @@ -1562,7 +1563,7 @@ func TestWatcher_BatchAllocUpdates(t *testing.T) { HealthyAllocationIDs: []string{a1.ID}, }, } - require.Nil(m.state.UpdateDeploymentAllocHealth(m.nextIndex(), req), "UpsertDeploymentAllocHealth") + require.Nil(m.state.UpdateDeploymentAllocHealth(context.Background(), m.nextIndex(), req), "UpsertDeploymentAllocHealth") req2 := &structs.ApplyDeploymentAllocHealthRequest{ DeploymentAllocHealthRequest: structs.DeploymentAllocHealthRequest{ @@ -1570,7 +1571,7 @@ func TestWatcher_BatchAllocUpdates(t *testing.T) { HealthyAllocationIDs: []string{a2.ID}, }, } - require.Nil(m.state.UpdateDeploymentAllocHealth(m.nextIndex(), req2), "UpsertDeploymentAllocHealth") + require.Nil(m.state.UpdateDeploymentAllocHealth(context.Background(), m.nextIndex(), req2), "UpsertDeploymentAllocHealth") // Wait for there to be one eval for each job testutil.WaitForResult(func() (bool, error) { diff --git a/nomad/deploymentwatcher/testutil_test.go b/nomad/deploymentwatcher/testutil_test.go index 0542c446955..cc2e9d844fa 100644 --- a/nomad/deploymentwatcher/testutil_test.go +++ b/nomad/deploymentwatcher/testutil_test.go @@ -1,6 +1,7 @@ package deploymentwatcher import ( + "context" "reflect" "strings" "sync" @@ -95,7 +96,7 @@ func (m *mockBackend) UpsertJob(job *structs.Job) (uint64, error) { func (m *mockBackend) UpdateDeploymentStatus(u *structs.DeploymentStatusUpdateRequest) (uint64, error) { m.Called(u) i := m.nextIndex() - return i, m.state.UpdateDeploymentStatus(i, u) + return i, m.state.UpdateDeploymentStatus(context.Background(), i, u) } // matchDeploymentStatusUpdateConfig is used to configure the matching @@ -149,7 +150,7 @@ func matchDeploymentStatusUpdateRequest(c *matchDeploymentStatusUpdateConfig) fu func (m *mockBackend) UpdateDeploymentPromotion(req *structs.ApplyDeploymentPromoteRequest) (uint64, error) { m.Called(req) i := m.nextIndex() - return i, m.state.UpdateDeploymentPromotion(i, req) + return i, m.state.UpdateDeploymentPromotion(context.Background(), i, req) } // matchDeploymentPromoteRequestConfig is used to configure the matching @@ -179,7 +180,7 @@ func matchDeploymentPromoteRequest(c *matchDeploymentPromoteRequestConfig) func( func (m *mockBackend) UpdateDeploymentAllocHealth(req *structs.ApplyDeploymentAllocHealthRequest) (uint64, error) { m.Called(req) i := m.nextIndex() - return i, m.state.UpdateDeploymentAllocHealth(i, req) + return i, m.state.UpdateDeploymentAllocHealth(context.Background(), i, req) } // matchDeploymentAllocHealthRequestConfig is used to configure the matching diff --git a/nomad/eval_endpoint_test.go b/nomad/eval_endpoint_test.go index 1e77faa7354..e494466559e 100644 --- a/nomad/eval_endpoint_test.go +++ b/nomad/eval_endpoint_test.go @@ -1,6 +1,7 @@ package nomad import ( + "context" "fmt" "reflect" "strings" @@ -374,7 +375,7 @@ func TestEvalEndpoint_Dequeue_UpdateWaitIndex(t *testing.T) { EvalID: eval.ID, } assert := assert.New(t) - err := state.UpsertPlanResults(1000, &res) + err := state.UpsertPlanResults(context.Background(), 1000, &res) assert.Nil(err) // Dequeue the eval diff --git a/nomad/event_endpoint.go b/nomad/event_endpoint.go index 5661200d43f..781adb5ad3d 100644 --- a/nomad/event_endpoint.go +++ b/nomad/event_endpoint.go @@ -117,19 +117,16 @@ func (e *Event) stream(conn io.ReadWriteCloser) { } // Continue if there are no events - if events == nil { + if len(events.Events) == 0 { continue } - // Send each event as its own frame - for _, e := range events { - if err := jsonStream.Send(e); err != nil { - select { - case errCh <- err: - case <-ctx.Done(): - } - break LOOP + if err := jsonStream.Send(events); err != nil { + select { + case errCh <- err: + case <-ctx.Done(): } + break LOOP } } }() diff --git a/nomad/event_endpoint_test.go b/nomad/event_endpoint_test.go index 9e425619261..793c198a40a 100644 --- a/nomad/event_endpoint_test.go +++ b/nomad/event_endpoint_test.go @@ -91,7 +91,7 @@ OUTER: continue } - var event stream.Event + var event stream.Events err = json.Unmarshal(msg.Event.Data, &event) require.NoError(t, err) @@ -102,7 +102,7 @@ OUTER: Result: &out, } dec, err := mapstructure.NewDecoder(cfg) - dec.Decode(event.Payload) + dec.Decode(event.Events[0].Payload) require.NoError(t, err) require.Equal(t, node.ID, out.ID) break OUTER @@ -123,7 +123,7 @@ func TestEventStream_StreamErr(t *testing.T) { testutil.WaitForLeader(t, s1.RPC) req := structs.EventStreamRequest{ - Topics: map[stream.Topic][]string{"*": []string{"*"}}, + Topics: map[stream.Topic][]string{"*": {"*"}}, QueryOptions: structs.QueryOptions{ Region: s1.Region(), }, @@ -210,7 +210,7 @@ func TestEventStream_RegionForward(t *testing.T) { // Create request targed for region foo req := structs.EventStreamRequest{ - Topics: map[stream.Topic][]string{"*": []string{"*"}}, + Topics: map[stream.Topic][]string{"*": {"*"}}, QueryOptions: structs.QueryOptions{ Region: "foo", }, @@ -272,7 +272,7 @@ OUTER: continue } - var event stream.Event + var event stream.Events err = json.Unmarshal(msg.Event.Data, &event) require.NoError(t, err) @@ -282,7 +282,7 @@ OUTER: Result: &out, } dec, err := mapstructure.NewDecoder(cfg) - dec.Decode(event.Payload) + dec.Decode(event.Events[0].Payload) require.NoError(t, err) require.Equal(t, node.ID, out.ID) break OUTER diff --git a/nomad/fsm.go b/nomad/fsm.go index d18a1d43b73..c4e4aa5bf85 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -206,7 +206,7 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} { case structs.NodeUpdateStatusRequestType: return n.applyStatusUpdate(buf[1:], log.Index) case structs.NodeUpdateDrainRequestType: - return n.applyDrainUpdate(buf[1:], log.Index) + return n.applyDrainUpdate(msgType, buf[1:], log.Index) case structs.JobRegisterRequestType: return n.applyUpsertJob(buf[1:], log.Index) case structs.JobDeregisterRequestType: @@ -226,13 +226,13 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} { case structs.VaultAccessorDeregisterRequestType: return n.applyDeregisterVaultAccessor(buf[1:], log.Index) case structs.ApplyPlanResultsRequestType: - return n.applyPlanResults(buf[1:], log.Index) + return n.applyPlanResults(msgType, buf[1:], log.Index) case structs.DeploymentStatusUpdateRequestType: - return n.applyDeploymentStatusUpdate(buf[1:], log.Index) + return n.applyDeploymentStatusUpdate(msgType, buf[1:], log.Index) case structs.DeploymentPromoteRequestType: - return n.applyDeploymentPromotion(buf[1:], log.Index) + return n.applyDeploymentPromotion(msgType, buf[1:], log.Index) case structs.DeploymentAllocHealthRequestType: - return n.applyDeploymentAllocHealth(buf[1:], log.Index) + return n.applyDeploymentAllocHealth(msgType, buf[1:], log.Index) case structs.DeploymentDeleteRequestType: return n.applyDeploymentDelete(buf[1:], log.Index) case structs.JobStabilityRequestType: @@ -250,7 +250,7 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} { case structs.AutopilotRequestType: return n.applyAutopilotUpdate(buf[1:], log.Index) case structs.UpsertNodeEventsType: - return n.applyUpsertNodeEvent(buf[1:], log.Index) + return n.applyUpsertNodeEvent(msgType, buf[1:], log.Index) case structs.JobBatchDeregisterRequestType: return n.applyBatchDeregisterJob(buf[1:], log.Index) case structs.AllocUpdateDesiredTransitionRequestType: @@ -402,13 +402,15 @@ func (n *nomadFSM) applyStatusUpdate(buf []byte, index uint64) interface{} { return nil } -func (n *nomadFSM) applyDrainUpdate(buf []byte, index uint64) interface{} { +func (n *nomadFSM) applyDrainUpdate(reqType structs.MessageType, buf []byte, index uint64) interface{} { defer metrics.MeasureSince([]string{"nomad", "fsm", "node_drain_update"}, time.Now()) var req structs.NodeUpdateDrainRequest if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) } + ctx := context.WithValue(context.Background(), state.CtxMsgType, reqType) + // COMPAT Remove in version 0.10 // As part of Nomad 0.8 we have deprecated the drain boolean in favor of a // drain strategy but we need to handle the upgrade path where the Raft log @@ -423,7 +425,7 @@ func (n *nomadFSM) applyDrainUpdate(buf []byte, index uint64) interface{} { } } - if err := n.state.UpdateNodeDrain(index, req.NodeID, req.DrainStrategy, req.MarkEligible, req.UpdatedAt, req.NodeEvent); err != nil { + if err := n.state.UpdateNodeDrainCtx(ctx, index, req.NodeID, req.DrainStrategy, req.MarkEligible, req.UpdatedAt, req.NodeEvent); err != nil { n.logger.Error("UpdateNodeDrain failed", "error", err) return err } @@ -874,14 +876,16 @@ func (n *nomadFSM) applyReconcileSummaries(buf []byte, index uint64) interface{} } // applyUpsertNodeEvent tracks the given node events. -func (n *nomadFSM) applyUpsertNodeEvent(buf []byte, index uint64) interface{} { +func (n *nomadFSM) applyUpsertNodeEvent(msgType structs.MessageType, buf []byte, index uint64) interface{} { defer metrics.MeasureSince([]string{"nomad", "fsm", "upsert_node_events"}, time.Now()) var req structs.EmitNodeEventsRequest if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode EmitNodeEventsRequest: %v", err)) } - if err := n.state.UpsertNodeEvents(index, req.NodeEvents); err != nil { + ctx := context.WithValue(context.Background(), state.CtxMsgType, msgType) + + if err := n.state.UpsertNodeEventsCtx(ctx, index, req.NodeEvents); err != nil { n.logger.Error("failed to add node events", "error", err) return err } @@ -953,14 +957,16 @@ func (n *nomadFSM) applyDeregisterSIAccessor(buf []byte, index uint64) interface } // applyPlanApply applies the results of a plan application -func (n *nomadFSM) applyPlanResults(buf []byte, index uint64) interface{} { +func (n *nomadFSM) applyPlanResults(msgType structs.MessageType, buf []byte, index uint64) interface{} { defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_plan_results"}, time.Now()) var req structs.ApplyPlanResultsRequest if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) } - if err := n.state.UpsertPlanResults(index, &req); err != nil { + ctx := context.WithValue(context.Background(), state.CtxMsgType, msgType) + + if err := n.state.UpsertPlanResults(ctx, index, &req); err != nil { n.logger.Error("ApplyPlan failed", "error", err) return err } @@ -972,14 +978,16 @@ func (n *nomadFSM) applyPlanResults(buf []byte, index uint64) interface{} { // applyDeploymentStatusUpdate is used to update the status of an existing // deployment -func (n *nomadFSM) applyDeploymentStatusUpdate(buf []byte, index uint64) interface{} { +func (n *nomadFSM) applyDeploymentStatusUpdate(msgType structs.MessageType, buf []byte, index uint64) interface{} { defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_deployment_status_update"}, time.Now()) var req structs.DeploymentStatusUpdateRequest if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) } - if err := n.state.UpdateDeploymentStatus(index, &req); err != nil { + ctx := context.WithValue(context.Background(), state.CtxMsgType, msgType) + + if err := n.state.UpdateDeploymentStatus(ctx, index, &req); err != nil { n.logger.Error("UpsertDeploymentStatusUpdate failed", "error", err) return err } @@ -989,14 +997,16 @@ func (n *nomadFSM) applyDeploymentStatusUpdate(buf []byte, index uint64) interfa } // applyDeploymentPromotion is used to promote canaries in a deployment -func (n *nomadFSM) applyDeploymentPromotion(buf []byte, index uint64) interface{} { +func (n *nomadFSM) applyDeploymentPromotion(msgType structs.MessageType, buf []byte, index uint64) interface{} { defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_deployment_promotion"}, time.Now()) var req structs.ApplyDeploymentPromoteRequest if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) } - if err := n.state.UpdateDeploymentPromotion(index, &req); err != nil { + ctx := context.WithValue(context.Background(), state.CtxMsgType, msgType) + + if err := n.state.UpdateDeploymentPromotion(ctx, index, &req); err != nil { n.logger.Error("UpsertDeploymentPromotion failed", "error", err) return err } @@ -1007,14 +1017,16 @@ func (n *nomadFSM) applyDeploymentPromotion(buf []byte, index uint64) interface{ // applyDeploymentAllocHealth is used to set the health of allocations as part // of a deployment -func (n *nomadFSM) applyDeploymentAllocHealth(buf []byte, index uint64) interface{} { +func (n *nomadFSM) applyDeploymentAllocHealth(msgType structs.MessageType, buf []byte, index uint64) interface{} { defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_deployment_alloc_health"}, time.Now()) var req structs.ApplyDeploymentAllocHealthRequest if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) } - if err := n.state.UpdateDeploymentAllocHealth(index, &req); err != nil { + ctx := context.WithValue(context.Background(), state.CtxMsgType, msgType) + + if err := n.state.UpdateDeploymentAllocHealth(ctx, index, &req); err != nil { n.logger.Error("UpsertDeploymentAllocHealth failed", "error", err) return err } diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go index 4f527018b28..08d05002601 100644 --- a/nomad/plan_apply.go +++ b/nomad/plan_apply.go @@ -310,7 +310,7 @@ func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap // Optimistically apply to our state view if snap != nil { nextIdx := p.raft.AppliedIndex() + 1 - if err := snap.UpsertPlanResults(nextIdx, &req); err != nil { + if err := snap.UpsertPlanResults(context.Background(), nextIdx, &req); err != nil { return future, err } } diff --git a/nomad/state/apply_plan_events.go b/nomad/state/apply_plan_events.go new file mode 100644 index 00000000000..470c48790b9 --- /dev/null +++ b/nomad/state/apply_plan_events.go @@ -0,0 +1,74 @@ +package state + +import ( + "fmt" + + "github.com/hashicorp/nomad/nomad/stream" + "github.com/hashicorp/nomad/nomad/structs" +) + +func ApplyPlanResultEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { + var events []stream.Event + for _, change := range changes.Changes { + switch change.Table { + case "deployment": + after, ok := change.After.(*structs.Deployment) + if !ok { + return nil, fmt.Errorf("transaction change was not a Deployment") + } + + event := stream.Event{ + Topic: TopicDeployment, + Type: TypeDeploymentUpdate, + Index: changes.Index, + Key: after.ID, + Payload: &DeploymentEvent{ + Deployment: after, + }, + } + events = append(events, event) + case "evals": + after, ok := change.After.(*structs.Evaluation) + if !ok { + return nil, fmt.Errorf("transaction change was not an Evaluation") + } + + event := stream.Event{ + Topic: TopicEval, + Index: changes.Index, + Key: after.ID, + Payload: &EvalEvent{ + Eval: after, + }, + } + + events = append(events, event) + case "allocs": + after, ok := change.After.(*structs.Allocation) + if !ok { + return nil, fmt.Errorf("transaction change was not an Allocation") + } + before := change.Before + var msg string + if before == nil { + msg = TypeAllocCreated + } else { + msg = TypeAllocUpdated + } + + event := stream.Event{ + Topic: TopicAlloc, + Type: msg, + Index: changes.Index, + Key: after.ID, + Payload: &AllocEvent{ + Alloc: after, + }, + } + + events = append(events, event) + } + } + + return events, nil +} diff --git a/nomad/state/deployment_event_test.go b/nomad/state/deployment_event_test.go new file mode 100644 index 00000000000..d71ed8d2f33 --- /dev/null +++ b/nomad/state/deployment_event_test.go @@ -0,0 +1,189 @@ +package state + +import ( + "context" + "testing" + "time" + + "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/stream" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/stretchr/testify/require" +) + +func TestDeploymentEventFromChanges(t *testing.T) { + t.Parallel() + s := TestStateStoreCfg(t, TestStateStorePublisher(t)) + defer s.StopEventPublisher() + + // setup + setupTx := s.db.WriteTxn(10) + + j := mock.Job() + e := mock.Eval() + e.JobID = j.ID + + d := mock.Deployment() + d.JobID = j.ID + + require.NoError(t, s.upsertJobImpl(10, j, false, setupTx)) + require.NoError(t, s.upsertDeploymentImpl(10, d, setupTx)) + + setupTx.Txn.Commit() + + ctx := context.WithValue(context.Background(), CtxMsgType, structs.DeploymentStatusUpdateRequestType) + + req := &structs.DeploymentStatusUpdateRequest{ + DeploymentUpdate: &structs.DeploymentStatusUpdate{ + DeploymentID: d.ID, + Status: structs.DeploymentStatusPaused, + StatusDescription: structs.DeploymentStatusDescriptionPaused, + }, + Eval: e, + // Exlude Job and assert its added + } + + require.NoError(t, s.UpdateDeploymentStatus(ctx, 100, req)) + + events := WaitForEvents(t, s, 100, 1, 1*time.Second) + require.Len(t, events, 2) + + got := events[0] + require.Equal(t, uint64(100), got.Index) + require.Equal(t, d.ID, got.Key) + + de := got.Payload.(*DeploymentEvent) + require.Equal(t, structs.DeploymentStatusPaused, de.Deployment.Status) + require.Contains(t, got.FilterKeys, j.ID) + +} + +func TestDeploymentEventFromChanges_Promotion(t *testing.T) { + t.Parallel() + s := TestStateStoreCfg(t, TestStateStorePublisher(t)) + defer s.StopEventPublisher() + + // setup + setupTx := s.db.WriteTxn(10) + + j := mock.Job() + tg1 := j.TaskGroups[0] + tg2 := tg1.Copy() + tg2.Name = "foo" + j.TaskGroups = append(j.TaskGroups, tg2) + require.NoError(t, s.upsertJobImpl(10, j, false, setupTx)) + + d := mock.Deployment() + d.StatusDescription = structs.DeploymentStatusDescriptionRunningNeedsPromotion + d.JobID = j.ID + d.TaskGroups = map[string]*structs.DeploymentState{ + "web": { + DesiredTotal: 10, + DesiredCanaries: 1, + }, + "foo": { + DesiredTotal: 10, + DesiredCanaries: 1, + }, + } + require.NoError(t, s.upsertDeploymentImpl(10, d, setupTx)) + + // create set of allocs + c1 := mock.Alloc() + c1.JobID = j.ID + c1.DeploymentID = d.ID + d.TaskGroups[c1.TaskGroup].PlacedCanaries = append(d.TaskGroups[c1.TaskGroup].PlacedCanaries, c1.ID) + c1.DeploymentStatus = &structs.AllocDeploymentStatus{ + Healthy: helper.BoolToPtr(true), + } + c2 := mock.Alloc() + c2.JobID = j.ID + c2.DeploymentID = d.ID + d.TaskGroups[c2.TaskGroup].PlacedCanaries = append(d.TaskGroups[c2.TaskGroup].PlacedCanaries, c2.ID) + c2.TaskGroup = tg2.Name + c2.DeploymentStatus = &structs.AllocDeploymentStatus{ + Healthy: helper.BoolToPtr(true), + } + + require.NoError(t, s.upsertAllocsImpl(10, []*structs.Allocation{c1, c2}, setupTx)) + + // commit setup transaction + setupTx.Txn.Commit() + + e := mock.Eval() + // Request to promote canaries + ctx := context.WithValue(context.Background(), CtxMsgType, structs.DeploymentPromoteRequestType) + req := &structs.ApplyDeploymentPromoteRequest{ + DeploymentPromoteRequest: structs.DeploymentPromoteRequest{ + DeploymentID: d.ID, + All: true, + }, + Eval: e, + } + + require.NoError(t, s.UpdateDeploymentPromotion(ctx, 100, req)) + + events := WaitForEvents(t, s, 100, 1, 1*time.Second) + require.Len(t, events, 2) + + got := events[0] + require.Equal(t, uint64(100), got.Index) + require.Equal(t, d.ID, got.Key) + + de := got.Payload.(*DeploymentEvent) + require.Equal(t, structs.DeploymentStatusRunning, de.Deployment.Status) +} + +func WaitForEvents(t *testing.T, s *StateStore, index uint64, minEvents int, timeout time.Duration) []stream.Event { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + select { + case <-ctx.Done(): + return + case <-time.After(timeout): + require.Fail(t, "timeout waiting for events") + } + }() + + maxAttempts := 10 + for { + got := EventsForIndex(t, s, index) + if len(got) >= minEvents { + return got + } + maxAttempts-- + if maxAttempts == 0 { + require.Fail(t, "reached max attempts waiting for desired event count") + } + time.Sleep(10 * time.Millisecond) + } +} + +func EventsForIndex(t *testing.T, s *StateStore, index uint64) []stream.Event { + pub, err := s.EventPublisher() + require.NoError(t, err) + + sub, err := pub.Subscribe(&stream.SubscribeRequest{ + Topics: map[stream.Topic][]string{ + "*": []string{"*"}, + }, + Index: index, + }) + defer sub.Unsubscribe() + + require.NoError(t, err) + + var events []stream.Event + for { + e, err := sub.NextNoBlock() + require.NoError(t, err) + if e == nil { + break + } + events = append(events, e...) + } + return events +} diff --git a/nomad/state/deployment_events.go b/nomad/state/deployment_events.go new file mode 100644 index 00000000000..5f9838f28e2 --- /dev/null +++ b/nomad/state/deployment_events.go @@ -0,0 +1,84 @@ +package state + +import ( + "fmt" + + "github.com/hashicorp/nomad/nomad/stream" + "github.com/hashicorp/nomad/nomad/structs" +) + +func DeploymentEventFromChanges(msgType structs.MessageType, tx ReadTxn, changes Changes) ([]stream.Event, error) { + var events []stream.Event + + var eventType string + switch msgType { + case structs.DeploymentStatusUpdateRequestType: + eventType = TypeDeploymentUpdate + case structs.DeploymentPromoteRequestType: + eventType = TypeDeploymentPromotion + case structs.DeploymentAllocHealthRequestType: + eventType = TypeDeploymentAllocHealth + } + + for _, change := range changes.Changes { + switch change.Table { + case "deployment": + after, ok := change.After.(*structs.Deployment) + if !ok { + return nil, fmt.Errorf("transaction change was not a Deployment") + } + + event := stream.Event{ + Topic: TopicDeployment, + Type: eventType, + Index: changes.Index, + Key: after.ID, + FilterKeys: []string{after.JobID}, + Payload: &DeploymentEvent{ + Deployment: after, + }, + } + + events = append(events, event) + case "jobs": + after, ok := change.After.(*structs.Job) + if !ok { + return nil, fmt.Errorf("transaction change was not a Job") + } + + event := stream.Event{ + Topic: TopicJob, + Type: eventType, + Index: changes.Index, + Key: after.ID, + Payload: &JobEvent{ + Job: after, + }, + } + + events = append(events, event) + case "allocs": + // TODO(drew) determine how to handle alloc updates during deployment + case "evals": + after, ok := change.After.(*structs.Evaluation) + if !ok { + return nil, fmt.Errorf("transaction change was not an Evaluation") + } + + event := stream.Event{ + Topic: TopicEval, + Type: eventType, + Index: changes.Index, + Key: after.ID, + FilterKeys: []string{after.DeploymentID, after.JobID}, + Payload: &EvalEvent{ + Eval: after, + }, + } + + events = append(events, event) + } + } + + return events, nil +} diff --git a/nomad/state/events.go b/nomad/state/events.go new file mode 100644 index 00000000000..824623409f6 --- /dev/null +++ b/nomad/state/events.go @@ -0,0 +1,68 @@ +package state + +import ( + "github.com/hashicorp/nomad/nomad/stream" + "github.com/hashicorp/nomad/nomad/structs" +) + +const ( + TopicDeployment stream.Topic = "Deployment" + TopicEval stream.Topic = "Eval" + TopicAlloc stream.Topic = "Alloc" + TopicJob stream.Topic = "Job" + // TopicNodeRegistration stream.Topic = "NodeRegistration" + // TopicNodeDeregistration stream.Topic = "NodeDeregistration" + // TopicNodeDrain stream.Topic = "NodeDrain" + TopicNode stream.Topic = "Node" + + // TODO(drew) Node Events use TopicNode + Type + TypeNodeRegistration = "NodeRegistration" + TypeNodeDeregistration = "NodeDeregistration" + TypeNodeDrain = "NodeDrain" + TypeNodeEvent = "NodeEvent" + + TypeDeploymentUpdate = "DeploymentStatusUpdate" + TypeDeploymentPromotion = "DeploymentPromotion" + TypeDeploymentAllocHealth = "DeploymentAllocHealth" + + TypeAllocCreated = "AllocCreated" + TypeAllocUpdated = "AllocUpdated" +) + +type JobEvent struct { + Job *structs.Job +} + +type EvalEvent struct { + Eval *structs.Evaluation +} + +type AllocEvent struct { + Alloc *structs.Allocation +} + +type DeploymentEvent struct { + Deployment *structs.Deployment +} + +type NodeEvent struct { + Node *structs.Node +} + +// NNodeDrainEvent is the Payload for a NodeDrain event. It contains +// information related to the Node being drained as well as high level +// information about the current allocations on the Node +type NodeDrainEvent struct { + Node *structs.Node + JobAllocs map[string]*JobDrainDetails +} + +type NodeDrainAllocDetails struct { + ID string + Migrate *structs.MigrateStrategy +} + +type JobDrainDetails struct { + Type string + AllocDetails map[string]NodeDrainAllocDetails +} diff --git a/nomad/state/node_events.go b/nomad/state/node_events.go index b2889e3a307..a9e55aeb10b 100644 --- a/nomad/state/node_events.go +++ b/nomad/state/node_events.go @@ -7,20 +7,6 @@ import ( "github.com/hashicorp/nomad/nomad/structs" ) -const ( - TopicNodeRegistration = "NodeRegistration" - TopicNodeDeregistration = "NodeDeregistration" -) - -type NodeRegistrationEvent struct { - Event *structs.NodeEvent - NodeStatus string -} - -type NodeDeregistrationEvent struct { - NodeID string -} - // NodeRegisterEventFromChanges generates a NodeRegistrationEvent from a set // of transaction changes. func NodeRegisterEventFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { @@ -34,12 +20,12 @@ func NodeRegisterEventFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, } event := stream.Event{ - Topic: TopicNodeRegistration, + Topic: TopicNode, + Type: TypeNodeRegistration, Index: changes.Index, Key: after.ID, - Payload: &NodeRegistrationEvent{ - Event: after.Events[len(after.Events)-1], - NodeStatus: after.Status, + Payload: &NodeEvent{ + Node: after, }, } events = append(events, event) @@ -61,11 +47,87 @@ func NodeDeregisterEventFromChanges(tx ReadTxn, changes Changes) ([]stream.Event } event := stream.Event{ - Topic: TopicNodeDeregistration, + Topic: TopicNode, + Type: TypeNodeDeregistration, Index: changes.Index, Key: before.ID, - Payload: &NodeDeregistrationEvent{ - NodeID: before.ID, + Payload: &NodeEvent{ + Node: before, + }, + } + events = append(events, event) + } + } + return events, nil +} + +// NodeEventFromChanges generates a NodeDeregistrationEvent from a set +// of transaction changes. +func NodeEventFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { + var events []stream.Event + for _, change := range changes.Changes { + switch change.Table { + case "nodes": + after, ok := change.After.(*structs.Node) + if !ok { + return nil, fmt.Errorf("transaction change was not a Node") + } + + event := stream.Event{ + Topic: TopicNode, + Type: TypeNodeEvent, + Index: changes.Index, + Key: after.ID, + Payload: &NodeEvent{ + Node: after, + }, + } + events = append(events, event) + } + } + return events, nil +} + +func NodeDrainEventFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { + var events []stream.Event + for _, change := range changes.Changes { + switch change.Table { + case "nodes": + after, ok := change.After.(*structs.Node) + if !ok { + return nil, fmt.Errorf("transaction change was not a Node") + } + + // retrieve allocations currently on node + allocs, err := allocsByNodeTxn(tx, nil, after.ID) + if err != nil { + return nil, fmt.Errorf("retrieving allocations for node drain event: %w", err) + } + + // build job/alloc details for node drain + jobAllocs := make(map[string]*JobDrainDetails) + for _, a := range allocs { + if _, ok := jobAllocs[a.Job.Name]; !ok { + jobAllocs[a.Job.Name] = &JobDrainDetails{ + AllocDetails: make(map[string]NodeDrainAllocDetails), + Type: a.Job.Type, + } + } + + jobAllocs[a.Job.Name].AllocDetails[a.ID] = NodeDrainAllocDetails{ + Migrate: a.MigrateStrategy(), + ID: a.ID, + } + } + + event := stream.Event{ + Topic: TopicNode, + Type: TypeNodeDrain, + Index: changes.Index, + Key: after.ID, + Payload: &NodeDrainEvent{ + Node: after, + JobAllocs: jobAllocs, }, } events = append(events, event) diff --git a/nomad/state/node_events_test.go b/nomad/state/node_events_test.go index 10b9458a635..78918db2357 100644 --- a/nomad/state/node_events_test.go +++ b/nomad/state/node_events_test.go @@ -2,6 +2,7 @@ package state import ( "testing" + "time" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/stream" @@ -9,7 +10,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestNodeRegisterEventFromChanges(t *testing.T) { +func TestNodeEventsFromChanges(t *testing.T) { cases := []struct { Name string MsgType structs.MessageType @@ -17,53 +18,47 @@ func TestNodeRegisterEventFromChanges(t *testing.T) { Mutate func(s *StateStore, tx *txn) error WantEvents []stream.Event WantErr bool - WantTopic string + WantTopic stream.Topic }{ { MsgType: structs.NodeRegisterRequestType, - WantTopic: TopicNodeRegistration, + WantTopic: TopicNode, Name: "node registered", Mutate: func(s *StateStore, tx *txn) error { return upsertNodeTxn(tx, tx.Index, testNode()) }, WantEvents: []stream.Event{{ - Topic: TopicNodeRegistration, + Topic: TopicNode, + Type: TypeNodeRegistration, Key: testNodeID(), Index: 100, - Payload: &NodeRegistrationEvent{ - Event: &structs.NodeEvent{ - Message: "Node registered", - Subsystem: "Cluster", - }, - NodeStatus: structs.NodeStatusReady, + Payload: &NodeEvent{ + Node: testNode(), }, }}, WantErr: false, }, { MsgType: structs.NodeRegisterRequestType, - WantTopic: TopicNodeRegistration, + WantTopic: TopicNode, Name: "node registered initializing", Mutate: func(s *StateStore, tx *txn) error { return upsertNodeTxn(tx, tx.Index, testNode(nodeNotReady)) }, WantEvents: []stream.Event{{ - Topic: TopicNodeRegistration, + Topic: TopicNode, + Type: TypeNodeRegistration, Key: testNodeID(), Index: 100, - Payload: &NodeRegistrationEvent{ - Event: &structs.NodeEvent{ - Message: "Node registered", - Subsystem: "Cluster", - }, - NodeStatus: structs.NodeStatusInit, + Payload: &NodeEvent{ + Node: testNode(nodeNotReady), }, }}, WantErr: false, }, { MsgType: structs.NodeDeregisterRequestType, - WantTopic: TopicNodeDeregistration, + WantTopic: TopicNode, Name: "node deregistered", Setup: func(s *StateStore, tx *txn) error { return upsertNodeTxn(tx, tx.Index, testNode()) @@ -72,18 +67,19 @@ func TestNodeRegisterEventFromChanges(t *testing.T) { return deleteNodeTxn(tx, tx.Index, []string{testNodeID()}) }, WantEvents: []stream.Event{{ - Topic: TopicNodeDeregistration, + Topic: TopicNode, + Type: TypeNodeDeregistration, Key: testNodeID(), Index: 100, - Payload: &NodeDeregistrationEvent{ - NodeID: testNodeID(), + Payload: &NodeEvent{ + Node: testNode(), }, }}, WantErr: false, }, { MsgType: structs.NodeDeregisterRequestType, - WantTopic: TopicNodeDeregistration, + WantTopic: TopicNode, Name: "batch node deregistered", Setup: func(s *StateStore, tx *txn) error { require.NoError(t, upsertNodeTxn(tx, tx.Index, testNode())) @@ -94,19 +90,73 @@ func TestNodeRegisterEventFromChanges(t *testing.T) { }, WantEvents: []stream.Event{ { - Topic: TopicNodeDeregistration, + Topic: TopicNode, + Type: TypeNodeDeregistration, Key: testNodeID(), Index: 100, - Payload: &NodeDeregistrationEvent{ - NodeID: testNodeID(), + Payload: &NodeEvent{ + Node: testNode(), }, }, { - Topic: TopicNodeDeregistration, + Topic: TopicNode, + Type: TypeNodeDeregistration, Key: testNodeIDTwo(), Index: 100, - Payload: &NodeDeregistrationEvent{ - NodeID: testNodeIDTwo(), + Payload: &NodeEvent{ + Node: testNode(nodeIDTwo), + }, + }, + }, + WantErr: false, + }, + { + MsgType: structs.UpsertNodeEventsType, + WantTopic: TopicNode, + Name: "batch node events upserted", + Setup: func(s *StateStore, tx *txn) error { + require.NoError(t, upsertNodeTxn(tx, tx.Index, testNode())) + return upsertNodeTxn(tx, tx.Index, testNode(nodeIDTwo)) + }, + Mutate: func(s *StateStore, tx *txn) error { + eventFn := func(id string) []*structs.NodeEvent { + return []*structs.NodeEvent{ + { + Message: "test event one", + Subsystem: "Cluster", + Details: map[string]string{ + "NodeID": id, + }, + }, + { + Message: "test event two", + Subsystem: "Cluster", + Details: map[string]string{ + "NodeID": id, + }, + }, + } + } + require.NoError(t, s.upsertNodeEvents(tx.Index, testNodeID(), eventFn(testNodeID()), tx)) + return s.upsertNodeEvents(tx.Index, testNodeIDTwo(), eventFn(testNodeIDTwo()), tx) + }, + WantEvents: []stream.Event{ + { + Topic: TopicNode, + Type: TypeNodeEvent, + Key: testNodeID(), + Index: 100, + Payload: &NodeEvent{ + Node: testNode(), + }, + }, + { + Topic: TopicNode, + Type: TypeNodeEvent, + Key: testNodeIDTwo(), + Index: 100, + Payload: &NodeEvent{ + Node: testNode(nodeIDTwo), }, }, }, @@ -140,44 +190,104 @@ func TestNodeRegisterEventFromChanges(t *testing.T) { require.Equal(t, len(tc.WantEvents), len(got)) for idx, g := range got { + // assert equality of shared fields + + want := tc.WantEvents[idx] + require.Equal(t, want.Index, g.Index) + require.Equal(t, want.Key, g.Key) + require.Equal(t, want.Topic, g.Topic) + switch tc.MsgType { case structs.NodeRegisterRequestType: requireNodeRegistrationEventEqual(t, tc.WantEvents[idx], g) case structs.NodeDeregisterRequestType: requireNodeDeregistrationEventEqual(t, tc.WantEvents[idx], g) + case structs.UpsertNodeEventsType: + requireNodeEventEqual(t, tc.WantEvents[idx], g) + default: + require.Fail(t, "unhandled message type") } } }) } } +func TestNodeDrainEventFromChanges(t *testing.T) { + t.Parallel() + s := TestStateStoreCfg(t, TestStateStorePublisher(t)) + defer s.StopEventPublisher() + + // setup + setupTx := s.db.WriteTxn(10) + + node := mock.Node() + alloc1 := mock.Alloc() + alloc2 := mock.Alloc() + alloc1.NodeID = node.ID + alloc2.NodeID = node.ID + + require.NoError(t, upsertNodeTxn(setupTx, 10, node)) + require.NoError(t, s.upsertAllocsImpl(100, []*structs.Allocation{alloc1, alloc2}, setupTx)) + setupTx.Txn.Commit() + + // changes + tx := s.db.WriteTxn(100) + + strat := &structs.DrainStrategy{ + DrainSpec: structs.DrainSpec{ + Deadline: 10 * time.Minute, + IgnoreSystemJobs: false, + }, + StartedAt: time.Now(), + } + markEligible := false + updatedAt := time.Now() + event := &structs.NodeEvent{} + + require.NoError(t, s.updateNodeDrainImpl(tx, 100, node.ID, strat, markEligible, updatedAt.UnixNano(), event)) + changes := Changes{Changes: tx.Changes(), Index: 100, MsgType: structs.NodeUpdateDrainRequestType} + got, err := processDBChanges(tx, changes) + require.NoError(t, err) + + require.Len(t, got, 1) + + require.Equal(t, TopicNode, got[0].Topic) + require.Equal(t, TypeNodeDrain, got[0].Type) + require.Equal(t, uint64(100), got[0].Index) + + nodeEvent, ok := got[0].Payload.(*NodeDrainEvent) + require.True(t, ok) + + require.Equal(t, structs.NodeSchedulingIneligible, nodeEvent.Node.SchedulingEligibility) + require.Equal(t, strat, nodeEvent.Node.DrainStrategy) +} + func requireNodeRegistrationEventEqual(t *testing.T, want, got stream.Event) { t.Helper() - require.Equal(t, want.Index, got.Index) - require.Equal(t, want.Key, got.Key) - require.Equal(t, want.Topic, got.Topic) - - wantPayload := want.Payload.(*NodeRegistrationEvent) - gotPayload := got.Payload.(*NodeRegistrationEvent) + wantPayload := want.Payload.(*NodeEvent) + gotPayload := got.Payload.(*NodeEvent) // Check payload equality for the fields that we can easily control - require.Equal(t, wantPayload.NodeStatus, gotPayload.NodeStatus) - require.Equal(t, wantPayload.Event.Message, gotPayload.Event.Message) - require.Equal(t, wantPayload.Event.Subsystem, gotPayload.Event.Subsystem) + require.Equal(t, wantPayload.Node.Status, gotPayload.Node.Status) + require.Equal(t, wantPayload.Node.ID, gotPayload.Node.ID) + require.NotEqual(t, wantPayload.Node.Events, gotPayload.Node.Events) } func requireNodeDeregistrationEventEqual(t *testing.T, want, got stream.Event) { t.Helper() - require.Equal(t, want.Index, got.Index) - require.Equal(t, want.Key, got.Key) - require.Equal(t, want.Topic, got.Topic) + wantPayload := want.Payload.(*NodeEvent) + gotPayload := got.Payload.(*NodeEvent) + + require.Equal(t, wantPayload.Node.ID, gotPayload.Node.ID) + require.NotEqual(t, wantPayload.Node.Events, gotPayload.Node.Events) +} - wantPayload := want.Payload.(*NodeDeregistrationEvent) - gotPayload := got.Payload.(*NodeDeregistrationEvent) +func requireNodeEventEqual(t *testing.T, want, got stream.Event) { + gotPayload := got.Payload.(*NodeEvent) - require.Equal(t, wantPayload, gotPayload) + require.Len(t, gotPayload.Node.Events, 3) } type nodeOpts func(n *structs.Node) @@ -186,6 +296,10 @@ func nodeNotReady(n *structs.Node) { n.Status = structs.NodeStatusInit } +func nodeReady(n *structs.Node) { + n.Status = structs.NodeStatusReady +} + func nodeIDTwo(n *structs.Node) { n.ID = testNodeIDTwo() } diff --git a/nomad/state/state_changes.go b/nomad/state/state_changes.go index be80c53c6bb..ad5e77e0459 100644 --- a/nomad/state/state_changes.go +++ b/nomad/state/state_changes.go @@ -190,6 +190,18 @@ func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { return NodeRegisterEventFromChanges(tx, changes) case structs.NodeDeregisterRequestType: return NodeDeregisterEventFromChanges(tx, changes) + case structs.NodeUpdateDrainRequestType: + return NodeDrainEventFromChanges(tx, changes) + case structs.UpsertNodeEventsType: + return NodeEventFromChanges(tx, changes) + case structs.DeploymentStatusUpdateRequestType: + return DeploymentEventFromChanges(changes.MsgType, tx, changes) + case structs.DeploymentPromoteRequestType: + return DeploymentEventFromChanges(changes.MsgType, tx, changes) + case structs.DeploymentAllocHealthRequestType: + return DeploymentEventFromChanges(changes.MsgType, tx, changes) + case structs.ApplyPlanResultsRequestType: + return ApplyPlanResultEventsFromChanges(tx, changes) } return []stream.Event{}, nil } diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index bc4cccb3b9c..a16766b34c9 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -270,7 +270,7 @@ RUN_QUERY: } // UpsertPlanResults is used to upsert the results of a plan. -func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanResultsRequest) error { +func (s *StateStore) UpsertPlanResults(ctx context.Context, index uint64, results *structs.ApplyPlanResultsRequest) error { snapshot, err := s.Snapshot() if err != nil { return err @@ -292,7 +292,7 @@ func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanR return err } - txn := s.db.WriteTxn(index) + txn := s.db.WriteTxnCtx(ctx, index) defer txn.Abort() // Upsert the newly created or updated deployment @@ -356,7 +356,10 @@ func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanR } } - txn.Commit() + if err := txn.Commit(); err != nil { + return err + } + return nil } @@ -956,6 +959,19 @@ func (s *StateStore) BatchUpdateNodeDrain(index uint64, updatedAt int64, updates return nil } +// UpdateNodeDrain is used to update the drain of a node +func (s *StateStore) UpdateNodeDrainCtx(ctx context.Context, index uint64, nodeID string, + drain *structs.DrainStrategy, markEligible bool, updatedAt int64, event *structs.NodeEvent) error { + + txn := s.db.WriteTxnCtx(ctx, index) + defer txn.Abort() + if err := s.updateNodeDrainImpl(txn, index, nodeID, drain, markEligible, updatedAt, event); err != nil { + return err + } + txn.Commit() + return nil +} + // UpdateNodeDrain is used to update the drain of a node func (s *StateStore) UpdateNodeDrain(index uint64, nodeID string, drain *structs.DrainStrategy, markEligible bool, updatedAt int64, event *structs.NodeEvent) error { @@ -1059,6 +1075,20 @@ func (s *StateStore) UpdateNodeEligibility(index uint64, nodeID string, eligibil return nil } +func (s *StateStore) UpsertNodeEventsCtx(ctx context.Context, index uint64, nodeEvents map[string][]*structs.NodeEvent) error { + txn := s.db.WriteTxnCtx(ctx, index) + defer txn.Abort() + + for nodeID, events := range nodeEvents { + if err := s.upsertNodeEvents(index, nodeID, events, txn); err != nil { + return err + } + } + + txn.Commit() + return nil +} + // UpsertNodeEvents adds the node events to the nodes, rotating events as // necessary. func (s *StateStore) UpsertNodeEvents(index uint64, nodeEvents map[string][]*structs.NodeEvent) error { @@ -3375,6 +3405,10 @@ func (s *StateStore) AllocsByIDPrefixInNSes(ws memdb.WatchSet, namespaces map[st func (s *StateStore) AllocsByNode(ws memdb.WatchSet, node string) ([]*structs.Allocation, error) { txn := s.db.ReadTxn() + return allocsByNodeTxn(txn, ws, node) +} + +func allocsByNodeTxn(txn ReadTxn, ws memdb.WatchSet, node string) ([]*structs.Allocation, error) { // Get an iterator over the node allocations, using only the // node prefix which ignores the terminal status iter, err := txn.Get("allocs", "node_prefix", node) @@ -3796,8 +3830,8 @@ func (s *StateStore) SITokenAccessorsByNode(ws memdb.WatchSet, nodeID string) ([ // UpdateDeploymentStatus is used to make deployment status updates and // potentially make a evaluation -func (s *StateStore) UpdateDeploymentStatus(index uint64, req *structs.DeploymentStatusUpdateRequest) error { - txn := s.db.WriteTxn(index) +func (s *StateStore) UpdateDeploymentStatus(ctx context.Context, index uint64, req *structs.DeploymentStatusUpdateRequest) error { + txn := s.db.WriteTxnCtx(ctx, index) defer txn.Abort() if err := s.updateDeploymentStatusImpl(index, req.DeploymentUpdate, txn); err != nil { @@ -3900,8 +3934,8 @@ func (s *StateStore) updateJobStabilityImpl(index uint64, namespace, jobID strin // UpdateDeploymentPromotion is used to promote canaries in a deployment and // potentially make a evaluation -func (s *StateStore) UpdateDeploymentPromotion(index uint64, req *structs.ApplyDeploymentPromoteRequest) error { - txn := s.db.WriteTxn(index) +func (s *StateStore) UpdateDeploymentPromotion(ctx context.Context, index uint64, req *structs.ApplyDeploymentPromoteRequest) error { + txn := s.db.WriteTxnCtx(ctx, index) defer txn.Abort() // Retrieve deployment and ensure it is not terminal and is active @@ -4043,8 +4077,8 @@ func (s *StateStore) UpdateDeploymentPromotion(index uint64, req *structs.ApplyD // UpdateDeploymentAllocHealth is used to update the health of allocations as // part of the deployment and potentially make a evaluation -func (s *StateStore) UpdateDeploymentAllocHealth(index uint64, req *structs.ApplyDeploymentAllocHealthRequest) error { - txn := s.db.WriteTxn(index) +func (s *StateStore) UpdateDeploymentAllocHealth(ctx context.Context, index uint64, req *structs.ApplyDeploymentAllocHealthRequest) error { + txn := s.db.WriteTxnCtx(ctx, index) defer txn.Abort() // Retrieve deployment and ensure it is not terminal and is active diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 6c3a16aff49..ba0e94b53df 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -127,7 +127,7 @@ func TestStateStore_UpsertPlanResults_AllocationsCreated_Denormalized(t *testing EvalID: eval.ID, } assert := assert.New(t) - err := state.UpsertPlanResults(1000, &res) + err := state.UpsertPlanResults(context.Background(), 1000, &res) assert.Nil(err) ws := memdb.NewWatchSet() @@ -203,7 +203,7 @@ func TestStateStore_UpsertPlanResults_AllocationsDenormalized(t *testing.T) { } assert := assert.New(t) planModifyIndex := uint64(1000) - err := state.UpsertPlanResults(planModifyIndex, &res) + err := state.UpsertPlanResults(context.Background(), planModifyIndex, &res) require.NoError(err) ws := memdb.NewWatchSet() @@ -284,7 +284,7 @@ func TestStateStore_UpsertPlanResults_Deployment(t *testing.T) { EvalID: eval.ID, } - err := state.UpsertPlanResults(1000, &res) + err := state.UpsertPlanResults(context.Background(), 1000, &res) if err != nil { t.Fatalf("err: %v", err) } @@ -332,7 +332,7 @@ func TestStateStore_UpsertPlanResults_Deployment(t *testing.T) { EvalID: eval.ID, } - err = state.UpsertPlanResults(1001, &res) + err = state.UpsertPlanResults(context.Background(), 1001, &res) if err != nil { t.Fatalf("err: %v", err) } @@ -400,7 +400,7 @@ func TestStateStore_UpsertPlanResults_PreemptedAllocs(t *testing.T) { PreemptionEvals: []*structs.Evaluation{eval2}, } - err = state.UpsertPlanResults(1000, &res) + err = state.UpsertPlanResults(context.Background(), 1000, &res) require.NoError(err) ws := memdb.NewWatchSet() @@ -486,7 +486,7 @@ func TestStateStore_UpsertPlanResults_DeploymentUpdates(t *testing.T) { EvalID: eval.ID, } - err := state.UpsertPlanResults(1000, &res) + err := state.UpsertPlanResults(context.Background(), 1000, &res) if err != nil { t.Fatalf("err: %v", err) } @@ -6600,7 +6600,7 @@ func TestStateStore_UpsertDeploymentStatusUpdate_Nonexistent(t *testing.T) { Status: structs.DeploymentStatusRunning, }, } - err := state.UpdateDeploymentStatus(2, req) + err := state.UpdateDeploymentStatus(context.Background(), 2, req) if err == nil || !strings.Contains(err.Error(), "does not exist") { t.Fatalf("expected error updating the status because the deployment doesn't exist") } @@ -6627,7 +6627,7 @@ func TestStateStore_UpsertDeploymentStatusUpdate_Terminal(t *testing.T) { Status: structs.DeploymentStatusRunning, }, } - err := state.UpdateDeploymentStatus(2, req) + err := state.UpdateDeploymentStatus(context.Background(), 2, req) if err == nil || !strings.Contains(err.Error(), "has terminal status") { t.Fatalf("expected error updating the status because the deployment is terminal") } @@ -6661,7 +6661,7 @@ func TestStateStore_UpsertDeploymentStatusUpdate_NonTerminal(t *testing.T) { Job: j, Eval: e, } - err := state.UpdateDeploymentStatus(2, req) + err := state.UpdateDeploymentStatus(context.Background(), 2, req) if err != nil { t.Fatalf("bad: %v", err) } @@ -6722,7 +6722,7 @@ func TestStateStore_UpsertDeploymentStatusUpdate_Successful(t *testing.T) { StatusDescription: structs.DeploymentStatusDescriptionSuccessful, }, } - err := state.UpdateDeploymentStatus(3, req) + err := state.UpdateDeploymentStatus(context.Background(), 3, req) if err != nil { t.Fatalf("bad: %v", err) } @@ -6820,7 +6820,7 @@ func TestStateStore_UpsertDeploymentPromotion_Nonexistent(t *testing.T) { All: true, }, } - err := state.UpdateDeploymentPromotion(2, req) + err := state.UpdateDeploymentPromotion(context.Background(), 2, req) if err == nil || !strings.Contains(err.Error(), "does not exist") { t.Fatalf("expected error promoting because the deployment doesn't exist") } @@ -6847,7 +6847,7 @@ func TestStateStore_UpsertDeploymentPromotion_Terminal(t *testing.T) { All: true, }, } - err := state.UpdateDeploymentPromotion(2, req) + err := state.UpdateDeploymentPromotion(context.Background(), 2, req) if err == nil || !strings.Contains(err.Error(), "has terminal status") { t.Fatalf("expected error updating the status because the deployment is terminal: %v", err) } @@ -6897,7 +6897,7 @@ func TestStateStore_UpsertDeploymentPromotion_Unhealthy(t *testing.T) { All: true, }, } - err := state.UpdateDeploymentPromotion(4, req) + err := state.UpdateDeploymentPromotion(context.Background(), 4, req) require.NotNil(err) require.Contains(err.Error(), `Task group "web" has 0/2 healthy allocations`) } @@ -6926,7 +6926,7 @@ func TestStateStore_UpsertDeploymentPromotion_NoCanaries(t *testing.T) { All: true, }, } - err := state.UpdateDeploymentPromotion(4, req) + err := state.UpdateDeploymentPromotion(context.Background(), 4, req) require.NotNil(err) require.Contains(err.Error(), `Task group "web" has 0/2 healthy allocations`) } @@ -6997,7 +6997,7 @@ func TestStateStore_UpsertDeploymentPromotion_All(t *testing.T) { }, Eval: e, } - err := state.UpdateDeploymentPromotion(4, req) + err := state.UpdateDeploymentPromotion(context.Background(), 4, req) if err != nil { t.Fatalf("bad: %v", err) } @@ -7103,7 +7103,7 @@ func TestStateStore_UpsertDeploymentPromotion_Subset(t *testing.T) { }, Eval: e, } - require.Nil(state.UpdateDeploymentPromotion(4, req)) + require.Nil(state.UpdateDeploymentPromotion(context.Background(), 4, req)) // Check that the status per task group was updated properly ws := memdb.NewWatchSet() @@ -7146,7 +7146,7 @@ func TestStateStore_UpsertDeploymentAllocHealth_Nonexistent(t *testing.T) { HealthyAllocationIDs: []string{uuid.Generate()}, }, } - err := state.UpdateDeploymentAllocHealth(2, req) + err := state.UpdateDeploymentAllocHealth(context.Background(), 2, req) if err == nil || !strings.Contains(err.Error(), "does not exist") { t.Fatalf("expected error because the deployment doesn't exist: %v", err) } @@ -7173,7 +7173,7 @@ func TestStateStore_UpsertDeploymentAllocHealth_Terminal(t *testing.T) { HealthyAllocationIDs: []string{uuid.Generate()}, }, } - err := state.UpdateDeploymentAllocHealth(2, req) + err := state.UpdateDeploymentAllocHealth(context.Background(), 2, req) if err == nil || !strings.Contains(err.Error(), "has terminal status") { t.Fatalf("expected error because the deployment is terminal: %v", err) } @@ -7198,7 +7198,7 @@ func TestStateStore_UpsertDeploymentAllocHealth_BadAlloc_Nonexistent(t *testing. HealthyAllocationIDs: []string{uuid.Generate()}, }, } - err := state.UpdateDeploymentAllocHealth(2, req) + err := state.UpdateDeploymentAllocHealth(context.Background(), 2, req) if err == nil || !strings.Contains(err.Error(), "unknown alloc") { t.Fatalf("expected error because the alloc doesn't exist: %v", err) } @@ -7338,7 +7338,7 @@ func TestStateStore_UpsertDeploymentAllocHealth_BadAlloc_MismatchDeployment(t *t HealthyAllocationIDs: []string{a.ID}, }, } - err := state.UpdateDeploymentAllocHealth(4, req) + err := state.UpdateDeploymentAllocHealth(context.Background(), 4, req) if err == nil || !strings.Contains(err.Error(), "not part of deployment") { t.Fatalf("expected error because the alloc isn't part of the deployment: %v", err) } @@ -7395,7 +7395,7 @@ func TestStateStore_UpsertDeploymentAllocHealth(t *testing.T) { DeploymentUpdate: u, Timestamp: ts, } - err := state.UpdateDeploymentAllocHealth(3, req) + err := state.UpdateDeploymentAllocHealth(context.Background(), 3, req) if err != nil { t.Fatalf("bad: %v", err) } diff --git a/nomad/stream/event.go b/nomad/stream/event.go index 2625dede204..33679723e17 100644 --- a/nomad/stream/event.go +++ b/nomad/stream/event.go @@ -7,8 +7,15 @@ const ( type Topic string type Event struct { - Topic Topic - Key string - Index uint64 - Payload interface{} + Topic Topic + Type string + Key string + FilterKeys []string + Index uint64 + Payload interface{} +} + +type Events struct { + Index uint64 + Events []Event } diff --git a/nomad/stream/event_publisher.go b/nomad/stream/event_publisher.go index aedc4bc101b..0a6bb79cdfc 100644 --- a/nomad/stream/event_publisher.go +++ b/nomad/stream/event_publisher.go @@ -65,7 +65,7 @@ func NewEventPublisher(ctx context.Context, cfg EventPublisherCfg) *EventPublish e := &EventPublisher{ logger: cfg.Logger.Named("event_publisher"), eventBuf: buffer, - publishCh: make(chan changeEvents), + publishCh: make(chan changeEvents, 64), subscriptions: &subscriptions{ byToken: make(map[string]map[*SubscribeRequest]*Subscription), }, diff --git a/nomad/stream/event_publisher_test.go b/nomad/stream/event_publisher_test.go index 5085a6fd970..691574a65a9 100644 --- a/nomad/stream/event_publisher_test.go +++ b/nomad/stream/event_publisher_test.go @@ -86,7 +86,7 @@ func consumeSubscription(ctx context.Context, sub *Subscription) <-chan subNextR for { es, err := sub.Next(ctx) eventCh <- subNextResult{ - Events: es, + Events: es.Events, Err: err, } if err != nil { diff --git a/nomad/stream/subscription.go b/nomad/stream/subscription.go index 0fc64512c19..bae3d091ac2 100644 --- a/nomad/stream/subscription.go +++ b/nomad/stream/subscription.go @@ -58,18 +58,38 @@ func newSubscription(req *SubscribeRequest, item *bufferItem, unsub func()) *Sub } } -func (s *Subscription) Next(ctx context.Context) ([]Event, error) { +func (s *Subscription) Next(ctx context.Context) (Events, error) { if atomic.LoadUint32(&s.state) == subscriptionStateClosed { - return nil, ErrSubscriptionClosed + return Events{}, ErrSubscriptionClosed } for { next, err := s.currentItem.Next(ctx, s.forceClosed) switch { case err != nil && atomic.LoadUint32(&s.state) == subscriptionStateClosed: - return nil, ErrSubscriptionClosed + return Events{}, ErrSubscriptionClosed case err != nil: - return nil, err + return Events{}, err + } + s.currentItem = next + + events := filter(s.req, next.Events) + if len(events) == 0 { + continue + } + return Events{Index: next.Index, Events: events}, nil + } +} + +func (s *Subscription) NextNoBlock() ([]Event, error) { + if atomic.LoadUint32(&s.state) == subscriptionStateClosed { + return nil, ErrSubscriptionClosed + } + + for { + next := s.currentItem.NextNoBlock() + if next == nil { + return nil, nil } s.currentItem = next diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index d0daf6ca179..bdafc1c4cff 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -8944,6 +8944,15 @@ func (a *Allocation) ReschedulePolicy() *ReschedulePolicy { return tg.ReschedulePolicy } +// MigrateStrategy returns the migrate strategy based on the task group +func (a *Allocation) MigrateStrategy() *MigrateStrategy { + tg := a.Job.LookupTaskGroup(a.TaskGroup) + if tg == nil { + return nil + } + return tg.Migrate +} + // NextRescheduleTime returns a time on or after which the allocation is eligible to be rescheduled, // and whether the next reschedule time is within policy's interval if the policy doesn't allow unlimited reschedules func (a *Allocation) NextRescheduleTime() (time.Time, bool) { diff --git a/scheduler/testing.go b/scheduler/testing.go index c1d8776b4ad..9b43be42f0d 100644 --- a/scheduler/testing.go +++ b/scheduler/testing.go @@ -1,6 +1,7 @@ package scheduler import ( + "context" "fmt" "sync" "time" @@ -170,7 +171,7 @@ func (h *Harness) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, State, er } // Apply the full plan - err := h.State.UpsertPlanResults(index, &req) + err := h.State.UpsertPlanResults(context.Background(), index, &req) return result, nil, err }