Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Events/event source node #8918

Merged
merged 2 commits into from
Sep 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nomad

import (
"context"
"fmt"
"io"
"reflect"
Expand Down Expand Up @@ -195,9 +196,9 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {

switch msgType {
case structs.NodeRegisterRequestType:
return n.applyUpsertNode(buf[1:], log.Index)
return n.applyUpsertNode(msgType, buf[1:], log.Index)
case structs.NodeDeregisterRequestType:
return n.applyDeregisterNode(buf[1:], log.Index)
return n.applyDeregisterNode(msgType, buf[1:], log.Index)
case structs.NodeUpdateStatusRequestType:
return n.applyStatusUpdate(buf[1:], log.Index)
case structs.NodeUpdateDrainRequestType:
Expand Down Expand Up @@ -310,17 +311,19 @@ func (n *nomadFSM) applyClusterMetadata(buf []byte, index uint64) interface{} {
return nil
}

func (n *nomadFSM) applyUpsertNode(buf []byte, index uint64) interface{} {
func (n *nomadFSM) applyUpsertNode(reqType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "register_node"}, time.Now())
var req structs.NodeRegisterRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}

ctx := context.WithValue(context.Background(), state.CtxMsgType, reqType)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I can tell there's a 1:1 mapping between message types and n.apply* functions. So we're creating a context and threading the message type from raft -> fsm -> state store. There's only ever 1 parameter and it's the message type, which the apply function already needs to know. What do we get by creating a context rather than tacking on the msgType as a new parameter lower down the stack?


// Handle upgrade paths
req.Node.Canonicalize()

if err := n.state.UpsertNode(index, req.Node); err != nil {
if err := n.state.UpsertNodeCtx(ctx, index, req.Node); err != nil {
n.logger.Error("UpsertNode failed", "error", err)
return err
}
Expand All @@ -334,14 +337,16 @@ func (n *nomadFSM) applyUpsertNode(buf []byte, index uint64) interface{} {
return nil
}

func (n *nomadFSM) applyDeregisterNode(buf []byte, index uint64) interface{} {
func (n *nomadFSM) applyDeregisterNode(reqType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "deregister_node"}, time.Now())
var req structs.NodeDeregisterRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}

if err := n.state.DeleteNode(index, []string{req.NodeID}); err != nil {
ctx := context.WithValue(context.Background(), state.CtxMsgType, reqType)

if err := n.state.DeleteNodeCtx(ctx, index, []string{req.NodeID}); err != nil {
n.logger.Error("DeleteNode failed", "error", err)
return err
}
Expand Down
75 changes: 75 additions & 0 deletions nomad/state/node_events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package state

import (
"fmt"

"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs"
)

const (
TopicNodeRegistration = "NodeRegistration"
TopicNodeDeregistration = "NodeDeregistration"
)

type NodeRegistrationEvent struct {
Event *structs.NodeEvent
NodeStatus string
}

type NodeDeregistrationEvent struct {
NodeID string
}

// NodeRegisterEventFromChanges generates a NodeRegistrationEvent from a set
// of transaction changes.
func NodeRegisterEventFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
var events []stream.Event
for _, change := range changes.Changes {
switch change.Table {
case "nodes":
after, ok := change.After.(*structs.Node)
if !ok {
return nil, fmt.Errorf("transaction change was not a Node")
}

event := stream.Event{
Topic: TopicNodeRegistration,
Index: changes.Index,
Key: after.ID,
Payload: &NodeRegistrationEvent{
Event: after.Events[len(after.Events)-1],
NodeStatus: after.Status,
},
}
events = append(events, event)
}
}
return events, nil
}

// NodeDeregisterEventFromChanges generates a NodeDeregistrationEvent from a set
// of transaction changes.
func NodeDeregisterEventFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
var events []stream.Event
for _, change := range changes.Changes {
switch change.Table {
case "nodes":
before, ok := change.Before.(*structs.Node)
if !ok {
return nil, fmt.Errorf("transaction change was not a Node")
}

event := stream.Event{
Topic: TopicNodeDeregistration,
Index: changes.Index,
Key: before.ID,
Payload: &NodeDeregistrationEvent{
NodeID: before.ID,
},
}
events = append(events, event)
}
}
return events, nil
}
211 changes: 211 additions & 0 deletions nomad/state/node_events_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
package state

import (
"testing"

"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)

func TestNodeRegisterEventFromChanges(t *testing.T) {
cases := []struct {
Name string
MsgType structs.MessageType
Setup func(s *StateStore, tx *txn) error
Mutate func(s *StateStore, tx *txn) error
WantEvents []stream.Event
WantErr bool
WantTopic string
}{
{
MsgType: structs.NodeRegisterRequestType,
WantTopic: TopicNodeRegistration,
Name: "node registered",
Mutate: func(s *StateStore, tx *txn) error {
return upsertNodeTxn(tx, tx.Index, testNode())
},
WantEvents: []stream.Event{{
Topic: TopicNodeRegistration,
Key: testNodeID(),
Index: 100,
Payload: &NodeRegistrationEvent{
Event: &structs.NodeEvent{
Message: "Node registered",
Subsystem: "Cluster",
},
NodeStatus: structs.NodeStatusReady,
},
}},
WantErr: false,
},
{
MsgType: structs.NodeRegisterRequestType,
WantTopic: TopicNodeRegistration,
Name: "node registered initializing",
Mutate: func(s *StateStore, tx *txn) error {
return upsertNodeTxn(tx, tx.Index, testNode(nodeNotReady))
},
WantEvents: []stream.Event{{
Topic: TopicNodeRegistration,
Key: testNodeID(),
Index: 100,
Payload: &NodeRegistrationEvent{
Event: &structs.NodeEvent{
Message: "Node registered",
Subsystem: "Cluster",
},
NodeStatus: structs.NodeStatusInit,
},
}},
WantErr: false,
},
{
MsgType: structs.NodeDeregisterRequestType,
WantTopic: TopicNodeDeregistration,
Name: "node deregistered",
Setup: func(s *StateStore, tx *txn) error {
return upsertNodeTxn(tx, tx.Index, testNode())
},
Mutate: func(s *StateStore, tx *txn) error {
return deleteNodeTxn(tx, tx.Index, []string{testNodeID()})
},
WantEvents: []stream.Event{{
Topic: TopicNodeDeregistration,
Key: testNodeID(),
Index: 100,
Payload: &NodeDeregistrationEvent{
NodeID: testNodeID(),
},
}},
WantErr: false,
},
{
MsgType: structs.NodeDeregisterRequestType,
WantTopic: TopicNodeDeregistration,
Name: "batch node deregistered",
Setup: func(s *StateStore, tx *txn) error {
require.NoError(t, upsertNodeTxn(tx, tx.Index, testNode()))
return upsertNodeTxn(tx, tx.Index, testNode(nodeIDTwo))
},
Mutate: func(s *StateStore, tx *txn) error {
return deleteNodeTxn(tx, tx.Index, []string{testNodeID(), testNodeIDTwo()})
},
WantEvents: []stream.Event{
{
Topic: TopicNodeDeregistration,
Key: testNodeID(),
Index: 100,
Payload: &NodeDeregistrationEvent{
NodeID: testNodeID(),
},
},
{
Topic: TopicNodeDeregistration,
Key: testNodeIDTwo(),
Index: 100,
Payload: &NodeDeregistrationEvent{
NodeID: testNodeIDTwo(),
},
},
},
WantErr: false,
},
}

for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
s := TestStateStoreCfg(t, TestStateStorePublisher(t))
defer s.StopEventPublisher()

if tc.Setup != nil {
// Bypass publish mechanism for setup
setupTx := s.db.WriteTxn(10)
require.NoError(t, tc.Setup(s, setupTx))
setupTx.Txn.Commit()
}

tx := s.db.WriteTxn(100)
require.NoError(t, tc.Mutate(s, tx))

changes := Changes{Changes: tx.Changes(), Index: 100, MsgType: tc.MsgType}
got, err := processDBChanges(tx, changes)

if tc.WantErr {
require.Error(t, err)
return
}
require.NoError(t, err)

require.Equal(t, len(tc.WantEvents), len(got))
for idx, g := range got {
switch tc.MsgType {
case structs.NodeRegisterRequestType:
requireNodeRegistrationEventEqual(t, tc.WantEvents[idx], g)
case structs.NodeDeregisterRequestType:
requireNodeDeregistrationEventEqual(t, tc.WantEvents[idx], g)
}
}
})
}
}

func requireNodeRegistrationEventEqual(t *testing.T, want, got stream.Event) {
t.Helper()

require.Equal(t, want.Index, got.Index)
require.Equal(t, want.Key, got.Key)
require.Equal(t, want.Topic, got.Topic)

wantPayload := want.Payload.(*NodeRegistrationEvent)
gotPayload := got.Payload.(*NodeRegistrationEvent)

// Check payload equality for the fields that we can easily control
require.Equal(t, wantPayload.NodeStatus, gotPayload.NodeStatus)
require.Equal(t, wantPayload.Event.Message, gotPayload.Event.Message)
require.Equal(t, wantPayload.Event.Subsystem, gotPayload.Event.Subsystem)
}

func requireNodeDeregistrationEventEqual(t *testing.T, want, got stream.Event) {
t.Helper()

require.Equal(t, want.Index, got.Index)
require.Equal(t, want.Key, got.Key)
require.Equal(t, want.Topic, got.Topic)

wantPayload := want.Payload.(*NodeDeregistrationEvent)
gotPayload := got.Payload.(*NodeDeregistrationEvent)

require.Equal(t, wantPayload, gotPayload)
}

type nodeOpts func(n *structs.Node)

func nodeNotReady(n *structs.Node) {
n.Status = structs.NodeStatusInit
}

func nodeIDTwo(n *structs.Node) {
n.ID = testNodeIDTwo()
}

func testNode(opts ...nodeOpts) *structs.Node {
n := mock.Node()
n.ID = testNodeID()

n.SecretID = "ab9812d3-6a21-40d3-973d-d9d2174a23ee"

for _, opt := range opts {
opt(n)
}
return n
}

func testNodeID() string {
return "9d5741c1-3899-498a-98dd-eb3c05665863"
}

func testNodeIDTwo() string {
return "694ff31d-8c59-4030-ac83-e15692560c8d"
}
Loading