Skip to content

Commit

Permalink
Events/event source node (#8918)
Browse files Browse the repository at this point in the history
* Node Register/Deregister event sourcing

example upsert node with context

fill in writetxnwithctx

ctx passing to handle event type creation, wip test

node deregistration event

drop Node from registration event

* node batch deregistration
  • Loading branch information
drewbailey committed Oct 13, 2020
1 parent a0fe693 commit 4d491fe
Show file tree
Hide file tree
Showing 6 changed files with 423 additions and 12 deletions.
17 changes: 11 additions & 6 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nomad

import (
"context"
"fmt"
"io"
"reflect"
Expand Down Expand Up @@ -195,9 +196,9 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {

switch msgType {
case structs.NodeRegisterRequestType:
return n.applyUpsertNode(buf[1:], log.Index)
return n.applyUpsertNode(msgType, buf[1:], log.Index)
case structs.NodeDeregisterRequestType:
return n.applyDeregisterNode(buf[1:], log.Index)
return n.applyDeregisterNode(msgType, buf[1:], log.Index)
case structs.NodeUpdateStatusRequestType:
return n.applyStatusUpdate(buf[1:], log.Index)
case structs.NodeUpdateDrainRequestType:
Expand Down Expand Up @@ -310,17 +311,19 @@ func (n *nomadFSM) applyClusterMetadata(buf []byte, index uint64) interface{} {
return nil
}

func (n *nomadFSM) applyUpsertNode(buf []byte, index uint64) interface{} {
func (n *nomadFSM) applyUpsertNode(reqType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "register_node"}, time.Now())
var req structs.NodeRegisterRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}

ctx := context.WithValue(context.Background(), state.CtxMsgType, reqType)

// Handle upgrade paths
req.Node.Canonicalize()

if err := n.state.UpsertNode(index, req.Node); err != nil {
if err := n.state.UpsertNodeCtx(ctx, index, req.Node); err != nil {
n.logger.Error("UpsertNode failed", "error", err)
return err
}
Expand All @@ -334,14 +337,16 @@ func (n *nomadFSM) applyUpsertNode(buf []byte, index uint64) interface{} {
return nil
}

func (n *nomadFSM) applyDeregisterNode(buf []byte, index uint64) interface{} {
func (n *nomadFSM) applyDeregisterNode(reqType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "deregister_node"}, time.Now())
var req structs.NodeDeregisterRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}

if err := n.state.DeleteNode(index, []string{req.NodeID}); err != nil {
ctx := context.WithValue(context.Background(), state.CtxMsgType, reqType)

if err := n.state.DeleteNodeCtx(ctx, index, []string{req.NodeID}); err != nil {
n.logger.Error("DeleteNode failed", "error", err)
return err
}
Expand Down
75 changes: 75 additions & 0 deletions nomad/state/node_events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package state

import (
"fmt"

"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs"
)

const (
TopicNodeRegistration = "NodeRegistration"
TopicNodeDeregistration = "NodeDeregistration"
)

type NodeRegistrationEvent struct {
Event *structs.NodeEvent
NodeStatus string
}

type NodeDeregistrationEvent struct {
NodeID string
}

// NodeRegisterEventFromChanges generates a NodeRegistrationEvent from a set
// of transaction changes.
func NodeRegisterEventFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
var events []stream.Event
for _, change := range changes.Changes {
switch change.Table {
case "nodes":
after, ok := change.After.(*structs.Node)
if !ok {
return nil, fmt.Errorf("transaction change was not a Node")
}

event := stream.Event{
Topic: TopicNodeRegistration,
Index: changes.Index,
Key: after.ID,
Payload: &NodeRegistrationEvent{
Event: after.Events[len(after.Events)-1],
NodeStatus: after.Status,
},
}
events = append(events, event)
}
}
return events, nil
}

// NodeDeregisterEventFromChanges generates a NodeDeregistrationEvent from a set
// of transaction changes.
func NodeDeregisterEventFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
var events []stream.Event
for _, change := range changes.Changes {
switch change.Table {
case "nodes":
before, ok := change.Before.(*structs.Node)
if !ok {
return nil, fmt.Errorf("transaction change was not a Node")
}

event := stream.Event{
Topic: TopicNodeDeregistration,
Index: changes.Index,
Key: before.ID,
Payload: &NodeDeregistrationEvent{
NodeID: before.ID,
},
}
events = append(events, event)
}
}
return events, nil
}
211 changes: 211 additions & 0 deletions nomad/state/node_events_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
package state

import (
"testing"

"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)

func TestNodeRegisterEventFromChanges(t *testing.T) {
cases := []struct {
Name string
MsgType structs.MessageType
Setup func(s *StateStore, tx *txn) error
Mutate func(s *StateStore, tx *txn) error
WantEvents []stream.Event
WantErr bool
WantTopic string
}{
{
MsgType: structs.NodeRegisterRequestType,
WantTopic: TopicNodeRegistration,
Name: "node registered",
Mutate: func(s *StateStore, tx *txn) error {
return upsertNodeTxn(tx, tx.Index, testNode())
},
WantEvents: []stream.Event{{
Topic: TopicNodeRegistration,
Key: testNodeID(),
Index: 100,
Payload: &NodeRegistrationEvent{
Event: &structs.NodeEvent{
Message: "Node registered",
Subsystem: "Cluster",
},
NodeStatus: structs.NodeStatusReady,
},
}},
WantErr: false,
},
{
MsgType: structs.NodeRegisterRequestType,
WantTopic: TopicNodeRegistration,
Name: "node registered initializing",
Mutate: func(s *StateStore, tx *txn) error {
return upsertNodeTxn(tx, tx.Index, testNode(nodeNotReady))
},
WantEvents: []stream.Event{{
Topic: TopicNodeRegistration,
Key: testNodeID(),
Index: 100,
Payload: &NodeRegistrationEvent{
Event: &structs.NodeEvent{
Message: "Node registered",
Subsystem: "Cluster",
},
NodeStatus: structs.NodeStatusInit,
},
}},
WantErr: false,
},
{
MsgType: structs.NodeDeregisterRequestType,
WantTopic: TopicNodeDeregistration,
Name: "node deregistered",
Setup: func(s *StateStore, tx *txn) error {
return upsertNodeTxn(tx, tx.Index, testNode())
},
Mutate: func(s *StateStore, tx *txn) error {
return deleteNodeTxn(tx, tx.Index, []string{testNodeID()})
},
WantEvents: []stream.Event{{
Topic: TopicNodeDeregistration,
Key: testNodeID(),
Index: 100,
Payload: &NodeDeregistrationEvent{
NodeID: testNodeID(),
},
}},
WantErr: false,
},
{
MsgType: structs.NodeDeregisterRequestType,
WantTopic: TopicNodeDeregistration,
Name: "batch node deregistered",
Setup: func(s *StateStore, tx *txn) error {
require.NoError(t, upsertNodeTxn(tx, tx.Index, testNode()))
return upsertNodeTxn(tx, tx.Index, testNode(nodeIDTwo))
},
Mutate: func(s *StateStore, tx *txn) error {
return deleteNodeTxn(tx, tx.Index, []string{testNodeID(), testNodeIDTwo()})
},
WantEvents: []stream.Event{
{
Topic: TopicNodeDeregistration,
Key: testNodeID(),
Index: 100,
Payload: &NodeDeregistrationEvent{
NodeID: testNodeID(),
},
},
{
Topic: TopicNodeDeregistration,
Key: testNodeIDTwo(),
Index: 100,
Payload: &NodeDeregistrationEvent{
NodeID: testNodeIDTwo(),
},
},
},
WantErr: false,
},
}

for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
s := TestStateStoreCfg(t, TestStateStorePublisher(t))
defer s.StopEventPublisher()

if tc.Setup != nil {
// Bypass publish mechanism for setup
setupTx := s.db.WriteTxn(10)
require.NoError(t, tc.Setup(s, setupTx))
setupTx.Txn.Commit()
}

tx := s.db.WriteTxn(100)
require.NoError(t, tc.Mutate(s, tx))

changes := Changes{Changes: tx.Changes(), Index: 100, MsgType: tc.MsgType}
got, err := processDBChanges(tx, changes)

if tc.WantErr {
require.Error(t, err)
return
}
require.NoError(t, err)

require.Equal(t, len(tc.WantEvents), len(got))
for idx, g := range got {
switch tc.MsgType {
case structs.NodeRegisterRequestType:
requireNodeRegistrationEventEqual(t, tc.WantEvents[idx], g)
case structs.NodeDeregisterRequestType:
requireNodeDeregistrationEventEqual(t, tc.WantEvents[idx], g)
}
}
})
}
}

func requireNodeRegistrationEventEqual(t *testing.T, want, got stream.Event) {
t.Helper()

require.Equal(t, want.Index, got.Index)
require.Equal(t, want.Key, got.Key)
require.Equal(t, want.Topic, got.Topic)

wantPayload := want.Payload.(*NodeRegistrationEvent)
gotPayload := got.Payload.(*NodeRegistrationEvent)

// Check payload equality for the fields that we can easily control
require.Equal(t, wantPayload.NodeStatus, gotPayload.NodeStatus)
require.Equal(t, wantPayload.Event.Message, gotPayload.Event.Message)
require.Equal(t, wantPayload.Event.Subsystem, gotPayload.Event.Subsystem)
}

func requireNodeDeregistrationEventEqual(t *testing.T, want, got stream.Event) {
t.Helper()

require.Equal(t, want.Index, got.Index)
require.Equal(t, want.Key, got.Key)
require.Equal(t, want.Topic, got.Topic)

wantPayload := want.Payload.(*NodeDeregistrationEvent)
gotPayload := got.Payload.(*NodeDeregistrationEvent)

require.Equal(t, wantPayload, gotPayload)
}

type nodeOpts func(n *structs.Node)

func nodeNotReady(n *structs.Node) {
n.Status = structs.NodeStatusInit
}

func nodeIDTwo(n *structs.Node) {
n.ID = testNodeIDTwo()
}

func testNode(opts ...nodeOpts) *structs.Node {
n := mock.Node()
n.ID = testNodeID()

n.SecretID = "ab9812d3-6a21-40d3-973d-d9d2174a23ee"

for _, opt := range opts {
opt(n)
}
return n
}

func testNodeID() string {
return "9d5741c1-3899-498a-98dd-eb3c05665863"
}

func testNodeIDTwo() string {
return "694ff31d-8c59-4030-ac83-e15692560c8d"
}
Loading

0 comments on commit 4d491fe

Please sign in to comment.