Skip to content

Commit

Permalink
refactor: Consolidate node-related fields into a struct (sourcenetwor…
Browse files Browse the repository at this point in the history
…k#3232)

## Relevant issue(s)

Resolves sourcenetwork#3208

## Description

All node-related fields are moved into a separate `nodeState` struct so
now we don't need to maintain all slices and node indexes and it's also
easier to reason about a node's state.
  • Loading branch information
islamaliev authored Nov 14, 2024
1 parent 909c4af commit 198454b
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 205 deletions.
4 changes: 2 additions & 2 deletions tests/integration/acp.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ func getCollectionAndDocInfo(s *state, collectionID, docInd, nodeID int) (string
collectionName := ""
docID := ""
if collectionID != -1 {
collection := s.collections[nodeID][collectionID]
collection := s.nodes[nodeID].collections[collectionID]
if !collection.Description().Name.HasValue() {
require.Fail(s.t, "Expected non-empty collection name, but it was empty.", s.testCase.Description)
}
Expand Down Expand Up @@ -617,7 +617,7 @@ func getNodeAudience(s *state, nodeIndex int) immutable.Option[string] {
if nodeIndex >= len(s.nodes) {
return immutable.None[string]()
}
switch client := s.nodes[nodeIndex].(type) {
switch client := s.nodes[nodeIndex].Client.(type) {
case *http.Wrapper:
return immutable.Some(strings.TrimPrefix(client.Host(), "http://"))
case *cli.Wrapper:
Expand Down
42 changes: 37 additions & 5 deletions tests/integration/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/crypto"
"github.com/sourcenetwork/defradb/internal/kms"
"github.com/sourcenetwork/defradb/net"
"github.com/sourcenetwork/defradb/node"
changeDetector "github.com/sourcenetwork/defradb/tests/change_detector"
)
Expand Down Expand Up @@ -140,7 +141,7 @@ func getDefaultNodeOpts() []node.Option {
// setupNode returns the database implementation for the current
// testing state. The database type on the test state is used to
// select the datastore implementation to use.
func setupNode(s *state, opts ...node.Option) (*node.Node, string, error) {
func setupNode(s *state, opts ...node.Option) (*nodeState, error) {
opts = append(getDefaultNodeOpts(), opts...)

switch acpType {
Expand Down Expand Up @@ -189,20 +190,51 @@ func setupNode(s *state, opts ...node.Option) (*node.Node, string, error) {
opts = append(opts, node.WithStoreType(node.MemoryStore))

default:
return nil, "", fmt.Errorf("invalid database type: %v", s.dbt)
return nil, fmt.Errorf("invalid database type: %v", s.dbt)
}

if s.kms == PubSubKMSType {
opts = append(opts, node.WithKMS(kms.PubSubServiceType))
}

netOpts := make([]net.NodeOpt, 0)
for _, opt := range opts {
if opt, ok := opt.(net.NodeOpt); ok {
netOpts = append(netOpts, opt)
}
}

if s.isNetworkEnabled {
opts = append(opts, node.WithDisableP2P(false))
}

node, err := node.New(s.ctx, opts...)
if err != nil {
return nil, "", err
return nil, err
}

err = node.Start(s.ctx)
if err != nil {
return nil, "", err
return nil, err
}
return node, path, nil

c, err := setupClient(s, node)
require.Nil(s.t, err)

eventState, err := newEventState(c.Events())
require.NoError(s.t, err)

st := &nodeState{
Client: c,
event: eventState,
p2p: newP2PState(),
dbPath: path,
netOpts: netOpts,
}

if node.Peer != nil {
st.peerInfo = node.Peer.PeerInfo()
}

return st, nil
}
76 changes: 40 additions & 36 deletions tests/integration/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ func waitForNetworkSetupEvents(s *state, nodeID int) {

for p2pTopicEvent && replicatorEvents > 0 {
select {
case _, ok := <-s.nodeEvents[nodeID].replicator.Message():
case _, ok := <-s.nodes[nodeID].event.replicator.Message():
if !ok {
require.Fail(s.t, "subscription closed waiting for network setup events")
}
replicatorEvents--

case _, ok := <-s.nodeEvents[nodeID].p2pTopic.Message():
case _, ok := <-s.nodes[nodeID].event.p2pTopic.Message():
if !ok {
require.Fail(s.t, "subscription closed waiting for network setup events")
}
Expand All @@ -63,7 +63,7 @@ func waitForNetworkSetupEvents(s *state, nodeID int) {
// Expected document heads will be updated for the targeted node.
func waitForReplicatorConfigureEvent(s *state, cfg ConfigureReplicator) {
select {
case _, ok := <-s.nodeEvents[cfg.SourceNodeID].replicator.Message():
case _, ok := <-s.nodes[cfg.SourceNodeID].event.replicator.Message():
if !ok {
require.Fail(s.t, "subscription closed waiting for replicator event")
}
Expand All @@ -73,21 +73,21 @@ func waitForReplicatorConfigureEvent(s *state, cfg ConfigureReplicator) {
}

// all previous documents should be merged on the subscriber node
for key, val := range s.nodeP2P[cfg.SourceNodeID].actualDocHeads {
s.nodeP2P[cfg.TargetNodeID].expectedDocHeads[key] = val.cid
for key, val := range s.nodes[cfg.SourceNodeID].p2p.actualDocHeads {
s.nodes[cfg.TargetNodeID].p2p.expectedDocHeads[key] = val.cid
}

// update node connections and replicators
s.nodeP2P[cfg.TargetNodeID].connections[cfg.SourceNodeID] = struct{}{}
s.nodeP2P[cfg.SourceNodeID].connections[cfg.TargetNodeID] = struct{}{}
s.nodeP2P[cfg.SourceNodeID].replicators[cfg.TargetNodeID] = struct{}{}
s.nodes[cfg.TargetNodeID].p2p.connections[cfg.SourceNodeID] = struct{}{}
s.nodes[cfg.SourceNodeID].p2p.connections[cfg.TargetNodeID] = struct{}{}
s.nodes[cfg.SourceNodeID].p2p.replicators[cfg.TargetNodeID] = struct{}{}
}

// waitForReplicatorConfigureEvent waits for a node to publish a
// replicator completed event on the local event bus.
func waitForReplicatorDeleteEvent(s *state, cfg DeleteReplicator) {
select {
case _, ok := <-s.nodeEvents[cfg.SourceNodeID].replicator.Message():
case _, ok := <-s.nodes[cfg.SourceNodeID].event.replicator.Message():
if !ok {
require.Fail(s.t, "subscription closed waiting for replicator event")
}
Expand All @@ -96,9 +96,9 @@ func waitForReplicatorDeleteEvent(s *state, cfg DeleteReplicator) {
require.Fail(s.t, "timeout waiting for replicator event")
}

delete(s.nodeP2P[cfg.TargetNodeID].connections, cfg.SourceNodeID)
delete(s.nodeP2P[cfg.SourceNodeID].connections, cfg.TargetNodeID)
delete(s.nodeP2P[cfg.SourceNodeID].replicators, cfg.TargetNodeID)
delete(s.nodes[cfg.TargetNodeID].p2p.connections, cfg.SourceNodeID)
delete(s.nodes[cfg.SourceNodeID].p2p.connections, cfg.TargetNodeID)
delete(s.nodes[cfg.SourceNodeID].p2p.replicators, cfg.TargetNodeID)
}

// waitForSubscribeToCollectionEvent waits for a node to publish a
Expand All @@ -107,7 +107,7 @@ func waitForReplicatorDeleteEvent(s *state, cfg DeleteReplicator) {
// Expected document heads will be updated for the subscriber node.
func waitForSubscribeToCollectionEvent(s *state, action SubscribeToCollection) {
select {
case _, ok := <-s.nodeEvents[action.NodeID].p2pTopic.Message():
case _, ok := <-s.nodes[action.NodeID].event.p2pTopic.Message():
if !ok {
require.Fail(s.t, "subscription closed waiting for p2p topic event")
}
Expand All @@ -121,15 +121,15 @@ func waitForSubscribeToCollectionEvent(s *state, action SubscribeToCollection) {
if collectionIndex == NonExistentCollectionID {
continue // don't track non existent collections
}
s.nodeP2P[action.NodeID].peerCollections[collectionIndex] = struct{}{}
s.nodes[action.NodeID].p2p.peerCollections[collectionIndex] = struct{}{}
}
}

// waitForSubscribeToCollectionEvent waits for a node to publish a
// p2p topic completed event on the local event bus.
func waitForUnsubscribeToCollectionEvent(s *state, action UnsubscribeToCollection) {
select {
case _, ok := <-s.nodeEvents[action.NodeID].p2pTopic.Message():
case _, ok := <-s.nodes[action.NodeID].event.p2pTopic.Message():
if !ok {
require.Fail(s.t, "subscription closed waiting for p2p topic event")
}
Expand All @@ -142,7 +142,7 @@ func waitForUnsubscribeToCollectionEvent(s *state, action UnsubscribeToCollectio
if collectionIndex == NonExistentCollectionID {
continue // don't track non existent collections
}
delete(s.nodeP2P[action.NodeID].peerCollections, collectionIndex)
delete(s.nodes[action.NodeID].p2p.peerCollections, collectionIndex)
}
}

Expand All @@ -160,7 +160,8 @@ func waitForUpdateEvents(
continue // node is not selected
}

if _, ok := s.closedNodes[i]; ok {
node := s.nodes[i]
if node.closed {
continue // node is closed
}

Expand All @@ -172,7 +173,7 @@ func waitForUpdateEvents(
for len(expect) > 0 {
var evt event.Update
select {
case msg, ok := <-s.nodeEvents[i].update.Message():
case msg, ok := <-node.event.update.Message():
if !ok {
require.Fail(s.t, "subscription closed waiting for update event", "Node %d", i)
}
Expand All @@ -195,7 +196,7 @@ func waitForUpdateEvents(

// we only need to update the network state if the nodes
// are configured for networking
if i < len(s.nodeConfigs) {
if s.isNetworkEnabled {
updateNetworkState(s, i, evt)
}
}
Expand All @@ -208,15 +209,16 @@ func waitForUpdateEvents(
// from running forever.
func waitForMergeEvents(s *state, action WaitForSync) {
for nodeID := 0; nodeID < len(s.nodes); nodeID++ {
if _, ok := s.closedNodes[nodeID]; ok {
node := s.nodes[nodeID]
if node.closed {
continue // node is closed
}

expect := s.nodeP2P[nodeID].expectedDocHeads
expect := node.p2p.expectedDocHeads

// remove any docs that are already merged
// up to the expected document head
for key, val := range s.nodeP2P[nodeID].actualDocHeads {
for key, val := range node.p2p.actualDocHeads {
if head, ok := expect[key]; ok && head.String() == val.cid.String() {
delete(expect, key)
}
Expand All @@ -228,7 +230,7 @@ func waitForMergeEvents(s *state, action WaitForSync) {
require.Fail(s.t, "doc index %d out of range", docIndex)
}
docID := s.docIDs[0][docIndex].String()
actual, hasActual := s.nodeP2P[nodeID].actualDocHeads[docID]
actual, hasActual := node.p2p.actualDocHeads[docID]
if !hasActual || !actual.decrypted {
expectDecrypted[docID] = struct{}{}
}
Expand All @@ -243,7 +245,7 @@ func waitForMergeEvents(s *state, action WaitForSync) {
for len(expect) > 0 || len(expectDecrypted) > 0 {
var evt event.MergeComplete
select {
case msg, ok := <-s.nodeEvents[nodeID].merge.Message():
case msg, ok := <-node.event.merge.Message():
if !ok {
require.Fail(s.t, "subscription closed waiting for merge complete event")
}
Expand All @@ -262,7 +264,7 @@ func waitForMergeEvents(s *state, action WaitForSync) {
if ok && head.String() == evt.Merge.Cid.String() {
delete(expect, evt.Merge.DocID)
}
s.nodeP2P[nodeID].actualDocHeads[evt.Merge.DocID] = docHeadState{cid: evt.Merge.Cid, decrypted: evt.Decrypted}
node.p2p.actualDocHeads[evt.Merge.DocID] = docHeadState{cid: evt.Merge.Cid, decrypted: evt.Decrypted}
}
}
}
Expand All @@ -272,31 +274,33 @@ func waitForMergeEvents(s *state, action WaitForSync) {
func updateNetworkState(s *state, nodeID int, evt event.Update) {
// find the correct collection index for this update
collectionID := -1
for i, c := range s.collections[nodeID] {
for i, c := range s.nodes[nodeID].collections {
if c.SchemaRoot() == evt.SchemaRoot {
collectionID = i
}
}

node := s.nodes[nodeID]

// update the actual document head on the node that updated it
// as the node created the document, it is already decrypted
s.nodeP2P[nodeID].actualDocHeads[evt.DocID] = docHeadState{cid: evt.Cid, decrypted: true}
node.p2p.actualDocHeads[evt.DocID] = docHeadState{cid: evt.Cid, decrypted: true}

// update the expected document heads of replicator targets
for id := range s.nodeP2P[nodeID].replicators {
for id := range node.p2p.replicators {
// replicator target nodes push updates to source nodes
s.nodeP2P[id].expectedDocHeads[evt.DocID] = evt.Cid
s.nodes[id].p2p.expectedDocHeads[evt.DocID] = evt.Cid
}

// update the expected document heads of connected nodes
for id := range s.nodeP2P[nodeID].connections {
for id := range node.p2p.connections {
// connected nodes share updates of documents they have in common
if _, ok := s.nodeP2P[id].actualDocHeads[evt.DocID]; ok {
s.nodeP2P[id].expectedDocHeads[evt.DocID] = evt.Cid
if _, ok := s.nodes[id].p2p.actualDocHeads[evt.DocID]; ok {
s.nodes[id].p2p.expectedDocHeads[evt.DocID] = evt.Cid
}
// peer collection subscribers receive updates from any other subscriber node
if _, ok := s.nodeP2P[id].peerCollections[collectionID]; ok {
s.nodeP2P[id].expectedDocHeads[evt.DocID] = evt.Cid
if _, ok := s.nodes[id].p2p.peerCollections[collectionID]; ok {
s.nodes[id].p2p.expectedDocHeads[evt.DocID] = evt.Cid
}
}

Expand Down Expand Up @@ -325,9 +329,9 @@ func getEventsForUpdateDoc(s *state, action UpdateDoc) map[string]struct{} {
func getEventsForCreateDoc(s *state, action CreateDoc) map[string]struct{} {
var collection client.Collection
if action.NodeID.HasValue() {
collection = s.collections[action.NodeID.Value()][action.CollectionID]
collection = s.nodes[action.NodeID.Value()].collections[action.CollectionID]
} else {
collection = s.collections[0][action.CollectionID]
collection = s.nodes[0].collections[action.CollectionID]
}

docs, err := parseCreateDocs(action, collection)
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/lens.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func configureMigration(
) {
_, nodes := getNodesWithIDs(action.NodeID, s.nodes)
for _, node := range nodes {
txn := getTransaction(s, node, action.TransactionID, action.ExpectedError)
txn := getTransaction(s, node.Client, action.TransactionID, action.ExpectedError)
ctx := db.SetContextTxn(s.ctx, txn)

err := node.SetMigration(ctx, action.LensConfig)
Expand Down
14 changes: 7 additions & 7 deletions tests/integration/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ func connectPeers(
err := sourceNode.Connect(s.ctx, targetNode.PeerInfo())
require.NoError(s.t, err)

s.nodeP2P[cfg.SourceNodeID].connections[cfg.TargetNodeID] = struct{}{}
s.nodeP2P[cfg.TargetNodeID].connections[cfg.SourceNodeID] = struct{}{}
s.nodes[cfg.SourceNodeID].p2p.connections[cfg.TargetNodeID] = struct{}{}
s.nodes[cfg.TargetNodeID].p2p.connections[cfg.SourceNodeID] = struct{}{}

// Bootstrap triggers a bunch of async stuff for which we have no good way of waiting on. It must be
// allowed to complete before documentation begins or it will not even try and sync it. So for now, we
Expand Down Expand Up @@ -219,7 +219,7 @@ func subscribeToCollection(
continue
}

col := s.collections[action.NodeID][collectionIndex]
col := s.nodes[action.NodeID].collections[collectionIndex]
schemaRoots = append(schemaRoots, col.SchemaRoot())
}

Expand Down Expand Up @@ -253,7 +253,7 @@ func unsubscribeToCollection(
continue
}

col := s.collections[action.NodeID][collectionIndex]
col := s.nodes[action.NodeID].collections[collectionIndex]
schemaRoots = append(schemaRoots, col.SchemaRoot())
}

Expand Down Expand Up @@ -281,7 +281,7 @@ func getAllP2PCollections(
) {
expectedCollections := []string{}
for _, collectionIndex := range action.ExpectedCollectionIDs {
col := s.collections[action.NodeID][collectionIndex]
col := s.nodes[action.NodeID].collections[collectionIndex]
expectedCollections = append(expectedCollections, col.SchemaRoot())
}

Expand All @@ -294,8 +294,8 @@ func getAllP2PCollections(

// reconnectPeers makes sure that all peers are connected after a node restart action.
func reconnectPeers(s *state) {
for i, n := range s.nodeP2P {
for j := range n.connections {
for i, n := range s.nodes {
for j := range n.p2p.connections {
sourceNode := s.nodes[i]
targetNode := s.nodes[j]

Expand Down
Loading

0 comments on commit 198454b

Please sign in to comment.