Skip to content

Commit

Permalink
Node Register/Deregister event sourcing
Browse files Browse the repository at this point in the history
example upsert node with context

fill in writetxnwithctx

ctx passing to handle event type creation, wip test

node deregistration event

drop Node from registration event
  • Loading branch information
drewbailey committed Sep 22, 2020
1 parent 2eadc8d commit 24d8ef2
Show file tree
Hide file tree
Showing 6 changed files with 363 additions and 10 deletions.
17 changes: 11 additions & 6 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nomad

import (
"context"
"fmt"
"io"
"reflect"
Expand Down Expand Up @@ -195,9 +196,9 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {

switch msgType {
case structs.NodeRegisterRequestType:
return n.applyUpsertNode(buf[1:], log.Index)
return n.applyUpsertNode(msgType, buf[1:], log.Index)
case structs.NodeDeregisterRequestType:
return n.applyDeregisterNode(buf[1:], log.Index)
return n.applyDeregisterNode(msgType, buf[1:], log.Index)
case structs.NodeUpdateStatusRequestType:
return n.applyStatusUpdate(buf[1:], log.Index)
case structs.NodeUpdateDrainRequestType:
Expand Down Expand Up @@ -310,17 +311,19 @@ func (n *nomadFSM) applyClusterMetadata(buf []byte, index uint64) interface{} {
return nil
}

func (n *nomadFSM) applyUpsertNode(buf []byte, index uint64) interface{} {
func (n *nomadFSM) applyUpsertNode(reqType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "register_node"}, time.Now())
var req structs.NodeRegisterRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}

ctx := context.WithValue(context.Background(), state.CtxMsgType, reqType)

// Handle upgrade paths
req.Node.Canonicalize()

if err := n.state.UpsertNode(index, req.Node); err != nil {
if err := n.state.UpsertNodeCtx(ctx, index, req.Node); err != nil {
n.logger.Error("UpsertNode failed", "error", err)
return err
}
Expand All @@ -334,14 +337,16 @@ func (n *nomadFSM) applyUpsertNode(buf []byte, index uint64) interface{} {
return nil
}

func (n *nomadFSM) applyDeregisterNode(buf []byte, index uint64) interface{} {
func (n *nomadFSM) applyDeregisterNode(reqType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "deregister_node"}, time.Now())
var req structs.NodeDeregisterRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}

if err := n.state.DeleteNode(index, []string{req.NodeID}); err != nil {
ctx := context.WithValue(context.Background(), state.CtxMsgType, reqType)

if err := n.state.DeleteNodeCtx(ctx, index, []string{req.NodeID}); err != nil {
n.logger.Error("DeleteNode failed", "error", err)
return err
}
Expand Down
62 changes: 62 additions & 0 deletions nomad/state/node_events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package state

import (
"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs"
)

const (
TopicNodeRegistration = "NodeRegistration"
TopicNodeDeregistration = "NodeDeregistration"
)

type NodeRegistrationEvent struct {
Event *structs.NodeEvent
NodeStatus string
}

type NodeDeregistrationEvent struct {
NodeID string
}

func NodeRegisterEventFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
var events []stream.Event
for _, change := range changes.Changes {
switch change.Table {
case "nodes":
after := change.After.(*structs.Node)

event := stream.Event{
Topic: TopicNodeRegistration,
Index: changes.Index,
Key: after.ID,
Payload: &NodeRegistrationEvent{
Event: after.Events[len(after.Events)-1],
NodeStatus: after.Status,
},
}
events = append(events, event)
}
}
return events, nil
}

func NodeDeregisterEventFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
var event stream.Event
for _, change := range changes.Changes {
switch change.Table {
case "nodes":
before := change.Before.(*structs.Node)

event = stream.Event{
Topic: TopicNodeDeregistration,
Index: changes.Index,
Key: before.ID,
Payload: &NodeDeregistrationEvent{
NodeID: before.ID,
},
}
}
}
return []stream.Event{event}, nil
}
172 changes: 172 additions & 0 deletions nomad/state/node_events_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
package state

import (
"testing"

"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)

func TestNodeRegisterEventFromChanges(t *testing.T) {
cases := []struct {
Name string
MsgType structs.MessageType
Setup func(s *StateStore, tx *txn) error
Mutate func(s *StateStore, tx *txn) error
WantEvents []stream.Event
WantErr bool
WantTopic string
}{
{
MsgType: structs.NodeRegisterRequestType,
WantTopic: TopicNodeRegistration,
Name: "node registered",
Mutate: func(s *StateStore, tx *txn) error {
return upsertNodeTxn(tx, tx.Index, testNode())
},
WantEvents: []stream.Event{{
Topic: TopicNodeRegistration,
Key: testNodeID(),
Index: 100,
Payload: &NodeRegistrationEvent{
Event: &structs.NodeEvent{
Message: "Node registered",
Subsystem: "Cluster",
},
NodeStatus: structs.NodeStatusReady,
},
}},
WantErr: false,
},
{
MsgType: structs.NodeRegisterRequestType,
WantTopic: TopicNodeRegistration,
Name: "node registered initializing",
Mutate: func(s *StateStore, tx *txn) error {
return upsertNodeTxn(tx, tx.Index, testNode(nodeNotReady))
},
WantEvents: []stream.Event{{
Topic: TopicNodeRegistration,
Key: testNodeID(),
Index: 100,
Payload: &NodeRegistrationEvent{
Event: &structs.NodeEvent{
Message: "Node registered",
Subsystem: "Cluster",
},
NodeStatus: structs.NodeStatusInit,
},
}},
WantErr: false,
},
{
MsgType: structs.NodeDeregisterRequestType,
WantTopic: TopicNodeDeregistration,
Name: "node deregistered",
Setup: func(s *StateStore, tx *txn) error {
return upsertNodeTxn(tx, tx.Index, testNode())
},
Mutate: func(s *StateStore, tx *txn) error {
return deleteNodeTxn(tx, tx.Index, []string{testNodeID()})
},
WantEvents: []stream.Event{{
Topic: TopicNodeDeregistration,
Key: testNodeID(),
Index: 100,
Payload: &NodeDeregistrationEvent{
NodeID: testNodeID(),
},
}},
WantErr: false,
},
}

for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
s := TestStateStoreCfg(t, TestStateStorePublisher(t))
defer s.StopEventPublisher()

if tc.Setup != nil {
// Bypass publish mechanism for setup
setupTx := s.db.WriteTxn(10)
require.NoError(t, tc.Setup(s, setupTx))
setupTx.Txn.Commit()
}

tx := s.db.WriteTxn(100)
require.NoError(t, tc.Mutate(s, tx))

changes := Changes{Changes: tx.Changes(), Index: 100, MsgType: tc.MsgType}
got, err := processDBChanges(tx, changes)

if tc.WantErr {
require.Error(t, err)
return
}
require.NoError(t, err)

require.Equal(t, len(tc.WantEvents), len(got))
for idx, g := range got {
switch tc.MsgType {
case structs.NodeRegisterRequestType:
requireNodeRegistrationEventEqual(t, tc.WantEvents[idx], g)
case structs.NodeDeregisterRequestType:
requireNodeDeregistrationEventEqual(t, tc.WantEvents[idx], g)
}
}
})
}
}

func requireNodeRegistrationEventEqual(t *testing.T, want, got stream.Event) {
t.Helper()

require.Equal(t, want.Index, got.Index)
require.Equal(t, want.Key, got.Key)
require.Equal(t, want.Topic, got.Topic)

wantPayload := want.Payload.(*NodeRegistrationEvent)
gotPayload := got.Payload.(*NodeRegistrationEvent)

// Check payload equality for the fields that we can easily control
require.Equal(t, wantPayload.NodeStatus, gotPayload.NodeStatus)
require.Equal(t, wantPayload.Event.Message, gotPayload.Event.Message)
require.Equal(t, wantPayload.Event.Subsystem, gotPayload.Event.Subsystem)
}

func requireNodeDeregistrationEventEqual(t *testing.T, want, got stream.Event) {
t.Helper()

require.Equal(t, want.Index, got.Index)
require.Equal(t, want.Key, got.Key)
require.Equal(t, want.Topic, got.Topic)

wantPayload := want.Payload.(*NodeDeregistrationEvent)
gotPayload := got.Payload.(*NodeDeregistrationEvent)

require.Equal(t, wantPayload, gotPayload)
}

type nodeOpts func(n *structs.Node)

func nodeNotReady(n *structs.Node) {
n.Status = structs.NodeStatusInit
}

func testNode(opts ...nodeOpts) *structs.Node {
n := mock.Node()
n.ID = testNodeID()

n.SecretID = "ab9812d3-6a21-40d3-973d-d9d2174a23ee"

for _, opt := range opts {
opt(n)
}
return n
}

func testNodeID() string {
return "9d5741c1-3899-498a-98dd-eb3c05665863"
}
51 changes: 51 additions & 0 deletions nomad/state/state_changes.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
package state

import (
"context"
"fmt"

"github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs"
)

const (
CtxMsgType = "type"
)

// ReadTxn is implemented by memdb.Txn to perform read operations.
Expand All @@ -21,6 +27,7 @@ type Changes struct {
// Index is the latest index at the time these changes were committed.
Index uint64
Changes memdb.Changes
MsgType structs.MessageType
}

// changeTrackerDB is a thin wrapper around memdb.DB which enables TrackChanges on
Expand Down Expand Up @@ -81,6 +88,17 @@ func (c *changeTrackerDB) WriteTxn(idx uint64) *txn {
return t
}

func (c *changeTrackerDB) WriteTxnWithCtx(ctx context.Context, idx uint64) *txn {
t := &txn{
ctx: ctx,
Txn: c.db.Txn(true),
Index: idx,
publish: c.publish,
}
t.Txn.TrackChanges()
return t
}

func (c *changeTrackerDB) publish(changes Changes) error {
readOnlyTx := c.db.Txn(false)
defer readOnlyTx.Abort()
Expand Down Expand Up @@ -113,6 +131,9 @@ func (c *changeTrackerDB) WriteTxnRestore() *txn {
// error. Any errors from the callback would be lost, which would result in a
// missing change event, even though the state store had changed.
type txn struct {
// ctx is used to hold message type information from an FSM request
ctx context.Context

*memdb.Txn
// Index in raft where the write is occurring. The value is zero for a
// read-only, or WriteTxnRestore transaction.
Expand All @@ -136,6 +157,7 @@ func (tx *txn) Commit() error {
changes := Changes{
Index: tx.Index,
Changes: tx.Txn.Changes(),
MsgType: tx.MsgType(),
}
if err := tx.publish(changes); err != nil {
return err
Expand All @@ -146,7 +168,36 @@ func (tx *txn) Commit() error {
return nil
}

// MsgType returns a MessageType from the txn's context.
// If the context is empty or the value isn't set IgnoreUnknownTypeFlag will
// be returned to signal that the MsgType is unknown.
func (tx *txn) MsgType() structs.MessageType {
if tx.ctx == nil {
return structs.IgnoreUnknownTypeFlag
}

raw := tx.ctx.Value(CtxMsgType)
if raw == nil {
return structs.IgnoreUnknownTypeFlag
}

msgType, ok := raw.(structs.MessageType)
if !ok {
return structs.IgnoreUnknownTypeFlag
}
return msgType
}

func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
switch changes.MsgType {
case structs.IgnoreUnknownTypeFlag:
// unknown event type
return []stream.Event{}, nil
case structs.NodeRegisterRequestType:
return NodeRegisterEventFromChanges(tx, changes)
case structs.NodeDeregisterRequestType:
return NodeDeregisterEventFromChanges(tx, changes)
}
// TODO: add handlers here.
return []stream.Event{}, nil
}
Loading

0 comments on commit 24d8ef2

Please sign in to comment.