diff --git a/tests/integration/acp.go b/tests/integration/acp.go
index b98be7a059..297719d06d 100644
--- a/tests/integration/acp.go
+++ b/tests/integration/acp.go
@@ -113,7 +113,7 @@ func addPolicyACP(
 	nodeIDs, nodes := getNodesWithIDs(action.NodeID, s.nodes)
 	for index, node := range nodes {
 		ctx := getContextWithIdentity(s.ctx, s, action.Identity, nodeIDs[index])
-		policyResult, err := node.AddPolicy(ctx, action.Policy)
+		policyResult, err := node.client.AddPolicy(ctx, action.Policy)
 
 		expectedErrorRaised := AssertError(s.t, s.testCase.Description, err, action.ExpectedError)
 		assertExpectedErrorRaised(s.t, s.testCase.Description, action.ExpectedError, expectedErrorRaised)
@@ -190,7 +190,7 @@ func addDocActorRelationshipACP(
 		var collectionName string
 		collectionName, docID = getCollectionAndDocInfo(s, action.CollectionID, action.DocID, nodeID)
 
-		exists, err := node.AddDocActorRelationship(
+		exists, err := node.client.AddDocActorRelationship(
 			getContextWithIdentity(s.ctx, s, action.RequestorIdentity, nodeID),
 			collectionName,
 			docID,
@@ -276,7 +276,7 @@ func deleteDocActorRelationshipACP(
 		collectionName, docID := getCollectionAndDocInfo(s, action.CollectionID, action.DocID, nodeID)
 
-		deleteDocActorRelationshipResult, err := node.DeleteDocActorRelationship(
+		deleteDocActorRelationshipResult, err := node.client.DeleteDocActorRelationship(
 			getContextWithIdentity(s.ctx, s, action.RequestorIdentity, nodeID),
 			collectionName,
 			docID,
@@ -304,7 +304,7 @@ func getCollectionAndDocInfo(s *state, collectionID, docInd, nodeID int) (string
 	collectionName := ""
 	docID := ""
 	if collectionID != -1 {
-		collection := s.collections[nodeID][collectionID]
+		collection := s.nodes[nodeID].collections[collectionID]
 		if !collection.Description().Name.HasValue() {
 			require.Fail(s.t, "Expected non-empty collection name, but it was empty.", s.testCase.Description)
 		}
@@ -617,7 +617,7 @@ func getNodeAudience(s *state, nodeIndex int) immutable.Option[string] {
 	if nodeIndex >= len(s.nodes) {
 		return immutable.None[string]()
 	}
-	switch client := s.nodes[nodeIndex].(type) {
+	switch client := s.nodes[nodeIndex].client.(type) {
 	case *http.Wrapper:
 		return immutable.Some(strings.TrimPrefix(client.Host(), "http://"))
 	case *cli.Wrapper:
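Reviewer note: every former `s.nodes[i]` call site (a bare `clients.Client`) now goes through `node.client`, a field on the `nodeState` struct introduced in the state.go hunks further down. The `getNodeAudience` hunk above keeps its type switch over the concrete client wrapper; a self-contained sketch of that technique follows, using local stand-in types rather than the real `http.Wrapper`/`cli.Wrapper`:

```go
package main

import (
	"fmt"
	"strings"
)

// Local stand-ins for the concrete client wrappers; the real types live in
// defradb's test client packages.
type client interface{ name() string }

type httpWrapper struct{ host string }

func (httpWrapper) name() string { return "http" }

type cliWrapper struct{}

func (cliWrapper) name() string { return "cli" }

// audience mirrors getNodeAudience: only the HTTP transport carries a host,
// so a type switch recovers it and every other wrapper yields no value.
func audience(c client) (string, bool) {
	switch c := c.(type) {
	case httpWrapper:
		return strings.TrimPrefix(c.host, "http://"), true
	default:
		return "", false
	}
}

func main() {
	host, ok := audience(httpWrapper{host: "http://127.0.0.1:9181"})
	fmt.Println(host, ok) // 127.0.0.1:9181 true

	_, ok = audience(cliWrapper{})
	fmt.Println(ok) // false
}
```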
diff --git a/tests/integration/db.go b/tests/integration/db.go
index 784ff6952f..17129447c9 100644
--- a/tests/integration/db.go
+++ b/tests/integration/db.go
@@ -22,6 +22,7 @@ import (
 	"github.com/sourcenetwork/defradb/client"
 	"github.com/sourcenetwork/defradb/crypto"
 	"github.com/sourcenetwork/defradb/internal/kms"
+	"github.com/sourcenetwork/defradb/net"
 	"github.com/sourcenetwork/defradb/node"
 	changeDetector "github.com/sourcenetwork/defradb/tests/change_detector"
 )
@@ -140,7 +141,7 @@ func getDefaultNodeOpts() []node.Option {
 // setupNode returns the database implementation for the current
 // testing state. The database type on the test state is used to
 // select the datastore implementation to use.
-func setupNode(s *state, opts ...node.Option) (*node.Node, string, error) {
+func setupNode(s *state, opts ...node.Option) (*nodeState, error) {
 	opts = append(getDefaultNodeOpts(), opts...)
 
 	switch acpType {
@@ -189,20 +190,53 @@ func setupNode(s *state, opts ...node.Option) (*node.Node, string, error) {
 		opts = append(opts, node.WithStoreType(node.MemoryStore))
 
 	default:
-		return nil, "", fmt.Errorf("invalid database type: %v", s.dbt)
+		return nil, fmt.Errorf("invalid database type: %v", s.dbt)
 	}
 
 	if s.kms == PubSubKMSType {
 		opts = append(opts, node.WithKMS(kms.PubSubServiceType))
 	}
 
+	netOpts := make([]net.NodeOpt, 0)
+	for _, opt := range opts {
+		if opt, ok := opt.(net.NodeOpt); ok {
+			netOpts = append(netOpts, opt)
+		}
+	}
+
+	if s.isNetworkEnabled {
+		var addresses []string
+		for _, node := range s.nodes {
+			addresses = append(addresses, node.node.Peer.PeerInfo().String())
+		}
+		netOpts = append(netOpts, net.WithListenAddresses(addresses...))
+		opts = append(opts, node.WithDisableP2P(false))
+	}
+
 	node, err := node.New(s.ctx, opts...)
 	if err != nil {
-		return nil, "", err
+		return nil, err
 	}
+
 	err = node.Start(s.ctx)
 	if err != nil {
-		return nil, "", err
+		return nil, err
 	}
-	return node, path, nil
+
+	c, err := setupClient(s, node)
+	require.Nil(s.t, err)
+
+	eventState, err := newEventState(c.Events())
+	require.NoError(s.t, err)
+
+	st := &nodeState{
+		client:  c,
+		node:    node,
+		event:   eventState,
+		p2p:     newP2PState(),
+		dbPath:  path,
+		netOpts: netOpts,
+	}
+
+	return st, nil
 }
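The interesting new logic in `setupNode` is the partitioning of the combined option slice: in DefraDB the network options satisfy both `node.Option` and `net.NodeOpt`, so they can be carved out with a type assertion and retained on `nodeState.netOpts` for restarts. A runnable sketch of the pattern with local stand-in types (the names here are illustrative, not DefraDB APIs):

```go
package main

import "fmt"

// Stand-ins: option plays node.Option, netOpt plays net.NodeOpt. The carve-out
// works because network options implement both interfaces.
type option interface{ isOption() }

type netOpt interface {
	option
	isNetOpt()
}

type storeOption struct{ kind string }

func (storeOption) isOption() {}

type listenOption struct{ addr string }

func (listenOption) isOption() {}
func (listenOption) isNetOpt() {}

func main() {
	opts := []option{
		storeOption{kind: "memory"},
		listenOption{addr: "/ip4/127.0.0.1/tcp/0"},
	}

	// Same shape as the new setupNode code: keep the full slice for node
	// construction, but retain the network subset for later restarts.
	netOpts := make([]netOpt, 0)
	for _, opt := range opts {
		if opt, ok := opt.(netOpt); ok {
			netOpts = append(netOpts, opt)
		}
	}

	fmt.Printf("%d of %d options are network options\n", len(netOpts), len(opts))
}
```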
diff --git a/tests/integration/events.go b/tests/integration/events.go
index bbe19ce391..67ab6c36fa 100644
--- a/tests/integration/events.go
+++ b/tests/integration/events.go
@@ -28,10 +28,10 @@ const eventTimeout = 1 * time.Second
 // waitForNetworkSetupEvents waits for p2p topic completed and
 // replicator completed events to be published on the local node event bus.
 func waitForNetworkSetupEvents(s *state, nodeID int) {
-	cols, err := s.nodes[nodeID].GetAllP2PCollections(s.ctx)
+	cols, err := s.nodes[nodeID].client.GetAllP2PCollections(s.ctx)
 	require.NoError(s.t, err)
 
-	reps, err := s.nodes[nodeID].GetAllReplicators(s.ctx)
+	reps, err := s.nodes[nodeID].client.GetAllReplicators(s.ctx)
 	require.NoError(s.t, err)
 
 	replicatorEvents := len(reps)
@@ -39,13 +39,13 @@ func waitForNetworkSetupEvents(s *state, nodeID int) {
 
 	for p2pTopicEvent && replicatorEvents > 0 {
 		select {
-		case _, ok := <-s.nodeEvents[nodeID].replicator.Message():
+		case _, ok := <-s.nodes[nodeID].event.replicator.Message():
 			if !ok {
 				require.Fail(s.t, "subscription closed waiting for network setup events")
 			}
 			replicatorEvents--
 
-		case _, ok := <-s.nodeEvents[nodeID].p2pTopic.Message():
+		case _, ok := <-s.nodes[nodeID].event.p2pTopic.Message():
			if !ok {
				require.Fail(s.t, "subscription closed waiting for network setup events")
			}
@@ -63,7 +63,7 @@ func waitForNetworkSetupEvents(s *state, nodeID int) {
 // Expected document heads will be updated for the targeted node.
 func waitForReplicatorConfigureEvent(s *state, cfg ConfigureReplicator) {
 	select {
-	case _, ok := <-s.nodeEvents[cfg.SourceNodeID].replicator.Message():
+	case _, ok := <-s.nodes[cfg.SourceNodeID].event.replicator.Message():
 		if !ok {
 			require.Fail(s.t, "subscription closed waiting for replicator event")
 		}
@@ -73,21 +73,21 @@ func waitForReplicatorConfigureEvent(s *state, cfg ConfigureReplicator) {
 	}
 
 	// all previous documents should be merged on the subscriber node
-	for key, val := range s.nodeP2P[cfg.SourceNodeID].actualDocHeads {
-		s.nodeP2P[cfg.TargetNodeID].expectedDocHeads[key] = val.cid
+	for key, val := range s.nodes[cfg.SourceNodeID].p2p.actualDocHeads {
+		s.nodes[cfg.TargetNodeID].p2p.expectedDocHeads[key] = val.cid
 	}
 
 	// update node connections and replicators
-	s.nodeP2P[cfg.TargetNodeID].connections[cfg.SourceNodeID] = struct{}{}
-	s.nodeP2P[cfg.SourceNodeID].connections[cfg.TargetNodeID] = struct{}{}
-	s.nodeP2P[cfg.SourceNodeID].replicators[cfg.TargetNodeID] = struct{}{}
+	s.nodes[cfg.TargetNodeID].p2p.connections[cfg.SourceNodeID] = struct{}{}
+	s.nodes[cfg.SourceNodeID].p2p.connections[cfg.TargetNodeID] = struct{}{}
+	s.nodes[cfg.SourceNodeID].p2p.replicators[cfg.TargetNodeID] = struct{}{}
 }
 
 // waitForReplicatorDeleteEvent waits for a node to publish a
 // replicator completed event on the local event bus.
 func waitForReplicatorDeleteEvent(s *state, cfg DeleteReplicator) {
 	select {
-	case _, ok := <-s.nodeEvents[cfg.SourceNodeID].replicator.Message():
+	case _, ok := <-s.nodes[cfg.SourceNodeID].event.replicator.Message():
 		if !ok {
 			require.Fail(s.t, "subscription closed waiting for replicator event")
 		}
@@ -96,9 +96,9 @@ func waitForReplicatorDeleteEvent(s *state, cfg DeleteReplicator) {
 		require.Fail(s.t, "timeout waiting for replicator event")
 	}
 
-	delete(s.nodeP2P[cfg.TargetNodeID].connections, cfg.SourceNodeID)
-	delete(s.nodeP2P[cfg.SourceNodeID].connections, cfg.TargetNodeID)
-	delete(s.nodeP2P[cfg.SourceNodeID].replicators, cfg.TargetNodeID)
+	delete(s.nodes[cfg.TargetNodeID].p2p.connections, cfg.SourceNodeID)
+	delete(s.nodes[cfg.SourceNodeID].p2p.connections, cfg.TargetNodeID)
+	delete(s.nodes[cfg.SourceNodeID].p2p.replicators, cfg.TargetNodeID)
 }
 
 // waitForSubscribeToCollectionEvent waits for a node to publish a
@@ -107,7 +107,7 @@ func waitForReplicatorDeleteEvent(s *state, cfg DeleteReplicator) {
 // Expected document heads will be updated for the subscriber node.
 func waitForSubscribeToCollectionEvent(s *state, action SubscribeToCollection) {
 	select {
-	case _, ok := <-s.nodeEvents[action.NodeID].p2pTopic.Message():
+	case _, ok := <-s.nodes[action.NodeID].event.p2pTopic.Message():
 		if !ok {
 			require.Fail(s.t, "subscription closed waiting for p2p topic event")
 		}
@@ -121,7 +121,7 @@ func waitForSubscribeToCollectionEvent(s *state, action SubscribeToCollection) {
 		if collectionIndex == NonExistentCollectionID {
 			continue // don't track non existent collections
 		}
-		s.nodeP2P[action.NodeID].peerCollections[collectionIndex] = struct{}{}
+		s.nodes[action.NodeID].p2p.peerCollections[collectionIndex] = struct{}{}
 	}
 }
 
@@ -129,7 +129,7 @@ func waitForSubscribeToCollectionEvent(s *state, action SubscribeToCollection) {
 // p2p topic completed event on the local event bus.
 func waitForUnsubscribeToCollectionEvent(s *state, action UnsubscribeToCollection) {
 	select {
-	case _, ok := <-s.nodeEvents[action.NodeID].p2pTopic.Message():
+	case _, ok := <-s.nodes[action.NodeID].event.p2pTopic.Message():
 		if !ok {
 			require.Fail(s.t, "subscription closed waiting for p2p topic event")
 		}
@@ -142,7 +142,7 @@ func waitForUnsubscribeToCollectio
 		if collectionIndex == NonExistentCollectionID {
 			continue // don't track non existent collections
 		}
-		delete(s.nodeP2P[action.NodeID].peerCollections, collectionIndex)
+		delete(s.nodes[action.NodeID].p2p.peerCollections, collectionIndex)
 	}
 }
 
@@ -160,7 +160,8 @@ func waitForUpdateEvents(
 			continue // node is not selected
 		}
 
-		if _, ok := s.closedNodes[i]; ok {
+		node := s.nodes[i]
+		if node.closed {
 			continue // node is closed
 		}
 
@@ -172,7 +173,7 @@ func waitForUpdateEvents(
 		for len(expect) > 0 {
 			var evt event.Update
 			select {
-			case msg, ok := <-s.nodeEvents[i].update.Message():
+			case msg, ok := <-node.event.update.Message():
 				if !ok {
 					require.Fail(s.t, "subscription closed waiting for update event", "Node %d", i)
 				}
@@ -195,7 +196,7 @@ func waitForUpdateEvents(
 
 			// we only need to update the network state if the nodes
 			// are configured for networking
-			if i < len(s.nodeConfigs) {
+			if s.isNetworkEnabled {
 				updateNetworkState(s, i, evt)
 			}
 		}
@@ -208,15 +209,16 @@ func waitForUpdateEvents(
 // from running forever.
 func waitForMergeEvents(s *state, action WaitForSync) {
 	for nodeID := 0; nodeID < len(s.nodes); nodeID++ {
-		if _, ok := s.closedNodes[nodeID]; ok {
+		node := s.nodes[nodeID]
+		if node.closed {
 			continue // node is closed
 		}
 
-		expect := s.nodeP2P[nodeID].expectedDocHeads
+		expect := node.p2p.expectedDocHeads
 
 		// remove any docs that are already merged
 		// up to the expected document head
-		for key, val := range s.nodeP2P[nodeID].actualDocHeads {
+		for key, val := range node.p2p.actualDocHeads {
 			if head, ok := expect[key]; ok && head.String() == val.cid.String() {
 				delete(expect, key)
 			}
@@ -228,7 +230,7 @@ func waitForMergeEvents(s *state, action WaitForSync) {
 				require.Fail(s.t, "doc index %d out of range", docIndex)
 			}
 			docID := s.docIDs[0][docIndex].String()
-			actual, hasActual := s.nodeP2P[nodeID].actualDocHeads[docID]
+			actual, hasActual := node.p2p.actualDocHeads[docID]
 			if !hasActual || !actual.decrypted {
 				expectDecrypted[docID] = struct{}{}
 			}
@@ -243,7 +245,7 @@ func waitForMergeEvents(s *state, action WaitForSync) {
 		for len(expect) > 0 || len(expectDecrypted) > 0 {
 			var evt event.MergeComplete
 			select {
-			case msg, ok := <-s.nodeEvents[nodeID].merge.Message():
+			case msg, ok := <-node.event.merge.Message():
 				if !ok {
 					require.Fail(s.t, "subscription closed waiting for merge complete event")
 				}
@@ -262,7 +264,7 @@ func waitForMergeEvents(s *state, action WaitForSync) {
 				if ok && head.String() == evt.Merge.Cid.String() {
 					delete(expect, evt.Merge.DocID)
 				}
-				s.nodeP2P[nodeID].actualDocHeads[evt.Merge.DocID] = docHeadState{cid: evt.Merge.Cid, decrypted: evt.Decrypted}
+				node.p2p.actualDocHeads[evt.Merge.DocID] = docHeadState{cid: evt.Merge.Cid, decrypted: evt.Decrypted}
 			}
 		}
 	}
@@ -272,31 +274,33 @@ func waitForMergeEvents(s *state, action WaitForSync) {
 func updateNetworkState(s *state, nodeID int, evt event.Update) {
 	// find the correct collection index for this update
 	collectionID := -1
-	for i, c := range s.collections[nodeID] {
+	for i, c := range s.nodes[nodeID].collections {
 		if c.SchemaRoot() == evt.SchemaRoot {
 			collectionID = i
 		}
 	}
 
+	node := s.nodes[nodeID]
+
 	// update the actual document head on the node that updated it
 	// as the node created the document, it is already decrypted
-	s.nodeP2P[nodeID].actualDocHeads[evt.DocID] = docHeadState{cid: evt.Cid, decrypted: true}
+	node.p2p.actualDocHeads[evt.DocID] = docHeadState{cid: evt.Cid, decrypted: true}
 
 	// update the expected document heads of replicator targets
-	for id := range s.nodeP2P[nodeID].replicators {
+	for id := range node.p2p.replicators {
 		// replicator target nodes push updates to source nodes
-		s.nodeP2P[id].expectedDocHeads[evt.DocID] = evt.Cid
+		s.nodes[id].p2p.expectedDocHeads[evt.DocID] = evt.Cid
 	}
 
 	// update the expected document heads of connected nodes
-	for id := range s.nodeP2P[nodeID].connections {
+	for id := range node.p2p.connections {
 		// connected nodes share updates of documents they have in common
-		if _, ok := s.nodeP2P[id].actualDocHeads[evt.DocID]; ok {
-			s.nodeP2P[id].expectedDocHeads[evt.DocID] = evt.Cid
+		if _, ok := s.nodes[id].p2p.actualDocHeads[evt.DocID]; ok {
+			s.nodes[id].p2p.expectedDocHeads[evt.DocID] = evt.Cid
 		}
 		// peer collection subscribers receive updates from any other subscriber node
-		if _, ok := s.nodeP2P[id].peerCollections[collectionID]; ok {
-			s.nodeP2P[id].expectedDocHeads[evt.DocID] = evt.Cid
+		if _, ok := s.nodes[id].p2p.peerCollections[collectionID]; ok {
+			s.nodes[id].p2p.expectedDocHeads[evt.DocID] = evt.Cid
 		}
 	}
 
@@ -325,9 +329,9 @@ func getEventsForUpdateDoc(s *state, action UpdateDoc) map[string]struct{} {
 func getEventsForCreateDoc(s *state, action CreateDoc) map[string]struct{} {
 	var collection client.Collection
 	if action.NodeID.HasValue() {
-		collection = s.collections[action.NodeID.Value()][action.CollectionID]
+		collection = s.nodes[action.NodeID.Value()].collections[action.CollectionID]
 	} else {
-		collection = s.collections[0][action.CollectionID]
+		collection = s.nodes[0].collections[action.CollectionID]
 	}
 
 	docs, err := parseCreateDocs(action, collection)
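events.go drives synchronization by diffing two maps per node: `expectedDocHeads` (what should arrive) against `actualDocHeads` (what merge events have delivered). A toy, runnable rendition of the pruning step that `waitForMergeEvents` performs before blocking on events, with `cid.Cid` stood in by `string`:

```go
package main

import "fmt"

// docHeadState mirrors the harness type: the latest head seen for a doc and
// whether the node could decrypt it.
type docHeadState struct {
	cid       string
	decrypted bool
}

func main() {
	// What the node is expected to converge to...
	expected := map[string]string{"doc1": "cidA", "doc2": "cidB"}
	// ...and what merge events have actually delivered so far.
	actual := map[string]docHeadState{
		"doc1": {cid: "cidA", decrypted: true},
	}

	// The pruning step from waitForMergeEvents: drop every doc whose actual
	// head already matches the expected head, then block only on the rest.
	for key, val := range actual {
		if head, ok := expected[key]; ok && head == val.cid {
			delete(expected, key)
		}
	}

	fmt.Println(len(expected), "doc(s) still waiting to merge") // 1: doc2
}
```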
corelog.Any("Target", targetNode.client.PeerInfo())) - err := sourceNode.Connect(s.ctx, targetNode.PeerInfo()) + err := sourceNode.client.Connect(s.ctx, targetNode.client.PeerInfo()) require.NoError(s.t, err) - s.nodeP2P[cfg.SourceNodeID].connections[cfg.TargetNodeID] = struct{}{} - s.nodeP2P[cfg.TargetNodeID].connections[cfg.SourceNodeID] = struct{}{} + s.nodes[cfg.SourceNodeID].p2p.connections[cfg.TargetNodeID] = struct{}{} + s.nodes[cfg.TargetNodeID].p2p.connections[cfg.SourceNodeID] = struct{}{} // Bootstrap triggers a bunch of async stuff for which we have no good way of waiting on. It must be // allowed to complete before documentation begins or it will not even try and sync it. So for now, we @@ -177,8 +177,8 @@ func configureReplicator( sourceNode := s.nodes[cfg.SourceNodeID] targetNode := s.nodes[cfg.TargetNodeID] - err := sourceNode.SetReplicator(s.ctx, client.ReplicatorParams{ - Info: targetNode.PeerInfo(), + err := sourceNode.client.SetReplicator(s.ctx, client.ReplicatorParams{ + Info: targetNode.client.PeerInfo(), }) expectedErrorRaised := AssertError(s.t, s.testCase.Description, err, cfg.ExpectedError) @@ -196,8 +196,8 @@ func deleteReplicator( sourceNode := s.nodes[cfg.SourceNodeID] targetNode := s.nodes[cfg.TargetNodeID] - err := sourceNode.DeleteReplicator(s.ctx, client.ReplicatorParams{ - Info: targetNode.PeerInfo(), + err := sourceNode.client.DeleteReplicator(s.ctx, client.ReplicatorParams{ + Info: targetNode.client.PeerInfo(), }) require.NoError(s.t, err) waitForReplicatorDeleteEvent(s, cfg) @@ -219,11 +219,11 @@ func subscribeToCollection( continue } - col := s.collections[action.NodeID][collectionIndex] + col := s.nodes[action.NodeID].collections[collectionIndex] schemaRoots = append(schemaRoots, col.SchemaRoot()) } - err := n.AddP2PCollections(s.ctx, schemaRoots) + err := n.client.AddP2PCollections(s.ctx, schemaRoots) if err == nil { waitForSubscribeToCollectionEvent(s, action) } @@ -253,11 +253,11 @@ func unsubscribeToCollection( continue } - col := s.collections[action.NodeID][collectionIndex] + col := s.nodes[action.NodeID].collections[collectionIndex] schemaRoots = append(schemaRoots, col.SchemaRoot()) } - err := n.RemoveP2PCollections(s.ctx, schemaRoots) + err := n.client.RemoveP2PCollections(s.ctx, schemaRoots) if err == nil { waitForUnsubscribeToCollectionEvent(s, action) } @@ -281,12 +281,12 @@ func getAllP2PCollections( ) { expectedCollections := []string{} for _, collectionIndex := range action.ExpectedCollectionIDs { - col := s.collections[action.NodeID][collectionIndex] + col := s.nodes[action.NodeID].collections[collectionIndex] expectedCollections = append(expectedCollections, col.SchemaRoot()) } n := s.nodes[action.NodeID] - cols, err := n.GetAllP2PCollections(s.ctx) + cols, err := n.client.GetAllP2PCollections(s.ctx) require.NoError(s.t, err) assert.Equal(s.t, expectedCollections, cols) @@ -294,16 +294,16 @@ func getAllP2PCollections( // reconnectPeers makes sure that all peers are connected after a node restart action. 
 func reconnectPeers(s *state) {
-	for i, n := range s.nodeP2P {
-		for j := range n.connections {
+	for i, n := range s.nodes {
+		for j := range n.p2p.connections {
 			sourceNode := s.nodes[i]
 			targetNode := s.nodes[j]
 
 			log.InfoContext(s.ctx, "Connect peers",
-				corelog.Any("Source", sourceNode.PeerInfo()),
-				corelog.Any("Target", targetNode.PeerInfo()))
+				corelog.Any("Source", sourceNode.client.PeerInfo()),
+				corelog.Any("Target", targetNode.client.PeerInfo()))
 
-			err := sourceNode.Connect(s.ctx, targetNode.PeerInfo())
+			err := sourceNode.client.Connect(s.ctx, targetNode.client.PeerInfo())
 			require.NoError(s.t, err)
 		}
 	}
diff --git a/tests/integration/state.go b/tests/integration/state.go
index e7130f2ebd..3dc5d2a8f1 100644
--- a/tests/integration/state.go
+++ b/tests/integration/state.go
@@ -15,7 +15,6 @@ import (
 	"testing"
 
 	"github.com/ipfs/go-cid"
-	"github.com/libp2p/go-libp2p/core/peer"
 
 	"github.com/sourcenetwork/defradb/client"
 	"github.com/sourcenetwork/defradb/datastore"
@@ -114,6 +113,30 @@ func newEventState(bus *event.Bus) (*eventState, error) {
 	}, nil
 }
 
+// nodeState contains all testing state for a node.
+type nodeState struct {
+	// The node active in this test.
+	node *node.Node
+	// The node's client active in this test.
+	client clients.Client
+	// event contains all event node subscriptions.
+	event *eventState
+	// p2p contains p2p states for the node.
+	p2p *p2pState
+	// The network configurations for the nodes
+	netOpts []net.NodeOpt
+	// The path to any file-based databases active in this test.
+	dbPath string
+	// Collections by index present in the test.
+	// Indexes matches that of collectionNames.
+	collections []client.Collection
+	// Indexes, by index, by collection index.
+	indexes [][]client.IndexDescription
+	// indicates if the node is closed.
+	closed bool
+}
+
+// state contains all testing state.
 type state struct {
 	// The test context.
 	ctx context.Context
@@ -124,6 +147,7 @@ type state struct {
 	// The TestCase currently being executed.
 	testCase TestCase
 
+	// The type of KMS currently being tested.
 	kms KMSType
 
 	// The type of database currently being tested.
@@ -153,30 +177,11 @@ type state struct {
 	// These channels will receive a function which asserts results of any subscription requests.
 	subscriptionResultsChans []chan func()
 
-	// nodeEvents contains all event node subscriptions.
-	nodeEvents []*eventState
-
-	// The addresses of any nodes configured.
-	nodeAddresses []peer.AddrInfo
-
-	// The configurations for any nodes
-	nodeConfigs [][]net.NodeOpt
-
 	// The nodes active in this test.
-	nodes []clients.Client
-
-	// closedNodes contains the indexes of nodes that have been closed.
-	closedNodes map[int]struct{}
-
-	// nodeP2P contains p2p states for all nodes
-	nodeP2P []*p2pState
+	nodes []*nodeState
 
-	// The paths to any file-based databases active in this test.
-	dbPaths []string
-
-	// Collections by index, by nodeID present in the test.
-	// Indexes matches that of collectionNames.
-	collections [][]client.Collection
+	// The ACP options to share between each node.
+	acpOptions []node.ACPOpt
 
 	// The names of the collections active in this test.
 	// Indexes matches that of initial collections.
@@ -196,17 +201,14 @@ type state struct {
 	// Valid Cid string values by [UniqueCid] ID.
 	cids map[any]string
 
-	// Indexes, by index, by collection index, by node index.
-	indexes [][][]client.IndexDescription
-
 	// isBench indicates whether the test is currently being benchmarked.
 	isBench bool
 
 	// The SourceHub address used to pay for SourceHub transactions.
 	sourcehubAddress string
 
-	// The ACP options to share between each node.
-	acpOptions []node.ACPOpt
+	// isNetworkEnabled indicates whether the network is enabled.
+	isNetworkEnabled bool
 }
 
 // newState returns a new fresh state for the given testCase.
@@ -230,19 +232,10 @@ func newState(
 		allActionsDone:           make(chan struct{}),
 		identities:               map[identityRef]*identityHolder{},
 		subscriptionResultsChans: []chan func(){},
-		nodeEvents:               []*eventState{},
-		nodeAddresses:            []peer.AddrInfo{},
-		nodeConfigs:              [][]net.NodeOpt{},
-		nodeP2P:                  []*p2pState{},
-		nodes:                    []clients.Client{},
-		closedNodes:              map[int]struct{}{},
-		dbPaths:                  []string{},
-		collections:              [][]client.Collection{},
 		collectionNames:          collectionNames,
 		collectionIndexesByRoot:  map[uint32]int{},
 		docIDs:                   [][]client.DocID{},
 		cids:                     map[any]string{},
-		indexes:                  [][][]client.IndexDescription{},
 		isBench:                  false,
 	}
 }
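The heart of the state.go change: the parallel, node-indexed collections (`nodes`, `nodeEvents`, `nodeP2P`, `nodeAddresses`, `nodeConfigs`, `dbPaths`, `collections`, `indexes`, plus the `closedNodes` set) collapse into a single `[]*nodeState`. A minimal runnable sketch of the resulting shape, with simplified stand-in fields:

```go
package main

import "fmt"

// Simplified stand-in for the new struct: one value per node instead of an
// entry in several separate slices and maps keyed by node index.
type nodeState struct {
	dbPath string
	closed bool
}

type state struct {
	nodes []*nodeState
}

func main() {
	s := state{nodes: []*nodeState{
		{dbPath: "/tmp/defra-node-0"},
		{dbPath: "/tmp/defra-node-1"},
	}}

	// Closing a node is now a field flip on the same struct, replacing the
	// old closedNodes map[int]struct{} bookkeeping.
	s.nodes[1].closed = true

	for i, n := range s.nodes {
		fmt.Printf("node %d: path=%s closed=%v\n", i, n.dbPath, n.closed)
	}
}
```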
diff --git a/tests/integration/utils.go b/tests/integration/utils.go
index f827ac0130..51adaa42ea 100644
--- a/tests/integration/utils.go
+++ b/tests/integration/utils.go
@@ -442,7 +442,7 @@ func createGenerateDocs(s *state, docs []gen.GeneratedDoc, nodeID immutable.Opti
 func generateDocs(s *state, action GenerateDocs) {
 	nodeIDs, _ := getNodesWithIDs(action.NodeID, s.nodes)
 	firstNodesID := nodeIDs[0]
-	collections := s.collections[firstNodesID]
+	collections := s.nodes[firstNodesID].collections
 	defs := make([]client.CollectionDefinition, 0, len(collections))
 	for _, collection := range collections {
 		if len(action.ForCollections) == 0 || slices.Contains(action.ForCollections, collection.Name().Value()) {
@@ -459,7 +459,7 @@ func generateDocs(s *state, action GenerateDocs) {
 func generatePredefinedDocs(s *state, action CreatePredefinedDocs) {
 	nodeIDs, _ := getNodesWithIDs(action.NodeID, s.nodes)
 	firstNodesID := nodeIDs[0]
-	collections := s.collections[firstNodesID]
+	collections := s.nodes[firstNodesID].collections
 	defs := make([]client.CollectionDefinition, 0, len(collections))
 	for _, col := range collections {
 		defs = append(defs, col.Definition())
@@ -577,10 +577,10 @@ func closeNodes(
 	s *state,
 	action Close,
 ) {
-	nodeIDs, nodes := getNodesWithIDs(action.NodeID, s.nodes)
-	for i, node := range nodes {
-		node.Close()
-		s.closedNodes[nodeIDs[i]] = struct{}{}
+	_, nodes := getNodesWithIDs(action.NodeID, s.nodes)
+	for _, node := range nodes {
+		node.client.Close()
+		node.closed = true
 	}
 }
 
@@ -594,7 +594,7 @@ func closeNodes(
 // greater than 0. For example, if requesting a node with nodeID=2, then the resulting output will contain only
 // one element (at index 0), and the caller might accidentally assume that this node belongs to node 0. Therefore, the
 // caller should always use the returned IDs, instead of guessing the IDs based on node indexes.
-func getNodesWithIDs(nodeID immutable.Option[int], nodes []clients.Client) ([]int, []clients.Client) {
+func getNodesWithIDs(nodeID immutable.Option[int], nodes []*nodeState) ([]int, []*nodeState) {
 	if !nodeID.HasValue() {
 		indexes := make([]int, len(nodes))
 		for i := range nodes {
@@ -603,7 +603,7 @@ func getNodesWithIDs(nodeID immutable.Option[int], nodes []clients.Client) ([]in
 		return indexes, nodes
 	}
 
-	return []int{nodeID.Value()}, []clients.Client{nodes[nodeID.Value()]}
+	return []int{nodeID.Value()}, []*nodeState{nodes[nodeID.Value()]}
 }
 
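The doc comment on getNodesWithIDs warns about the re-indexing pitfall; here it is made concrete in a runnable sketch (`immutable.Option[int]` stood in by `*int`, `*nodeState` by `string`):

```go
package main

import "fmt"

// Stand-in for the real helper: the re-indexing behaviour is the same.
func getNodesWithIDs(nodeID *int, nodes []string) ([]int, []string) {
	if nodeID == nil {
		indexes := make([]int, len(nodes))
		for i := range nodes {
			indexes[i] = i
		}
		return indexes, nodes
	}
	return []int{*nodeID}, []string{nodes[*nodeID]}
}

func main() {
	nodes := []string{"node0", "node1", "node2"}

	want := 2
	ids, selected := getNodesWithIDs(&want, nodes)

	// selected[0] is node 2, not node 0: always pair results with ids[i],
	// never with the position in the returned slice.
	fmt.Println(ids[0], selected[0]) // 2 node2
}
```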
 func calculateLenForFlattenedActions(testCase *TestCase) int {
@@ -711,83 +711,35 @@ ActionLoop:
 func setStartingNodes(
 	s *state,
 ) {
-	hasExplicitNode := false
 	for _, action := range s.testCase.Actions {
 		switch action.(type) {
 		case ConfigureNode:
-			hasExplicitNode = true
+			s.isNetworkEnabled = true
 		}
 	}
 
 	// If nodes have not been explicitly configured via actions, setup a default one.
-	if !hasExplicitNode {
-		node, path, err := setupNode(s)
-		require.Nil(s.t, err)
-
-		c, err := setupClient(s, node)
+	if !s.isNetworkEnabled {
+		st, err := setupNode(s)
 		require.Nil(s.t, err)
-
-		eventState, err := newEventState(c.Events())
-		require.NoError(s.t, err)
-
-		s.nodes = append(s.nodes, c)
-		s.nodeEvents = append(s.nodeEvents, eventState)
-		s.nodeP2P = append(s.nodeP2P, newP2PState())
-		s.dbPaths = append(s.dbPaths, path)
+		s.nodes = append(s.nodes, st)
 	}
 }
 
 func startNodes(s *state, action Start) {
-	_, nodes := getNodesWithIDs(action.NodeID, s.nodes)
+	nodeIDs, nodes := getNodesWithIDs(action.NodeID, s.nodes)
 	// We need to restart the nodes in reverse order, to avoid dial backoff issues.
 	for i := len(nodes) - 1; i >= 0; i-- {
-		nodeIndex := i
-		if action.NodeID.HasValue() {
-			nodeIndex = action.NodeID.Value()
-		}
+		nodeIndex := nodeIDs[i]
 		originalPath := databaseDir
-		databaseDir = s.dbPaths[nodeIndex]
-		node, _, err := setupNode(s, db.WithNodeIdentity(getIdentity(s, NodeIdentity(nodeIndex))))
+		databaseDir = s.nodes[nodeIndex].dbPath
+		node, err := setupNode(s, db.WithNodeIdentity(getIdentity(s, NodeIdentity(nodeIndex))))
 		require.NoError(s.t, err)
 		databaseDir = originalPath
 
-		if len(s.nodeConfigs) == 0 {
-			// If there are no explicit node configuration actions the node will be
-			// basic (i.e. no P2P stuff) and can be yielded now.
-			c, err := setupClient(s, node)
-			require.NoError(s.t, err)
-			s.nodes[nodeIndex] = c
-
-			eventState, err := newEventState(c.Events())
-			require.NoError(s.t, err)
-			s.nodeEvents[nodeIndex] = eventState
-			continue
-		}
-
-		// We need to make sure the node is configured with its old address, otherwise
-		// a new one may be selected and reconnection to it will fail.
-		var addresses []string
-		for _, addr := range s.nodeAddresses[nodeIndex].Addrs {
-			addresses = append(addresses, addr.String())
-		}
-
-		nodeOpts := s.nodeConfigs[nodeIndex]
-		nodeOpts = append(nodeOpts, net.WithListenAddresses(addresses...))
-
-		node.Peer, err = net.NewPeer(s.ctx, node.DB.Blockstore(), node.DB.Encstore(), node.DB.Events(), nodeOpts...)
-		require.NoError(s.t, err)
+		s.nodes[nodeIndex] = node
 
-		c, err := setupClient(s, node)
-		require.NoError(s.t, err)
-		s.nodes[nodeIndex] = c
-
-		eventState, err := newEventState(c.Events())
-		require.NoError(s.t, err)
-		s.nodeEvents[nodeIndex] = eventState
-
-		delete(s.closedNodes, nodeIndex)
-
-		waitForNetworkSetupEvents(s, i)
+		waitForNetworkSetupEvents(s, nodeIndex)
 	}
 
 	// If the db was restarted we need to refresh the collection definitions as the old instances
@@ -814,11 +766,9 @@ func restartNodes(
 func refreshCollections(
 	s *state,
 ) {
-	s.collections = make([][]client.Collection, len(s.nodes))
-
-	for nodeID, node := range s.nodes {
-		s.collections[nodeID] = make([]client.Collection, len(s.collectionNames))
-		allCollections, err := node.GetCollections(s.ctx, client.CollectionFetchOptions{})
+	for _, node := range s.nodes {
+		node.collections = make([]client.Collection, len(s.collectionNames))
+		allCollections, err := node.client.GetCollections(s.ctx, client.CollectionFetchOptions{})
 		require.Nil(s.t, err)
 
 		for i, collectionName := range s.collectionNames {
@@ -838,7 +788,7 @@ func refreshCollections(
 
 		for _, collection := range allCollections {
 			if index, ok := s.collectionIndexesByRoot[collection.Description().RootID]; ok {
-				s.collections[nodeID][index] = collection
+				node.collections[index] = collection
 			}
 		}
 	}
@@ -864,35 +814,23 @@ func configureNode(
 	netNodeOpts := action()
 	netNodeOpts = append(netNodeOpts, net.WithPrivateKey(privateKey))
 
-	nodeOpts := []node.Option{node.WithDisableP2P(false), db.WithRetryInterval([]time.Duration{time.Millisecond * 1})}
+	nodeOpts := []node.Option{db.WithRetryInterval([]time.Duration{time.Millisecond * 1})}
 	for _, opt := range netNodeOpts {
 		nodeOpts = append(nodeOpts, opt)
 	}
 	nodeOpts = append(nodeOpts, db.WithNodeIdentity(getIdentity(s, NodeIdentity(len(s.nodes)))))
 
-	node, path, err := setupNode(s, nodeOpts...) //disable change detector, or allow it?
-	require.NoError(s.t, err)
-
-	s.nodeAddresses = append(s.nodeAddresses, node.Peer.PeerInfo())
-	s.nodeConfigs = append(s.nodeConfigs, netNodeOpts)
-
-	c, err := setupClient(s, node)
+	node, err := setupNode(s, nodeOpts...) //disable change detector, or allow it?
 	require.NoError(s.t, err)
 
-	eventState, err := newEventState(c.Events())
-	require.NoError(s.t, err)
-
-	s.nodes = append(s.nodes, c)
-	s.nodeEvents = append(s.nodeEvents, eventState)
-	s.nodeP2P = append(s.nodeP2P, newP2PState())
-	s.dbPaths = append(s.dbPaths, path)
+	s.nodes = append(s.nodes, node)
 }
 
 func refreshDocuments(
 	s *state,
 	startActionIndex int,
 ) {
-	if len(s.collections) == 0 {
+	if len(s.nodes) == 0 {
 		// This should only be possible at the moment for P2P testing, for which the
 		// change detector is currently disabled. We'll likely need some fancier logic
 		// here if/when we wish to enable it.
@@ -902,9 +840,9 @@ func refreshDocuments(
 	// For now just do the initial setup using the collections on the first node,
 	// this may need to become more involved at a later date depending on testing
 	// requirements.
-	s.docIDs = make([][]client.DocID, len(s.collections[0]))
+	s.docIDs = make([][]client.DocID, len(s.nodes[0].collections))
 
-	for i := range s.collections[0] {
+	for i := range s.nodes[0].collections {
 		s.docIDs[i] = []client.DocID{}
 	}
 
@@ -917,7 +855,7 @@ func refreshDocuments(
 		// Just use the collection from the first relevant node, as all will be the same for this
 		// purpose.
 		firstNodesID := nodeIDs[0]
-		collection := s.collections[firstNodesID][action.CollectionID]
+		collection := s.nodes[firstNodesID].collections[action.CollectionID]
 
 		if action.DocMap != nil {
 			substituteRelations(s, action)
 		}
@@ -939,16 +877,10 @@ func refreshDocuments(
 func refreshIndexes(
 	s *state,
 ) {
-	if len(s.collections) == 0 {
-		return
-	}
-
-	s.indexes = make([][][]client.IndexDescription, len(s.collections))
+	for _, node := range s.nodes {
+		node.indexes = make([][]client.IndexDescription, len(node.collections))
 
-	for i, nodeCols := range s.collections {
-		s.indexes[i] = make([][]client.IndexDescription, len(nodeCols))
-
-		for j, col := range nodeCols {
+		for i, col := range node.collections {
 			if col == nil {
 				continue
 			}
@@ -957,7 +889,7 @@ func refreshIndexes(
 				continue
 			}
 
-			s.indexes[i][j] = colIndexes
+			node.indexes[i] = colIndexes
 		}
 	}
 }
@@ -966,7 +898,7 @@ func getIndexes(
 	s *state,
 	action GetIndexes,
 ) {
-	if len(s.collections) == 0 {
+	if len(s.nodes) == 0 {
 		return
 	}
 
@@ -974,9 +906,9 @@ func getIndexes(
 	nodeIDs, _ := getNodesWithIDs(action.NodeID, s.nodes)
 	for _, nodeID := range nodeIDs {
-		collections := s.collections[nodeID]
+		collections := s.nodes[nodeID].collections
 		err := withRetryOnNode(
-			s.nodes[nodeID],
+			s.nodes[nodeID].client,
 			func() error {
 				actualIndexes, err := collections[action.CollectionID].GetIndexes(s.ctx)
 				if err != nil {
@@ -1066,7 +998,7 @@ func updateSchema(
 ) {
 	_, nodes := getNodesWithIDs(action.NodeID, s.nodes)
 	for _, node := range nodes {
-		results, err := node.AddSchema(s.ctx, action.Schema)
+		results, err := node.client.AddSchema(s.ctx, action.Schema)
 		expectedErrorRaised := AssertError(s.t, s.testCase.Description, err, action.ExpectedError)
 
 		assertExpectedErrorRaised(s.t, s.testCase.Description, action.ExpectedError, expectedErrorRaised)
@@ -1094,7 +1026,7 @@ func patchSchema(
 		setAsDefaultVersion = true
 	}
 
-	err := node.PatchSchema(s.ctx, action.Patch, action.Lens, setAsDefaultVersion)
+	err := node.client.PatchSchema(s.ctx, action.Patch, action.Lens, setAsDefaultVersion)
 	expectedErrorRaised := AssertError(s.t, s.testCase.Description, err, action.ExpectedError)
 
 	assertExpectedErrorRaised(s.t, s.testCase.Description, action.ExpectedError, expectedErrorRaised)
@@ -1111,7 +1043,7 @@ func patchCollection(
 ) {
 	_, nodes := getNodesWithIDs(action.NodeID, s.nodes)
 	for _, node := range nodes {
-		err := node.PatchCollection(s.ctx, action.Patch)
+		err := node.client.PatchCollection(s.ctx, action.Patch)
 		expectedErrorRaised := AssertError(s.t, s.testCase.Description, err, action.ExpectedError)
 
 		assertExpectedErrorRaised(s.t, s.testCase.Description, action.ExpectedError, expectedErrorRaised)
@@ -1132,11 +1064,11 @@ func getSchema(
 		var err error
 		switch {
 		case action.VersionID.HasValue():
-			result, e := node.GetSchemaByVersionID(s.ctx, action.VersionID.Value())
+			result, e := node.client.GetSchemaByVersionID(s.ctx, action.VersionID.Value())
 			err = e
 			results = []client.SchemaDescription{result}
 		default:
-			results, err = node.GetSchemas(
+			results, err = node.client.GetSchemas(
 				s.ctx,
 				client.SchemaFetchOptions{
 					Root: action.Root,
@@ -1160,9 +1092,9 @@ func getCollections(
 ) {
 	_, nodes := getNodesWithIDs(action.NodeID, s.nodes)
 	for _, node := range nodes {
-		txn := getTransaction(s, node, action.TransactionID, "")
+		txn := getTransaction(s, node.client, action.TransactionID, "")
 		ctx := db.SetContextTxn(s.ctx, txn)
-		results, err := node.GetCollections(ctx, action.FilterOptions)
+		results, err := node.client.GetCollections(ctx, action.FilterOptions)
 		resultDescriptions := make([]client.CollectionDescription, len(results))
 		for i, col := range results {
 			resultDescriptions[i] = col.Description()
 		}
@@ -1183,7 +1115,7 @@ func setActiveSchemaVersion(
 ) {
 	_, nodes := getNodesWithIDs(action.NodeID, s.nodes)
 	for _, node := range nodes {
-		err := node.SetActiveSchemaVersion(s.ctx, action.SchemaVersionID)
+		err := node.client.SetActiveSchemaVersion(s.ctx, action.SchemaVersionID)
 		expectedErrorRaised := AssertError(s.t, s.testCase.Description, err, action.ExpectedError)
 
 		assertExpectedErrorRaised(s.t, s.testCase.Description, action.ExpectedError, expectedErrorRaised)
@@ -1212,7 +1144,7 @@ func createView(
 	_, nodes := getNodesWithIDs(action.NodeID, s.nodes)
 	for _, node := range nodes {
-		_, err := node.AddView(s.ctx, action.Query, action.SDL, action.Transform)
+		_, err := node.client.AddView(s.ctx, action.Query, action.SDL, action.Transform)
 		expectedErrorRaised := AssertError(s.t, s.testCase.Description, err, action.ExpectedError)
 
 		assertExpectedErrorRaised(s.t, s.testCase.Description, action.ExpectedError, expectedErrorRaised)
@@ -1225,7 +1157,7 @@ func refreshViews(
 ) {
 	_, nodes := getNodesWithIDs(action.NodeID, s.nodes)
 	for _, node := range nodes {
-		err := node.RefreshViews(s.ctx, action.FilterOptions)
+		err := node.client.RefreshViews(s.ctx, action.FilterOptions)
 		expectedErrorRaised := AssertError(s.t, s.testCase.Description, err, action.ExpectedError)
 		assertExpectedErrorRaised(s.t, s.testCase.Description, action.ExpectedError, expectedErrorRaised)
 	}
@@ -1259,15 +1191,15 @@ func createDoc(
 	nodeIDs, nodes := getNodesWithIDs(action.NodeID, s.nodes)
 	for index, node := range nodes {
 		nodeID := nodeIDs[index]
-		collection := s.collections[nodeID][action.CollectionID]
+		collection := s.nodes[nodeID].collections[action.CollectionID]
 		err := withRetryOnNode(
-			node,
+			node.client,
 			func() error {
 				var err error
 				docIDs, err = mutation(
 					s,
 					action,
-					node,
+					node.client,
 					nodeID,
 					collection,
 				)
@@ -1449,10 +1381,10 @@ func deleteDoc(
 	nodeIDs, nodes := getNodesWithIDs(action.NodeID, s.nodes)
 	for index, node := range nodes {
 		nodeID := nodeIDs[index]
-		collection := s.collections[nodeID][action.CollectionID]
+		collection := s.nodes[nodeID].collections[action.CollectionID]
 		ctx := getContextWithIdentity(s.ctx, s, action.Identity, nodeID)
 		err := withRetryOnNode(
-			node,
+			node.client,
 			func() error {
 				_, err := collection.Delete(ctx, docID)
 				return err
@@ -1493,14 +1425,14 @@ func updateDoc(
 	nodeIDs, nodes := getNodesWithIDs(action.NodeID, s.nodes)
 	for index, node := range nodes {
 		nodeID := nodeIDs[index]
-		collection := s.collections[nodeID][action.CollectionID]
+		collection := s.nodes[nodeID].collections[action.CollectionID]
 		err := withRetryOnNode(
-			node,
+			node.client,
 			func() error {
 				return mutation(
 					s,
 					action,
-					node,
+					node.client,
 					nodeID,
 					collection,
 				)
@@ -1596,10 +1528,10 @@ func updateWithFilter(s *state, action UpdateWithFilter) {
 	nodeIDs, nodes := getNodesWithIDs(action.NodeID, s.nodes)
 	for index, node := range nodes {
 		nodeID := nodeIDs[index]
-		collection := s.collections[nodeID][action.CollectionID]
+		collection := s.nodes[nodeID].collections[action.CollectionID]
 		ctx := getContextWithIdentity(s.ctx, s, action.Identity, nodeID)
 		err := withRetryOnNode(
-			node,
+			node.client,
 			func() error {
 				var err error
 				res, err = collection.UpdateWithFilter(ctx, action.Filter, action.Updater)
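Several hunks above thread `node.client` into `withRetryOnNode`. The call sites pin down its shape (the client first, a re-runnable closure second, the final error returned); its actual retry policy lives in the harness and is not shown here. A stand-in that fakes a single retry, purely to illustrate how the closures above are driven:

```go
package main

import (
	"errors"
	"fmt"
)

// Illustrative stand-in only: the real withRetryOnNode in utils.go decides
// when and how often to re-run the closure. This fake retries exactly once.
func withRetryOnNode(_ any, action func() error) error {
	if err := action(); err != nil {
		return action()
	}
	return nil
}

func main() {
	attempts := 0
	err := withRetryOnNode(nil, func() error {
		attempts++
		if attempts == 1 {
			return errors.New("transient datastore conflict")
		}
		return nil
	})
	fmt.Println(attempts, err) // 2 <nil>
}
```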
@@ -1621,18 +1553,10 @@ func createIndex(
 	s *state,
 	action CreateIndex,
 ) {
-	if action.CollectionID >= len(s.indexes) {
-		// Expand the slice if required, so that the index can be accessed by collection index
-		s.indexes = append(
-			s.indexes,
-			make([][][]client.IndexDescription, action.CollectionID-len(s.indexes)+1)...,
-		)
-	}
-
 	nodeIDs, nodes := getNodesWithIDs(action.NodeID, s.nodes)
 	for index, node := range nodes {
 		nodeID := nodeIDs[index]
-		collection := s.collections[nodeID][action.CollectionID]
+		collection := s.nodes[nodeID].collections[action.CollectionID]
 		indexDesc := client.IndexDescription{
 			Name: action.IndexName,
 		}
@@ -1653,14 +1577,14 @@ func createIndex(
 		indexDesc.Unique = action.Unique
 		err := withRetryOnNode(
-			node,
+			node.client,
 			func() error {
 				desc, err := collection.CreateIndex(s.ctx, indexDesc)
 				if err != nil {
 					return err
 				}
-				s.indexes[nodeID][action.CollectionID] = append(
-					s.indexes[nodeID][action.CollectionID],
+				s.nodes[nodeID].indexes[action.CollectionID] = append(
+					s.nodes[nodeID].indexes[action.CollectionID],
 					desc,
 				)
 				return nil
@@ -1684,14 +1608,14 @@ func dropIndex(
 	nodeIDs, nodes := getNodesWithIDs(action.NodeID, s.nodes)
 	for index, node := range nodes {
 		nodeID := nodeIDs[index]
-		collection := s.collections[nodeID][action.CollectionID]
+		collection := s.nodes[nodeID].collections[action.CollectionID]
 		indexName := action.IndexName
 		if indexName == "" {
-			indexName = s.indexes[nodeID][action.CollectionID][action.IndexID].Name
+			indexName = s.nodes[nodeID].indexes[action.CollectionID][action.IndexID].Name
 		}
 
 		err := withRetryOnNode(
-			node,
+			node.client,
 			func() error {
 				return collection.DropIndex(s.ctx, indexName)
 			},
@@ -1716,8 +1640,8 @@ func backupExport(
 	_, nodes := getNodesWithIDs(action.NodeID, s.nodes)
 	for _, node := range nodes {
 		err := withRetryOnNode(
-			node,
-			func() error { return node.BasicExport(s.ctx, &action.Config) },
+			node.client,
+			func() error { return node.client.BasicExport(s.ctx, &action.Config) },
 		)
 		expectedErrorRaised = AssertError(s.t, s.testCase.Description, err, action.ExpectedError)
@@ -1747,8 +1671,8 @@ func backupImport(
 	_, nodes := getNodesWithIDs(action.NodeID, s.nodes)
 	for _, node := range nodes {
 		err := withRetryOnNode(
-			node,
-			func() error { return node.BasicImport(s.ctx, action.Filepath) },
+			node.client,
+			func() error { return node.client.BasicImport(s.ctx, action.Filepath) },
 		)
 		expectedErrorRaised = AssertError(s.t, s.testCase.Description, err, action.ExpectedError)
 	}
@@ -1836,7 +1760,7 @@ func executeRequest(
 	nodeIDs, nodes := getNodesWithIDs(action.NodeID, s.nodes)
 	for index, node := range nodes {
 		nodeID := nodeIDs[index]
-		txn := getTransaction(s, node, action.TransactionID, action.ExpectedError)
+		txn := getTransaction(s, node.client, action.TransactionID, action.ExpectedError)
 
 		ctx := getContextWithIdentity(db.SetContextTxn(s.ctx, txn), s, action.Identity, nodeID)
@@ -1849,14 +1773,14 @@ func executeRequest(
 		}
 
 		if !expectedErrorRaised && viewType == MaterializedViewType {
-			err := node.RefreshViews(s.ctx, client.CollectionFetchOptions{})
+			err := node.client.RefreshViews(s.ctx, client.CollectionFetchOptions{})
 			expectedErrorRaised = AssertError(s.t, s.testCase.Description, err, action.ExpectedError)
 			if expectedErrorRaised {
 				continue
 			}
 		}
 
-		result := node.ExecRequest(ctx, action.Request, options...)
+		result := node.client.ExecRequest(ctx, action.Request, options...)
 
 		expectedErrorRaised = assertRequestResults(
 			s,
@@ -1887,7 +1811,7 @@ func executeSubscriptionRequest(
 	_, nodes := getNodesWithIDs(action.NodeID, s.nodes)
 	for _, node := range nodes {
-		result := node.ExecRequest(s.ctx, action.Request)
+		result := node.client.ExecRequest(s.ctx, action.Request)
 		if AssertErrors(s.t, s.testCase.Description, result.GQL.Errors, action.ExpectedError) {
 			return
 		}
@@ -2144,7 +2068,7 @@ func assertIntrospectionResults(
 ) bool {
 	_, nodes := getNodesWithIDs(action.NodeID, s.nodes)
 	for _, node := range nodes {
-		result := node.ExecRequest(s.ctx, action.Request)
+		result := node.client.ExecRequest(s.ctx, action.Request)
 
 		if AssertErrors(s.t, s.testCase.Description, result.GQL.Errors, action.ExpectedError) {
 			return true
@@ -2176,7 +2100,7 @@ func assertClientIntrospectionResults(
 ) bool {
 	_, nodes := getNodesWithIDs(action.NodeID, s.nodes)
 	for _, node := range nodes {
-		result := node.ExecRequest(s.ctx, action.Request)
+		result := node.client.ExecRequest(s.ctx, action.Request)
 
 		if AssertErrors(s.t, s.testCase.Description, result.GQL.Errors, action.ExpectedError) {
 			return true
@@ -2440,7 +2364,7 @@ func performGetNodeIdentityAction(s *state, action GetNodeIdentity) {
 		s.t.Fatalf("invalid nodeID: %v", action.NodeID)
 	}
 
-	actualIdent, err := s.nodes[action.NodeID].GetNodeIdentity(s.ctx)
+	actualIdent, err := s.nodes[action.NodeID].client.GetNodeIdentity(s.ctx)
 	require.NoError(s.t, err, s.testCase.Description)
 
 	expectedIdent := getIdentity(s, action.ExpectedIdentity)