Skip to content

Commit

Permalink
Consolidate node-related fields into a struct
Browse files Browse the repository at this point in the history
  • Loading branch information
islamaliev committed Nov 11, 2024
1 parent 85bbdc0 commit 2c07cdb
Show file tree
Hide file tree
Showing 8 changed files with 213 additions and 258 deletions.
10 changes: 5 additions & 5 deletions tests/integration/acp.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func addPolicyACP(
nodeIDs, nodes := getNodesWithIDs(action.NodeID, s.nodes)
for index, node := range nodes {
ctx := getContextWithIdentity(s.ctx, s, action.Identity, nodeIDs[index])
policyResult, err := node.AddPolicy(ctx, action.Policy)
policyResult, err := node.client.AddPolicy(ctx, action.Policy)

expectedErrorRaised := AssertError(s.t, s.testCase.Description, err, action.ExpectedError)
assertExpectedErrorRaised(s.t, s.testCase.Description, action.ExpectedError, expectedErrorRaised)
Expand Down Expand Up @@ -190,7 +190,7 @@ func addDocActorRelationshipACP(
var collectionName string
collectionName, docID = getCollectionAndDocInfo(s, action.CollectionID, action.DocID, nodeID)

exists, err := node.AddDocActorRelationship(
exists, err := node.client.AddDocActorRelationship(
getContextWithIdentity(s.ctx, s, action.RequestorIdentity, nodeID),
collectionName,
docID,
Expand Down Expand Up @@ -276,7 +276,7 @@ func deleteDocActorRelationshipACP(

collectionName, docID := getCollectionAndDocInfo(s, action.CollectionID, action.DocID, nodeID)

deleteDocActorRelationshipResult, err := node.DeleteDocActorRelationship(
deleteDocActorRelationshipResult, err := node.client.DeleteDocActorRelationship(
getContextWithIdentity(s.ctx, s, action.RequestorIdentity, nodeID),
collectionName,
docID,
Expand Down Expand Up @@ -304,7 +304,7 @@ func getCollectionAndDocInfo(s *state, collectionID, docInd, nodeID int) (string
collectionName := ""
docID := ""
if collectionID != -1 {
collection := s.collections[nodeID][collectionID]
collection := s.nodes[nodeID].collections[collectionID]
if !collection.Description().Name.HasValue() {
require.Fail(s.t, "Expected non-empty collection name, but it was empty.", s.testCase.Description)
}
Expand Down Expand Up @@ -617,7 +617,7 @@ func getNodeAudience(s *state, nodeIndex int) immutable.Option[string] {
if nodeIndex >= len(s.nodes) {
return immutable.None[string]()
}
switch client := s.nodes[nodeIndex].(type) {
switch client := s.nodes[nodeIndex].client.(type) {
case *http.Wrapper:
return immutable.Some(strings.TrimPrefix(client.Host(), "http://"))
case *cli.Wrapper:
Expand Down
44 changes: 39 additions & 5 deletions tests/integration/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/crypto"
"github.com/sourcenetwork/defradb/internal/kms"
"github.com/sourcenetwork/defradb/net"
"github.com/sourcenetwork/defradb/node"
changeDetector "github.com/sourcenetwork/defradb/tests/change_detector"
)
Expand Down Expand Up @@ -140,7 +141,7 @@ func getDefaultNodeOpts() []node.Option {
// setupNode returns the database implementation for the current
// testing state. The database type on the test state is used to
// select the datastore implementation to use.
func setupNode(s *state, opts ...node.Option) (*node.Node, string, error) {
func setupNode(s *state, opts ...node.Option) (*nodeState, error) {
opts = append(getDefaultNodeOpts(), opts...)

switch acpType {
Expand Down Expand Up @@ -189,20 +190,53 @@ func setupNode(s *state, opts ...node.Option) (*node.Node, string, error) {
opts = append(opts, node.WithStoreType(node.MemoryStore))

default:
return nil, "", fmt.Errorf("invalid database type: %v", s.dbt)
return nil, fmt.Errorf("invalid database type: %v", s.dbt)
}

if s.kms == PubSubKMSType {
opts = append(opts, node.WithKMS(kms.PubSubServiceType))
}

netOpts := make([]net.NodeOpt, 0)
for _, opt := range opts {
if opt, ok := opt.(net.NodeOpt); ok {
netOpts = append(netOpts, opt)
}
}

if s.isNetworkEnabled {
var addresses []string
for _, node := range s.nodes {
addresses = append(addresses, node.node.Peer.PeerInfo().String())
}
netOpts = append(netOpts, net.WithListenAddresses(addresses...))
opts = append(opts, node.WithDisableP2P(false))
}

node, err := node.New(s.ctx, opts...)
if err != nil {
return nil, "", err
return nil, err
}

err = node.Start(s.ctx)
if err != nil {
return nil, "", err
return nil, err
}
return node, path, nil

c, err := setupClient(s, node)
require.Nil(s.t, err)

eventState, err := newEventState(c.Events())
require.NoError(s.t, err)

st := &nodeState{
client: c,
node: node,
event: eventState,
p2p: newP2PState(),
dbPath: path,
netOpts: netOpts,
}

return st, nil
}
80 changes: 42 additions & 38 deletions tests/integration/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,24 @@ const eventTimeout = 1 * time.Second
// waitForNetworkSetupEvents waits for p2p topic completed and
// replicator completed events to be published on the local node event bus.
func waitForNetworkSetupEvents(s *state, nodeID int) {
cols, err := s.nodes[nodeID].GetAllP2PCollections(s.ctx)
cols, err := s.nodes[nodeID].client.GetAllP2PCollections(s.ctx)
require.NoError(s.t, err)

reps, err := s.nodes[nodeID].GetAllReplicators(s.ctx)
reps, err := s.nodes[nodeID].client.GetAllReplicators(s.ctx)
require.NoError(s.t, err)

replicatorEvents := len(reps)
p2pTopicEvent := len(cols) > 0

for p2pTopicEvent && replicatorEvents > 0 {
select {
case _, ok := <-s.nodeEvents[nodeID].replicator.Message():
case _, ok := <-s.nodes[nodeID].event.replicator.Message():
if !ok {
require.Fail(s.t, "subscription closed waiting for network setup events")
}
replicatorEvents--

case _, ok := <-s.nodeEvents[nodeID].p2pTopic.Message():
case _, ok := <-s.nodes[nodeID].event.p2pTopic.Message():
if !ok {
require.Fail(s.t, "subscription closed waiting for network setup events")
}
Expand All @@ -63,7 +63,7 @@ func waitForNetworkSetupEvents(s *state, nodeID int) {
// Expected document heads will be updated for the targeted node.
func waitForReplicatorConfigureEvent(s *state, cfg ConfigureReplicator) {
select {
case _, ok := <-s.nodeEvents[cfg.SourceNodeID].replicator.Message():
case _, ok := <-s.nodes[cfg.SourceNodeID].event.replicator.Message():
if !ok {
require.Fail(s.t, "subscription closed waiting for replicator event")
}
Expand All @@ -73,21 +73,21 @@ func waitForReplicatorConfigureEvent(s *state, cfg ConfigureReplicator) {
}

// all previous documents should be merged on the subscriber node
for key, val := range s.nodeP2P[cfg.SourceNodeID].actualDocHeads {
s.nodeP2P[cfg.TargetNodeID].expectedDocHeads[key] = val.cid
for key, val := range s.nodes[cfg.SourceNodeID].p2p.actualDocHeads {
s.nodes[cfg.TargetNodeID].p2p.expectedDocHeads[key] = val.cid
}

// update node connections and replicators
s.nodeP2P[cfg.TargetNodeID].connections[cfg.SourceNodeID] = struct{}{}
s.nodeP2P[cfg.SourceNodeID].connections[cfg.TargetNodeID] = struct{}{}
s.nodeP2P[cfg.SourceNodeID].replicators[cfg.TargetNodeID] = struct{}{}
s.nodes[cfg.TargetNodeID].p2p.connections[cfg.SourceNodeID] = struct{}{}
s.nodes[cfg.SourceNodeID].p2p.connections[cfg.TargetNodeID] = struct{}{}
s.nodes[cfg.SourceNodeID].p2p.replicators[cfg.TargetNodeID] = struct{}{}
}

// waitForReplicatorConfigureEvent waits for a node to publish a
// replicator completed event on the local event bus.
func waitForReplicatorDeleteEvent(s *state, cfg DeleteReplicator) {
select {
case _, ok := <-s.nodeEvents[cfg.SourceNodeID].replicator.Message():
case _, ok := <-s.nodes[cfg.SourceNodeID].event.replicator.Message():
if !ok {
require.Fail(s.t, "subscription closed waiting for replicator event")
}
Expand All @@ -96,9 +96,9 @@ func waitForReplicatorDeleteEvent(s *state, cfg DeleteReplicator) {
require.Fail(s.t, "timeout waiting for replicator event")
}

delete(s.nodeP2P[cfg.TargetNodeID].connections, cfg.SourceNodeID)
delete(s.nodeP2P[cfg.SourceNodeID].connections, cfg.TargetNodeID)
delete(s.nodeP2P[cfg.SourceNodeID].replicators, cfg.TargetNodeID)
delete(s.nodes[cfg.TargetNodeID].p2p.connections, cfg.SourceNodeID)
delete(s.nodes[cfg.SourceNodeID].p2p.connections, cfg.TargetNodeID)
delete(s.nodes[cfg.SourceNodeID].p2p.replicators, cfg.TargetNodeID)
}

// waitForSubscribeToCollectionEvent waits for a node to publish a
Expand All @@ -107,7 +107,7 @@ func waitForReplicatorDeleteEvent(s *state, cfg DeleteReplicator) {
// Expected document heads will be updated for the subscriber node.
func waitForSubscribeToCollectionEvent(s *state, action SubscribeToCollection) {
select {
case _, ok := <-s.nodeEvents[action.NodeID].p2pTopic.Message():
case _, ok := <-s.nodes[action.NodeID].event.p2pTopic.Message():
if !ok {
require.Fail(s.t, "subscription closed waiting for p2p topic event")
}
Expand All @@ -121,15 +121,15 @@ func waitForSubscribeToCollectionEvent(s *state, action SubscribeToCollection) {
if collectionIndex == NonExistentCollectionID {
continue // don't track non existent collections
}
s.nodeP2P[action.NodeID].peerCollections[collectionIndex] = struct{}{}
s.nodes[action.NodeID].p2p.peerCollections[collectionIndex] = struct{}{}
}
}

// waitForSubscribeToCollectionEvent waits for a node to publish a
// p2p topic completed event on the local event bus.
func waitForUnsubscribeToCollectionEvent(s *state, action UnsubscribeToCollection) {
select {
case _, ok := <-s.nodeEvents[action.NodeID].p2pTopic.Message():
case _, ok := <-s.nodes[action.NodeID].event.p2pTopic.Message():
if !ok {
require.Fail(s.t, "subscription closed waiting for p2p topic event")
}
Expand All @@ -142,7 +142,7 @@ func waitForUnsubscribeToCollectionEvent(s *state, action UnsubscribeToCollectio
if collectionIndex == NonExistentCollectionID {
continue // don't track non existent collections
}
delete(s.nodeP2P[action.NodeID].peerCollections, collectionIndex)
delete(s.nodes[action.NodeID].p2p.peerCollections, collectionIndex)
}
}

Expand All @@ -160,7 +160,8 @@ func waitForUpdateEvents(
continue // node is not selected
}

if _, ok := s.closedNodes[i]; ok {
node := s.nodes[i]
if node.closed {
continue // node is closed
}

Expand All @@ -172,7 +173,7 @@ func waitForUpdateEvents(
for len(expect) > 0 {
var evt event.Update
select {
case msg, ok := <-s.nodeEvents[i].update.Message():
case msg, ok := <-node.event.update.Message():
if !ok {
require.Fail(s.t, "subscription closed waiting for update event", "Node %d", i)
}
Expand All @@ -195,7 +196,7 @@ func waitForUpdateEvents(

// we only need to update the network state if the nodes
// are configured for networking
if i < len(s.nodeConfigs) {
if s.isNetworkEnabled {
updateNetworkState(s, i, evt)
}
}
Expand All @@ -208,15 +209,16 @@ func waitForUpdateEvents(
// from running forever.
func waitForMergeEvents(s *state, action WaitForSync) {
for nodeID := 0; nodeID < len(s.nodes); nodeID++ {
if _, ok := s.closedNodes[nodeID]; ok {
node := s.nodes[nodeID]
if node.closed {
continue // node is closed
}

expect := s.nodeP2P[nodeID].expectedDocHeads
expect := node.p2p.expectedDocHeads

// remove any docs that are already merged
// up to the expected document head
for key, val := range s.nodeP2P[nodeID].actualDocHeads {
for key, val := range node.p2p.actualDocHeads {
if head, ok := expect[key]; ok && head.String() == val.cid.String() {
delete(expect, key)
}
Expand All @@ -228,7 +230,7 @@ func waitForMergeEvents(s *state, action WaitForSync) {
require.Fail(s.t, "doc index %d out of range", docIndex)
}
docID := s.docIDs[0][docIndex].String()
actual, hasActual := s.nodeP2P[nodeID].actualDocHeads[docID]
actual, hasActual := node.p2p.actualDocHeads[docID]
if !hasActual || !actual.decrypted {
expectDecrypted[docID] = struct{}{}
}
Expand All @@ -243,7 +245,7 @@ func waitForMergeEvents(s *state, action WaitForSync) {
for len(expect) > 0 || len(expectDecrypted) > 0 {
var evt event.MergeComplete
select {
case msg, ok := <-s.nodeEvents[nodeID].merge.Message():
case msg, ok := <-node.event.merge.Message():
if !ok {
require.Fail(s.t, "subscription closed waiting for merge complete event")
}
Expand All @@ -262,7 +264,7 @@ func waitForMergeEvents(s *state, action WaitForSync) {
if ok && head.String() == evt.Merge.Cid.String() {
delete(expect, evt.Merge.DocID)
}
s.nodeP2P[nodeID].actualDocHeads[evt.Merge.DocID] = docHeadState{cid: evt.Merge.Cid, decrypted: evt.Decrypted}
node.p2p.actualDocHeads[evt.Merge.DocID] = docHeadState{cid: evt.Merge.Cid, decrypted: evt.Decrypted}
}
}
}
Expand All @@ -272,31 +274,33 @@ func waitForMergeEvents(s *state, action WaitForSync) {
func updateNetworkState(s *state, nodeID int, evt event.Update) {
// find the correct collection index for this update
collectionID := -1
for i, c := range s.collections[nodeID] {
for i, c := range s.nodes[nodeID].collections {
if c.SchemaRoot() == evt.SchemaRoot {
collectionID = i
}
}

node := s.nodes[nodeID]

// update the actual document head on the node that updated it
// as the node created the document, it is already decrypted
s.nodeP2P[nodeID].actualDocHeads[evt.DocID] = docHeadState{cid: evt.Cid, decrypted: true}
node.p2p.actualDocHeads[evt.DocID] = docHeadState{cid: evt.Cid, decrypted: true}

// update the expected document heads of replicator targets
for id := range s.nodeP2P[nodeID].replicators {
for id := range node.p2p.replicators {
// replicator target nodes push updates to source nodes
s.nodeP2P[id].expectedDocHeads[evt.DocID] = evt.Cid
s.nodes[id].p2p.expectedDocHeads[evt.DocID] = evt.Cid
}

// update the expected document heads of connected nodes
for id := range s.nodeP2P[nodeID].connections {
for id := range node.p2p.connections {
// connected nodes share updates of documents they have in common
if _, ok := s.nodeP2P[id].actualDocHeads[evt.DocID]; ok {
s.nodeP2P[id].expectedDocHeads[evt.DocID] = evt.Cid
if _, ok := s.nodes[id].p2p.actualDocHeads[evt.DocID]; ok {
s.nodes[id].p2p.expectedDocHeads[evt.DocID] = evt.Cid
}
// peer collection subscribers receive updates from any other subscriber node
if _, ok := s.nodeP2P[id].peerCollections[collectionID]; ok {
s.nodeP2P[id].expectedDocHeads[evt.DocID] = evt.Cid
if _, ok := s.nodes[id].p2p.peerCollections[collectionID]; ok {
s.nodes[id].p2p.expectedDocHeads[evt.DocID] = evt.Cid
}
}

Expand Down Expand Up @@ -325,9 +329,9 @@ func getEventsForUpdateDoc(s *state, action UpdateDoc) map[string]struct{} {
func getEventsForCreateDoc(s *state, action CreateDoc) map[string]struct{} {
var collection client.Collection
if action.NodeID.HasValue() {
collection = s.collections[action.NodeID.Value()][action.CollectionID]
collection = s.nodes[action.NodeID.Value()].collections[action.CollectionID]
} else {
collection = s.collections[0][action.CollectionID]
collection = s.nodes[0].collections[action.CollectionID]
}

docs, err := parseCreateDocs(action, collection)
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func executeExplainRequest(

_, nodes := getNodesWithIDs(action.NodeID, s.nodes)
for _, node := range nodes {
result := node.ExecRequest(
result := node.client.ExecRequest(
s.ctx,
action.Request,
)
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/lens.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ func configureMigration(
) {
_, nodes := getNodesWithIDs(action.NodeID, s.nodes)
for _, node := range nodes {
txn := getTransaction(s, node, action.TransactionID, action.ExpectedError)
txn := getTransaction(s, node.client, action.TransactionID, action.ExpectedError)
ctx := db.SetContextTxn(s.ctx, txn)

err := node.SetMigration(ctx, action.LensConfig)
err := node.client.SetMigration(ctx, action.LensConfig)
expectedErrorRaised := AssertError(s.t, s.testCase.Description, err, action.ExpectedError)

assertExpectedErrorRaised(s.t, s.testCase.Description, action.ExpectedError, expectedErrorRaised)
Expand Down
Loading

0 comments on commit 2c07cdb

Please sign in to comment.