-
Notifications
You must be signed in to change notification settings - Fork 2k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Node Register/Deregister event sourcing
example upsert node with context fill in writetxnwithctx ctx passing to handle event type creation, wip test node deregistration event drop Node from registration event
- Loading branch information
1 parent
3cb5f5e
commit 72a69c4
Showing
6 changed files
with
358 additions
and
10 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
package state | ||
|
||
import ( | ||
"github.com/hashicorp/nomad/nomad/stream" | ||
"github.com/hashicorp/nomad/nomad/structs" | ||
) | ||
|
||
const ( | ||
TopicNodeRegistration = "NodeRegistration" | ||
TopicNodeDeregistration = "NodeDeregistration" | ||
) | ||
|
||
type NodeRegistrationEvent struct { | ||
Event *structs.NodeEvent | ||
NodeStatus string | ||
} | ||
|
||
type NodeDeregistrationEvent struct { | ||
NodeID string | ||
} | ||
|
||
func NodeRegisterEventFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { | ||
var events []stream.Event | ||
for _, change := range changes.Changes { | ||
switch change.Table { | ||
case "nodes": | ||
after := change.After.(*structs.Node) | ||
|
||
event := stream.Event{ | ||
Topic: TopicNodeRegistration, | ||
Index: changes.Index, | ||
Key: after.ID, | ||
Payload: &NodeRegistrationEvent{ | ||
Event: after.Events[len(after.Events)-1], | ||
NodeStatus: after.Status, | ||
}, | ||
} | ||
events = append(events, event) | ||
} | ||
} | ||
return events, nil | ||
} | ||
|
||
func NodeDeregisterEventFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { | ||
var event stream.Event | ||
for _, change := range changes.Changes { | ||
switch change.Table { | ||
case "nodes": | ||
before := change.Before.(*structs.Node) | ||
|
||
event = stream.Event{ | ||
Topic: TopicNodeDeregistration, | ||
Index: changes.Index, | ||
Key: before.ID, | ||
Payload: &NodeDeregistrationEvent{ | ||
NodeID: before.ID, | ||
}, | ||
} | ||
} | ||
} | ||
return []stream.Event{event}, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,172 @@ | ||
package state | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/hashicorp/nomad/nomad/mock" | ||
"github.com/hashicorp/nomad/nomad/stream" | ||
"github.com/hashicorp/nomad/nomad/structs" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestNodeRegisterEventFromChanges(t *testing.T) { | ||
cases := []struct { | ||
Name string | ||
MsgType structs.MessageType | ||
Setup func(s *StateStore, tx *txn) error | ||
Mutate func(s *StateStore, tx *txn) error | ||
WantEvents []stream.Event | ||
WantErr bool | ||
WantTopic string | ||
}{ | ||
{ | ||
MsgType: structs.NodeRegisterRequestType, | ||
WantTopic: TopicNodeRegistration, | ||
Name: "node registered", | ||
Mutate: func(s *StateStore, tx *txn) error { | ||
return upsertNodeTxn(tx, tx.Index, testNode()) | ||
}, | ||
WantEvents: []stream.Event{{ | ||
Topic: TopicNodeRegistration, | ||
Key: testNodeID(), | ||
Index: 100, | ||
Payload: &NodeRegistrationEvent{ | ||
Event: &structs.NodeEvent{ | ||
Message: "Node registered", | ||
Subsystem: "Cluster", | ||
}, | ||
NodeStatus: structs.NodeStatusReady, | ||
}, | ||
}}, | ||
WantErr: false, | ||
}, | ||
{ | ||
MsgType: structs.NodeRegisterRequestType, | ||
WantTopic: TopicNodeRegistration, | ||
Name: "node registered initializing", | ||
Mutate: func(s *StateStore, tx *txn) error { | ||
return upsertNodeTxn(tx, tx.Index, testNode(nodeNotReady)) | ||
}, | ||
WantEvents: []stream.Event{{ | ||
Topic: TopicNodeRegistration, | ||
Key: testNodeID(), | ||
Index: 100, | ||
Payload: &NodeRegistrationEvent{ | ||
Event: &structs.NodeEvent{ | ||
Message: "Node registered", | ||
Subsystem: "Cluster", | ||
}, | ||
NodeStatus: structs.NodeStatusInit, | ||
}, | ||
}}, | ||
WantErr: false, | ||
}, | ||
{ | ||
MsgType: structs.NodeDeregisterRequestType, | ||
WantTopic: TopicNodeDeregistration, | ||
Name: "node deregistered", | ||
Setup: func(s *StateStore, tx *txn) error { | ||
return upsertNodeTxn(tx, tx.Index, testNode()) | ||
}, | ||
Mutate: func(s *StateStore, tx *txn) error { | ||
return deleteNodeTxn(tx, tx.Index, []string{testNodeID()}) | ||
}, | ||
WantEvents: []stream.Event{{ | ||
Topic: TopicNodeDeregistration, | ||
Key: testNodeID(), | ||
Index: 100, | ||
Payload: &NodeDeregistrationEvent{ | ||
NodeID: testNodeID(), | ||
}, | ||
}}, | ||
WantErr: false, | ||
}, | ||
} | ||
|
||
for _, tc := range cases { | ||
t.Run(tc.Name, func(t *testing.T) { | ||
s := TestStateStoreCfg(t, TestStateStorePublisher(t)) | ||
defer s.StopEventPublisher() | ||
|
||
if tc.Setup != nil { | ||
// Bypass publish mechanism for setup | ||
setupTx := s.db.WriteTxn(10) | ||
require.NoError(t, tc.Setup(s, setupTx)) | ||
setupTx.Txn.Commit() | ||
} | ||
|
||
tx := s.db.WriteTxn(100) | ||
require.NoError(t, tc.Mutate(s, tx)) | ||
|
||
changes := Changes{Changes: tx.Changes(), Index: 100, MsgType: tc.MsgType} | ||
got, err := processDBChanges(tx, changes) | ||
|
||
if tc.WantErr { | ||
require.Error(t, err) | ||
return | ||
} | ||
require.NoError(t, err) | ||
|
||
require.Equal(t, len(tc.WantEvents), len(got)) | ||
for idx, g := range got { | ||
switch tc.MsgType { | ||
case structs.NodeRegisterRequestType: | ||
requireNodeRegistrationEventEqual(t, tc.WantEvents[idx], g) | ||
case structs.NodeDeregisterRequestType: | ||
requireNodeDeregistrationEventEqual(t, tc.WantEvents[idx], g) | ||
} | ||
} | ||
}) | ||
} | ||
} | ||
|
||
func requireNodeRegistrationEventEqual(t *testing.T, want, got stream.Event) { | ||
t.Helper() | ||
|
||
require.Equal(t, want.Index, got.Index) | ||
require.Equal(t, want.Key, got.Key) | ||
require.Equal(t, want.Topic, got.Topic) | ||
|
||
wantPayload := want.Payload.(*NodeRegistrationEvent) | ||
gotPayload := got.Payload.(*NodeRegistrationEvent) | ||
|
||
// Check payload equality for the fields that we can easily control | ||
require.Equal(t, wantPayload.NodeStatus, gotPayload.NodeStatus) | ||
require.Equal(t, wantPayload.Event.Message, gotPayload.Event.Message) | ||
require.Equal(t, wantPayload.Event.Subsystem, gotPayload.Event.Subsystem) | ||
} | ||
|
||
func requireNodeDeregistrationEventEqual(t *testing.T, want, got stream.Event) { | ||
t.Helper() | ||
|
||
require.Equal(t, want.Index, got.Index) | ||
require.Equal(t, want.Key, got.Key) | ||
require.Equal(t, want.Topic, got.Topic) | ||
|
||
wantPayload := want.Payload.(*NodeDeregistrationEvent) | ||
gotPayload := got.Payload.(*NodeDeregistrationEvent) | ||
|
||
require.Equal(t, wantPayload, gotPayload) | ||
} | ||
|
||
type nodeOpts func(n *structs.Node) | ||
|
||
func nodeNotReady(n *structs.Node) { | ||
n.Status = structs.NodeStatusInit | ||
} | ||
|
||
func testNode(opts ...nodeOpts) *structs.Node { | ||
n := mock.Node() | ||
n.ID = testNodeID() | ||
|
||
n.SecretID = "ab9812d3-6a21-40d3-973d-d9d2174a23ee" | ||
|
||
for _, opt := range opts { | ||
opt(n) | ||
} | ||
return n | ||
} | ||
|
||
func testNodeID() string { | ||
return "9d5741c1-3899-498a-98dd-eb3c05665863" | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.