refactor: P2P client interface (sourcenetwork#1924)
## Relevant issue(s)

Resolves sourcenetwork#1883 

## Description

Blocked by sourcenetwork#1927 

This PR moves the `client.P2P` implementation from `client.DB` to
`net.Node`. This fixes the problems mentioned in the issue above and
should increase test coverage of the HTTP and CLI clients.
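
A minimal sketch of the pattern described above, where command code depends on a narrow P2P interface instead of the whole store. The method names `AddP2PCollections`, `RemoveP2PCollections` and `GetAllP2PCollections` appear in the diff below; the `[]string` return type, `fakeNode`, `subscribe` and `runExample` are hypothetical stand-ins for illustration, not DefraDB code.

```go
package main

import (
	"context"
	"fmt"
	"sort"
)

// P2P is a trimmed-down, hypothetical stand-in for the client.P2P interface.
// Only the method names are taken from the diff; the signatures are assumed.
type P2P interface {
	AddP2PCollections(ctx context.Context, collectionIDs []string) error
	RemoveP2PCollections(ctx context.Context, collectionIDs []string) error
	GetAllP2PCollections(ctx context.Context) ([]string, error)
}

// fakeNode is a hypothetical in-memory implementation of P2P.
type fakeNode struct {
	cols map[string]struct{}
}

func newFakeNode() *fakeNode {
	return &fakeNode{cols: make(map[string]struct{})}
}

func (n *fakeNode) AddP2PCollections(_ context.Context, ids []string) error {
	for _, id := range ids {
		n.cols[id] = struct{}{}
	}
	return nil
}

func (n *fakeNode) RemoveP2PCollections(_ context.Context, ids []string) error {
	for _, id := range ids {
		delete(n.cols, id)
	}
	return nil
}

func (n *fakeNode) GetAllP2PCollections(_ context.Context) ([]string, error) {
	out := make([]string, 0, len(n.cols))
	for id := range n.cols {
		out = append(out, id)
	}
	sort.Strings(out) // sorted so the result is deterministic
	return out, nil
}

// subscribe is a hypothetical caller that needs only the P2P interface,
// so it can be exercised against any implementation, including a fake.
func subscribe(ctx context.Context, p2p P2P, ids ...string) ([]string, error) {
	if err := p2p.AddP2PCollections(ctx, ids); err != nil {
		return nil, err
	}
	return p2p.GetAllP2PCollections(ctx)
}

// runExample wires the fake node into subscribe.
func runExample() ([]string, error) {
	return subscribe(context.Background(), newFakeNode(), "colB", "colA")
}

func main() {
	cols, err := runExample()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(cols)
}
```

Because `subscribe` accepts the interface, the same call path could be driven by an HTTP-backed or CLI-backed implementation in tests, which is the kind of coverage gain the description mentions.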

## Tasks

- [x] I made sure the code is well commented, particularly
hard-to-understand areas.
- [x] I made sure the repository-held documentation is changed
accordingly.
- [x] I made sure the pull request title adheres to the conventional
commit style (the subset used in the project can be found in
[tools/configs/chglog/config.yml](tools/configs/chglog/config.yml)).
- [x] I made sure to discuss its limitations such as threats to
validity, vulnerability to mistake and misuse, robustness to
invalidation of assumptions, resource requirements, ...

## How has this been tested?

`make test`

Specify the platform(s) on which this was tested:
- macOS
nasdf authored Oct 16, 2023
1 parent 5c1b21e commit 25d4767
Showing 55 changed files with 1,418 additions and 1,772 deletions.
50 changes: 36 additions & 14 deletions README.md
@@ -244,57 +244,73 @@ When starting a node for the first time, a key pair is generated and stored in i

 Each node has a unique `PeerID` generated from its public key. This ID allows other nodes to connect to it.

+To view your node's peer info:
+
+```shell
+defradb client p2p info
+```
+
 There are two types of peer-to-peer relationships supported: **pubsub** peering and **replicator** peering.

 Pubsub peering *passively* synchronizes data between nodes by broadcasting *Document Commit* updates to the topic of the commit's document key. Nodes need to be listening on the pubsub channel to receive updates. This is for when two nodes *already* share a document and want to keep it in sync.

 Replicator peering *actively* pushes changes from a specific collection *to* a target peer.

-### Pubsub example
+<details>
+<summary>Pubsub example</summary>

 Pubsub peers can be specified on the command line using the `--peers` flag, which accepts a comma-separated list of peer [multiaddresses](https://docs.libp2p.io/concepts/addressing/). For example, a node at IP `192.168.1.12` listening on 9000 with PeerID `12D3KooWNXm3dmrwCYSxGoRUyZstaKYiHPdt8uZH5vgVaEJyzU8B` would be referred to using the multiaddress `/ip4/192.168.1.12/tcp/9000/p2p/12D3KooWNXm3dmrwCYSxGoRUyZstaKYiHPdt8uZH5vgVaEJyzU8B`.

 Let's go through an example of two nodes (*nodeA* and *nodeB*) connecting with each other over pubsub, on the same machine.

 Start *nodeA* with a default configuration:

-```
+```shell
 defradb start
 ```

-Obtain the PeerID from its console output. In this example, we use `12D3KooWNXm3dmrwCYSxGoRUyZstaKYiHPdt8uZH5vgVaEJyzU8B`, but locally it will be different.
+Obtain the node's peer info:
+
+```shell
+defradb client p2p info
+```
+
+In this example, we use `12D3KooWNXm3dmrwCYSxGoRUyZstaKYiHPdt8uZH5vgVaEJyzU8B`, but locally it will be different.

 For *nodeB*, we provide the following configuration:

-```
-defradb start --rootdir ~/.defradb-nodeB --url localhost:9182 --p2paddr /ip4/0.0.0.0/tcp/9172 --tcpaddr /ip4/0.0.0.0/tcp/9162 --peers /ip4/0.0.0.0/tcp/9171/p2p/12D3KooWNXm3dmrwCYSxGoRUyZstaKYiHPdt8uZH5vgVaEJyzU8B
+```shell
+defradb start --rootdir ~/.defradb-nodeB --url localhost:9182 --p2paddr /ip4/0.0.0.0/tcp/9172 --peers /ip4/0.0.0.0/tcp/9171/p2p/12D3KooWNXm3dmrwCYSxGoRUyZstaKYiHPdt8uZH5vgVaEJyzU8B
 ```

 About the flags:

 - `--rootdir` specifies the root dir (config and data) to use
 - `--url` is the address to listen on for the client HTTP and GraphQL API
 - `--p2paddr` is the multiaddress for the P2P networking to listen on
-- `--tcpaddr` is the multiaddress for the gRPC server to listen on
 - `--peers` is a comma-separated list of peer multiaddresses

 This starts two nodes and connects them via pubsub networking.
+</details>

-### Collection subscription example
+<details>
+<summary>Subscription example</summary>

-It is possible to subscribe to updates on a given collection by using its ID as the pubsub topic. The ID of a collection is found as the field `collectionID` in one of its documents. Here we use the collection ID of the `User` type we created above. After setting up 2 nodes as shown in the [Pubsub example](#pubsub-example) section, we can subscribe to collections updates on *nodeA* from *nodeB* by using the `rpc p2pcollection` command:
+It is possible to subscribe to updates on a given collection by using its ID as the pubsub topic. The ID of a collection is found as the field `collectionID` in one of its documents. Here we use the collection ID of the `User` type we created above. After setting up 2 nodes as shown in the [Pubsub example](#pubsub-example) section, we can subscribe to collections updates on *nodeA* from *nodeB* by using the following command:

 ```shell
-defradb client rpc p2pcollection add --url localhost:9182 bafkreibpnvkvjqvg4skzlijka5xe63zeu74ivcjwd76q7yi65jdhwqhske
+defradb client p2p collection add --url localhost:9182 bafkreibpnvkvjqvg4skzlijka5xe63zeu74ivcjwd76q7yi65jdhwqhske
 ```

 Multiple collection IDs can be added at once.

 ```shell
-defradb client rpc p2pcollection add --url localhost:9182 <collection1ID> <collection2ID> <collection3ID>
+defradb client p2p collection add --url localhost:9182 <collection1ID>,<collection2ID>,<collection3ID>
 ```
+</details>

-### Replicator example
+<details>
+<summary>Replicator example</summary>

 Replicator peering is targeted: it allows a node to actively send updates to another node. Let's go through an example of *nodeA* actively replicating to *nodeB*:

@@ -334,14 +350,20 @@ defradb client schema add --url localhost:9182 '
 '
 ```

-Set *nodeA* to actively replicate the "Article" collection to *nodeB*:
+Then copy the peer info from *nodeB*:

 ```shell
-defradb client rpc replicator set -c "Article" /ip4/0.0.0.0/tcp/9172/p2p/<peerID_of_nodeB>
+defradb client p2p info --url localhost:9182
 ```

-As we add or update documents in the "Article" collection on *nodeA*, they will be actively pushed to *nodeB*. Note that changes to *nodeB* will still be passively published back to *nodeA*, via pubsub.
+Set *nodeA* to actively replicate the Article collection to *nodeB*:
+
+```shell
+defradb client p2p replicator set -c Article <nodeB_peer_info_json>
+```

+As we add or update documents in the Article collection on *nodeA*, they will be actively pushed to *nodeB*. Note that changes to *nodeB* will still be passively published back to *nodeA*, via pubsub.
+</details>

 ## Securing the HTTP API with TLS
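
The multiaddress form used by `--peers` in the README hunk above (`/ip4/<ip>/tcp/<port>/p2p/<PeerID>`) can be assembled with plain string formatting. This is a sketch only: `tcpMultiaddr` is a hypothetical helper that is not part of DefraDB or libp2p, and it performs no validation of the IP, port or PeerID.

```go
package main

import "fmt"

// tcpMultiaddr builds an /ip4/.../tcp/.../p2p/... multiaddress string.
// Hypothetical helper for illustration; inputs are not validated.
func tcpMultiaddr(ip string, port int, peerID string) string {
	return fmt.Sprintf("/ip4/%s/tcp/%d/p2p/%s", ip, port, peerID)
}

func main() {
	// Values taken from the README example in the hunk above.
	fmt.Println(tcpMultiaddr(
		"192.168.1.12",
		9000,
		"12D3KooWNXm3dmrwCYSxGoRUyZstaKYiHPdt8uZH5vgVaEJyzU8B",
	))
}
```

In real code a multiaddress library would be a better fit than string formatting, since it can reject malformed components.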

4 changes: 2 additions & 2 deletions cli/p2p_collection_add.go
@@ -31,7 +31,7 @@ Example: add multiple collections
 `,
 		Args: cobra.ExactArgs(1),
 		RunE: func(cmd *cobra.Command, args []string) error {
-			store := mustGetStoreContext(cmd)
+			p2p := mustGetP2PContext(cmd)

 			var collectionIDs []string
 			for _, id := range strings.Split(args[0], ",") {
@@ -42,7 +42,7 @@ Example: add multiple collections
 				collectionIDs = append(collectionIDs, id)
 			}

-			return store.AddP2PCollections(cmd.Context(), collectionIDs)
+			return p2p.AddP2PCollections(cmd.Context(), collectionIDs)
 		},
 	}
 	return cmd
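
The comma-separated argument handling in the hunks above can be sketched as a standalone function. The lines between the two hunks are not shown in this diff, so the whitespace trimming and the skipping of empty entries below are assumptions, and `splitCollectionIDs` is a hypothetical name.

```go
package main

import (
	"fmt"
	"strings"
)

// splitCollectionIDs turns "id1,id2,id3" into a slice of IDs.
// Assumption: surrounding whitespace is trimmed and empty entries are dropped;
// the actual command may handle these cases differently.
func splitCollectionIDs(arg string) []string {
	var ids []string
	for _, id := range strings.Split(arg, ",") {
		id = strings.TrimSpace(id)
		if id == "" {
			continue
		}
		ids = append(ids, id)
	}
	return ids
}

func main() {
	fmt.Println(splitCollectionIDs("col1, col2,,col3"))
}
```

Taking a single comma-separated argument matches `cobra.ExactArgs(1)` in the command definition, which is why the README example switched from space-separated to comma-separated IDs.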
4 changes: 2 additions & 2 deletions cli/p2p_collection_getall.go
@@ -22,9 +22,9 @@ func MakeP2PCollectionGetAllCommand() *cobra.Command {
 This is the list of collections of the node that are synchronized on the pubsub network.`,
 		Args: cobra.NoArgs,
 		RunE: func(cmd *cobra.Command, args []string) error {
-			store := mustGetStoreContext(cmd)
+			p2p := mustGetP2PContext(cmd)

-			cols, err := store.GetAllP2PCollections(cmd.Context())
+			cols, err := p2p.GetAllP2PCollections(cmd.Context())
 			if err != nil {
 				return err
 			}
4 changes: 2 additions & 2 deletions cli/p2p_collection_remove.go
@@ -31,7 +31,7 @@ Example: remove multiple collections
 `,
 		Args: cobra.ExactArgs(1),
 		RunE: func(cmd *cobra.Command, args []string) error {
-			store := mustGetStoreContext(cmd)
+			p2p := mustGetP2PContext(cmd)

 			var collectionIDs []string
 			for _, id := range strings.Split(args[0], ",") {
@@ -42,7 +42,7 @@ Example: remove multiple collections
 				collectionIDs = append(collectionIDs, id)
 			}

-			return store.RemoveP2PCollections(cmd.Context(), collectionIDs)
+			return p2p.RemoveP2PCollections(cmd.Context(), collectionIDs)
 		},
 	}
 	return cmd
7 changes: 1 addition & 6 deletions cli/p2p_info.go
@@ -23,12 +23,7 @@ func MakeP2PInfoCommand() *cobra.Command {
 		Long: `Get peer info from a DefraDB node`,
 		RunE: func(cmd *cobra.Command, args []string) error {
 			db := cmd.Context().Value(dbContextKey).(*http.Client)
-
-			res, err := db.PeerInfo(cmd.Context())
-			if err != nil {
-				return err
-			}
-			return writeJSON(cmd, res)
+			return writeJSON(cmd, db.PeerInfo())
 		},
 	}
 	return cmd
30 changes: 22 additions & 8 deletions cli/p2p_replicator_delete.go
@@ -11,27 +11,41 @@
 package cli

 import (
+	"encoding/json"
+
 	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/spf13/cobra"

 	"github.com/sourcenetwork/defradb/client"
 )

 func MakeP2PReplicatorDeleteCommand() *cobra.Command {
+	var collections []string
 	var cmd = &cobra.Command{
-		Use: "delete <peer>",
-		Short: "Delete a replicator. It will stop synchronizing",
-		Long: `Delete a replicator. It will stop synchronizing.`,
-		Args: cobra.ExactArgs(1),
+		Use: "delete [-c, --collection] <peer>",
+		Short: "Delete replicator(s) and stop synchronization",
+		Long: `Delete replicator(s) and stop synchronization.
+A replicator synchronizes one or all collection(s) from this node to another.
+Example:
+defradb client p2p replicator delete -c Users '{"ID": "12D3", "Addrs": ["/ip4/0.0.0.0/tcp/9171"]}'
+`,
+		Args: cobra.ExactArgs(1),
 		RunE: func(cmd *cobra.Command, args []string) error {
-			store := mustGetStoreContext(cmd)
+			p2p := mustGetP2PContext(cmd)

-			addr, err := peer.AddrInfoFromString(args[0])
-			if err != nil {
+			var info peer.AddrInfo
+			if err := json.Unmarshal([]byte(args[0]), &info); err != nil {
 				return err
 			}
-			return store.DeleteReplicator(cmd.Context(), client.Replicator{Info: *addr})
+			rep := client.Replicator{
+				Info: info,
+				Schemas: collections,
+			}
+			return p2p.DeleteReplicator(cmd.Context(), rep)
 		},
 	}
+	cmd.Flags().StringSliceVarP(&collections, "collection", "c",
+		[]string{}, "Collection(s) to stop replicating")
 	return cmd
 }
10 changes: 7 additions & 3 deletions cli/p2p_replicator_getall.go
@@ -19,11 +19,15 @@ func MakeP2PReplicatorGetAllCommand() *cobra.Command {
 		Use: "getall",
 		Short: "Get all replicators",
 		Long: `Get all the replicators active in the P2P data sync system.
-These are the replicators that are currently replicating data from one node to another.`,
+A replicator synchronizes one or all collection(s) from this node to another.
+Example:
+defradb client p2p replicator getall
+`,
 		RunE: func(cmd *cobra.Command, args []string) error {
-			store := mustGetStoreContext(cmd)
+			p2p := mustGetP2PContext(cmd)

-			reps, err := store.GetAllReplicators(cmd.Context())
+			reps, err := p2p.GetAllReplicators(cmd.Context())
 			if err != nil {
 				return err
 			}
23 changes: 14 additions & 9 deletions cli/p2p_replicator_set.go
@@ -11,6 +11,8 @@
 package cli

 import (
+	"encoding/json"
+
 	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/spf13/cobra"

@@ -21,27 +23,30 @@ func MakeP2PReplicatorSetCommand() *cobra.Command {
 	var collections []string
 	var cmd = &cobra.Command{
 		Use: "set [-c, --collection] <peer>",
-		Short: "Set a P2P replicator",
-		Long: `Add a new target replicator.
-A replicator replicates one or all collection(s) from this node to another.
+		Short: "Add replicator(s) and start synchronization",
+		Long: `Add replicator(s) and start synchronization.
+A replicator synchronizes one or all collection(s) from this node to another.
+Example:
+defradb client p2p replicator set -c Users '{"ID": "12D3", "Addrs": ["/ip4/0.0.0.0/tcp/9171"]}'
 `,
 		Args: cobra.ExactArgs(1),
 		RunE: func(cmd *cobra.Command, args []string) error {
-			store := mustGetStoreContext(cmd)
+			p2p := mustGetP2PContext(cmd)

-			addr, err := peer.AddrInfoFromString(args[0])
-			if err != nil {
+			var info peer.AddrInfo
+			if err := json.Unmarshal([]byte(args[0]), &info); err != nil {
 				return err
 			}
 			rep := client.Replicator{
-				Info: *addr,
+				Info: info,
 				Schemas: collections,
 			}
-			return store.SetReplicator(cmd.Context(), rep)
+			return p2p.SetReplicator(cmd.Context(), rep)
 		},
 	}

 	cmd.Flags().StringSliceVarP(&collections, "collection", "c",
-		[]string{}, "Define the collection for the replicator")
+		[]string{}, "Collection(s) to replicate")
 	return cmd
 }
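
The replicator commands above now take the peer as a JSON document rather than a multiaddress string. The decoding step can be sketched without the libp2p dependency as follows. Here `peerInfo` and `replicator` are simplified, hypothetical stand-ins for `peer.AddrInfo` and `client.Replicator`: they hold plain strings and do not validate peer IDs or multiaddresses.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// peerInfo is a hypothetical stand-in for libp2p's peer.AddrInfo,
// keeping the ID and addresses as plain strings.
type peerInfo struct {
	ID    string   `json:"ID"`
	Addrs []string `json:"Addrs"`
}

// replicator is a hypothetical stand-in for client.Replicator, mirroring
// the two fields set in the diff (Info and Schemas).
type replicator struct {
	Info    peerInfo
	Schemas []string
}

// parseReplicator decodes the JSON peer argument and attaches the
// collection names supplied through the -c flag.
func parseReplicator(arg string, collections []string) (replicator, error) {
	var info peerInfo
	if err := json.Unmarshal([]byte(arg), &info); err != nil {
		return replicator{}, err
	}
	return replicator{Info: info, Schemas: collections}, nil
}

func main() {
	// Argument copied from the example in the command help text.
	arg := `{"ID": "12D3", "Addrs": ["/ip4/0.0.0.0/tcp/9171"]}`
	rep, err := parseReplicator(arg, []string{"Users"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", rep)
}
```

Accepting JSON means the output of `defradb client p2p info` can be passed straight to `replicator set` or `replicator delete`, as the updated README example does.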