Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Node Drain and Node Events events #8980

Merged
merged 1 commit into from
Sep 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
case structs.NodeUpdateStatusRequestType:
return n.applyStatusUpdate(buf[1:], log.Index)
case structs.NodeUpdateDrainRequestType:
return n.applyDrainUpdate(buf[1:], log.Index)
return n.applyDrainUpdate(msgType, buf[1:], log.Index)
case structs.JobRegisterRequestType:
return n.applyUpsertJob(buf[1:], log.Index)
case structs.JobDeregisterRequestType:
Expand Down Expand Up @@ -250,7 +250,7 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
case structs.AutopilotRequestType:
return n.applyAutopilotUpdate(buf[1:], log.Index)
case structs.UpsertNodeEventsType:
return n.applyUpsertNodeEvent(buf[1:], log.Index)
return n.applyUpsertNodeEvent(msgType, buf[1:], log.Index)
case structs.JobBatchDeregisterRequestType:
return n.applyBatchDeregisterJob(buf[1:], log.Index)
case structs.AllocUpdateDesiredTransitionRequestType:
Expand Down Expand Up @@ -402,13 +402,15 @@ func (n *nomadFSM) applyStatusUpdate(buf []byte, index uint64) interface{} {
return nil
}

func (n *nomadFSM) applyDrainUpdate(buf []byte, index uint64) interface{} {
func (n *nomadFSM) applyDrainUpdate(reqType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "node_drain_update"}, time.Now())
var req structs.NodeUpdateDrainRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}

ctx := context.WithValue(context.Background(), state.CtxMsgType, reqType)

// COMPAT Remove in version 0.10
// As part of Nomad 0.8 we have deprecated the drain boolean in favor of a
// drain strategy but we need to handle the upgrade path where the Raft log
Expand All @@ -423,7 +425,7 @@ func (n *nomadFSM) applyDrainUpdate(buf []byte, index uint64) interface{} {
}
}

if err := n.state.UpdateNodeDrain(index, req.NodeID, req.DrainStrategy, req.MarkEligible, req.UpdatedAt, req.NodeEvent); err != nil {
if err := n.state.UpdateNodeDrainCtx(ctx, index, req.NodeID, req.DrainStrategy, req.MarkEligible, req.UpdatedAt, req.NodeEvent); err != nil {
n.logger.Error("UpdateNodeDrain failed", "error", err)
return err
}
Expand Down Expand Up @@ -874,14 +876,16 @@ func (n *nomadFSM) applyReconcileSummaries(buf []byte, index uint64) interface{}
}

// applyUpsertNodeEvent tracks the given node events.
func (n *nomadFSM) applyUpsertNodeEvent(buf []byte, index uint64) interface{} {
func (n *nomadFSM) applyUpsertNodeEvent(msgType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "upsert_node_events"}, time.Now())
var req structs.EmitNodeEventsRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode EmitNodeEventsRequest: %v", err))
}

if err := n.state.UpsertNodeEvents(index, req.NodeEvents); err != nil {
ctx := context.WithValue(context.Background(), state.CtxMsgType, msgType)

if err := n.state.UpsertNodeEventsCtx(ctx, index, req.NodeEvents); err != nil {
n.logger.Error("failed to add node events", "error", err)
return err
}
Expand Down
101 changes: 99 additions & 2 deletions nomad/state/node_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (
)

const (
TopicNodeRegistration = "NodeRegistration"
TopicNodeDeregistration = "NodeDeregistration"
TopicNodeRegistration stream.Topic = "NodeRegistration"
TopicNodeDeregistration stream.Topic = "NodeDeregistration"
TopicNodeDrain stream.Topic = "NodeDrain"
TopicNodeEvent stream.Topic = "NodeEvent"
)

type NodeRegistrationEvent struct {
Expand All @@ -21,6 +23,28 @@ type NodeDeregistrationEvent struct {
NodeID string
}

type NodeEvent struct {
Node *structs.Node
}

// NNodeDrainEvent is the Payload for a NodeDrain event. It contains
// information related to the Node being drained as well as high level
// information about the current allocations on the Node
type NodeDrainEvent struct {
Node *structs.Node
JobAllocs map[string]*JobDrainDetails
}

type NodeDrainAllocDetails struct {
ID string
Migrate *structs.MigrateStrategy
}

type JobDrainDetails struct {
Type string
AllocDetails map[string]NodeDrainAllocDetails
}

// NodeRegisterEventFromChanges generates a NodeRegistrationEvent from a set
// of transaction changes.
func NodeRegisterEventFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
Expand Down Expand Up @@ -73,3 +97,76 @@ func NodeDeregisterEventFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
}
return events, nil
}

// NodeEventFromChanges generates a NodeDeregistrationEvent from a set
// of transaction changes.
func NodeEventFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
var events []stream.Event
for _, change := range changes.Changes {
switch change.Table {
case "nodes":
after, ok := change.After.(*structs.Node)
if !ok {
return nil, fmt.Errorf("transaction change was not a Node")
}

event := stream.Event{
Topic: TopicNodeEvent,
Index: changes.Index,
Key: after.ID,
Payload: &NodeEvent{
Node: after,
},
}
events = append(events, event)
}
}
return events, nil
}

func NodeDrainEventFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
var events []stream.Event
for _, change := range changes.Changes {
switch change.Table {
case "nodes":
after, ok := change.After.(*structs.Node)
if !ok {
return nil, fmt.Errorf("transaction change was not a Node")
}

// retrieve allocations currently on node
allocs, err := allocsByNodeTxn(tx, nil, after.ID)
if err != nil {
return nil, fmt.Errorf("retrieving allocations for node drain event: %w", err)
}

// build job/alloc details for node drain
jobAllocs := make(map[string]*JobDrainDetails)
for _, a := range allocs {
if _, ok := jobAllocs[a.Job.Name]; !ok {
jobAllocs[a.Job.Name] = &JobDrainDetails{
AllocDetails: make(map[string]NodeDrainAllocDetails),
Type: a.Job.Type,
}
}

jobAllocs[a.Job.Name].AllocDetails[a.ID] = NodeDrainAllocDetails{
Migrate: a.MigrateStrategy(),
ID: a.ID,
}
}

event := stream.Event{
Topic: TopicNodeDrain,
Index: changes.Index,
Key: after.ID,
Payload: &NodeDrainEvent{
Node: after,
JobAllocs: jobAllocs,
},
}
events = append(events, event)
}
}
return events, nil
}
129 changes: 119 additions & 10 deletions nomad/state/node_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,23 @@ package state

import (
"testing"
"time"

"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)

func TestNodeRegisterEventFromChanges(t *testing.T) {
func TestNodeEventsFromChanges(t *testing.T) {
cases := []struct {
Name string
MsgType structs.MessageType
Setup func(s *StateStore, tx *txn) error
Mutate func(s *StateStore, tx *txn) error
WantEvents []stream.Event
WantErr bool
WantTopic string
WantTopic stream.Topic
}{
{
MsgType: structs.NodeRegisterRequestType,
Expand Down Expand Up @@ -112,6 +113,56 @@ func TestNodeRegisterEventFromChanges(t *testing.T) {
},
WantErr: false,
},
{
MsgType: structs.UpsertNodeEventsType,
WantTopic: TopicNodeEvent,
Name: "batch node events upserted",
Setup: func(s *StateStore, tx *txn) error {
require.NoError(t, upsertNodeTxn(tx, tx.Index, testNode()))
return upsertNodeTxn(tx, tx.Index, testNode(nodeIDTwo))
},
Mutate: func(s *StateStore, tx *txn) error {
eventFn := func(id string) []*structs.NodeEvent {
return []*structs.NodeEvent{
{
Message: "test event one",
Subsystem: "Cluster",
Details: map[string]string{
"NodeID": id,
},
},
{
Message: "test event two",
Subsystem: "Cluster",
Details: map[string]string{
"NodeID": id,
},
},
}
}
require.NoError(t, s.upsertNodeEvents(tx.Index, testNodeID(), eventFn(testNodeID()), tx))
return s.upsertNodeEvents(tx.Index, testNodeIDTwo(), eventFn(testNodeIDTwo()), tx)
},
WantEvents: []stream.Event{
{
Topic: TopicNodeEvent,
Key: testNodeID(),
Index: 100,
Payload: &NodeEvent{
Node: testNode(),
},
},
{
Topic: TopicNodeEvent,
Key: testNodeIDTwo(),
Index: 100,
Payload: &NodeEvent{
Node: testNode(nodeIDTwo),
},
},
},
WantErr: false,
},
}

for _, tc := range cases {
Expand Down Expand Up @@ -140,24 +191,80 @@ func TestNodeRegisterEventFromChanges(t *testing.T) {

require.Equal(t, len(tc.WantEvents), len(got))
for idx, g := range got {
// assert equality of shared fields

want := tc.WantEvents[idx]
require.Equal(t, want.Index, g.Index)
require.Equal(t, want.Key, g.Key)
require.Equal(t, want.Topic, g.Topic)

switch tc.MsgType {
case structs.NodeRegisterRequestType:
requireNodeRegistrationEventEqual(t, tc.WantEvents[idx], g)
case structs.NodeDeregisterRequestType:
requireNodeDeregistrationEventEqual(t, tc.WantEvents[idx], g)
case structs.UpsertNodeEventsType:
requireNodeEventEqual(t, tc.WantEvents[idx], g)
default:
require.Fail(t, "unhandled message type")
}
}
})
}
}

func TestNodeDrainEventFromChanges(t *testing.T) {
t.Parallel()
s := TestStateStoreCfg(t, TestStateStorePublisher(t))
defer s.StopEventPublisher()

// setup
setupTx := s.db.WriteTxn(10)

node := mock.Node()
alloc1 := mock.Alloc()
alloc2 := mock.Alloc()
alloc1.NodeID = node.ID
alloc2.NodeID = node.ID

require.NoError(t, upsertNodeTxn(setupTx, 10, node))
require.NoError(t, s.upsertAllocsImpl(100, []*structs.Allocation{alloc1, alloc2}, setupTx))
setupTx.Txn.Commit()

// changes
tx := s.db.WriteTxn(100)

strat := &structs.DrainStrategy{
DrainSpec: structs.DrainSpec{
Deadline: 10 * time.Minute,
IgnoreSystemJobs: false,
},
StartedAt: time.Now(),
}
markEligible := false
updatedAt := time.Now()
event := &structs.NodeEvent{}

require.NoError(t, s.updateNodeDrainImpl(tx, 100, node.ID, strat, markEligible, updatedAt.UnixNano(), event))
changes := Changes{Changes: tx.Changes(), Index: 100, MsgType: structs.NodeUpdateDrainRequestType}
got, err := processDBChanges(tx, changes)
require.NoError(t, err)

require.Len(t, got, 1)

require.Equal(t, TopicNodeDrain, got[0].Topic)
require.Equal(t, uint64(100), got[0].Index)

nodeEvent, ok := got[0].Payload.(*NodeDrainEvent)
require.True(t, ok)

require.Equal(t, structs.NodeSchedulingIneligible, nodeEvent.Node.SchedulingEligibility)
require.Equal(t, strat, nodeEvent.Node.DrainStrategy)
}

func requireNodeRegistrationEventEqual(t *testing.T, want, got stream.Event) {
t.Helper()

require.Equal(t, want.Index, got.Index)
require.Equal(t, want.Key, got.Key)
require.Equal(t, want.Topic, got.Topic)

wantPayload := want.Payload.(*NodeRegistrationEvent)
gotPayload := got.Payload.(*NodeRegistrationEvent)

Expand All @@ -170,16 +277,18 @@ func requireNodeRegistrationEventEqual(t *testing.T, want, got stream.Event) {
func requireNodeDeregistrationEventEqual(t *testing.T, want, got stream.Event) {
t.Helper()

require.Equal(t, want.Index, got.Index)
require.Equal(t, want.Key, got.Key)
require.Equal(t, want.Topic, got.Topic)

wantPayload := want.Payload.(*NodeDeregistrationEvent)
gotPayload := got.Payload.(*NodeDeregistrationEvent)

require.Equal(t, wantPayload, gotPayload)
}

func requireNodeEventEqual(t *testing.T, want, got stream.Event) {
gotPayload := got.Payload.(*NodeEvent)

require.Len(t, gotPayload.Node.Events, 3)
}

type nodeOpts func(n *structs.Node)

func nodeNotReady(n *structs.Node) {
Expand Down
4 changes: 4 additions & 0 deletions nomad/state/state_changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,10 @@ func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
return NodeRegisterEventFromChanges(tx, changes)
case structs.NodeDeregisterRequestType:
return NodeDeregisterEventFromChanges(tx, changes)
case structs.NodeUpdateDrainRequestType:
return NodeDrainEventFromChanges(tx, changes)
case structs.UpsertNodeEventsType:
return NodeEventFromChanges(tx, changes)
}
return []stream.Event{}, nil
}
Loading