Skip to content

Commit

Permalink
refactor: Remove net GRPC API (sourcenetwork#1927)
Browse files Browse the repository at this point in the history
## Relevant issue(s)

N/A

## Description

This PR removes the GRPC API from the net package. The HTTP and CLI
interfaces now include this functionality.

## Tasks

- [x] I made sure the code is well commented, particularly
hard-to-understand areas.
- [x] I made sure the repository-held documentation is changed
accordingly.
- [x] I made sure the pull request title adheres to the conventional
commit style (the subset used in the project can be found in
[tools/configs/chglog/config.yml](tools/configs/chglog/config.yml)).
- [x] I made sure to discuss its limitations such as threats to
validity, vulnerability to mistake and misuse, robustness to
invalidation of assumptions, resource requirements, ...

## How has this been tested?

`make test`

Specify the platform(s) on which this was tested:
- MacOS
  • Loading branch information
nasdf authored Oct 10, 2023
1 parent ce7d778 commit bc4c704
Show file tree
Hide file tree
Showing 34 changed files with 760 additions and 4,979 deletions.
25 changes: 22 additions & 3 deletions cli/p2p_collection_add.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,38 @@
package cli

import (
"strings"

"github.com/spf13/cobra"
)

func MakeP2PCollectionAddCommand() *cobra.Command {
var cmd = &cobra.Command{
Use: "add [collectionID]",
Use: "add [collectionIDs]",
Short: "Add P2P collections",
Long: `Add P2P collections to the synchronized pubsub topics.
The collections are synchronized between nodes of a pubsub network.`,
The collections are synchronized between nodes of a pubsub network.
Example: add single collection
defradb client p2p collection add bae123
Example: add multiple collections
defradb client p2p collection add bae123,bae456
`,
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
store := mustGetStoreContext(cmd)
return store.AddP2PCollection(cmd.Context(), args[0])

var collectionIDs []string
for _, id := range strings.Split(args[0], ",") {
id = strings.TrimSpace(id)
if id == "" {
continue
}
collectionIDs = append(collectionIDs, id)
}

return store.AddP2PCollections(cmd.Context(), collectionIDs)
},
}
return cmd
Expand Down
25 changes: 22 additions & 3 deletions cli/p2p_collection_remove.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,38 @@
package cli

import (
"strings"

"github.com/spf13/cobra"
)

func MakeP2PCollectionRemoveCommand() *cobra.Command {
var cmd = &cobra.Command{
Use: "remove [collectionID]",
Use: "remove [collectionIDs]",
Short: "Remove P2P collections",
Long: `Remove P2P collections from the followed pubsub topics.
The removed collections will no longer be synchronized between nodes.`,
The removed collections will no longer be synchronized between nodes.
Example: remove single collection
defradb client p2p collection remove bae123
Example: remove multiple collections
defradb client p2p collection remove bae123,bae456
`,
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
store := mustGetStoreContext(cmd)
return store.RemoveP2PCollection(cmd.Context(), args[0])

var collectionIDs []string
for _, id := range strings.Split(args[0], ",") {
id = strings.TrimSpace(id)
if id == "" {
continue
}
collectionIDs = append(collectionIDs, id)
}

return store.RemoveP2PCollections(cmd.Context(), collectionIDs)
},
}
return cmd
Expand Down
57 changes: 1 addition & 56 deletions cli/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package cli
import (
"context"
"fmt"
gonet "net"
"net/http"
"os"
"os/signal"
Expand All @@ -22,12 +21,7 @@ import (
"syscall"

badger "github.com/dgraph-io/badger/v4"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
ma "github.com/multiformats/go-multiaddr"
"github.com/spf13/cobra"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"

"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/config"
Expand All @@ -38,7 +32,6 @@ import (
httpapi "github.com/sourcenetwork/defradb/http"
"github.com/sourcenetwork/defradb/logging"
"github.com/sourcenetwork/defradb/net"
netpb "github.com/sourcenetwork/defradb/net/pb"
netutils "github.com/sourcenetwork/defradb/net/utils"
)

Expand Down Expand Up @@ -114,15 +107,6 @@ func MakeStartCommand(cfg *config.Config) *cobra.Command {
log.FeedbackFatalE(context.Background(), "Could not bind net.p2paddress", err)
}

cmd.Flags().String(
"tcpaddr", cfg.Net.TCPAddress,
"Listener address for the tcp gRPC server (formatted as a libp2p MultiAddr)",
)
err = cfg.BindFlag("net.tcpaddress", cmd.Flags().Lookup("tcpaddr"))
if err != nil {
log.FeedbackFatalE(context.Background(), "Could not bind net.tcpaddress", err)
}

cmd.Flags().Bool(
"no-p2p", cfg.Net.P2PDisabled,
"Disable the peer-to-peer network synchronization system",
Expand Down Expand Up @@ -269,7 +253,7 @@ func start(ctx context.Context, cfg *config.Config) (*defraInstance, error) {
return nil, errors.Wrap(fmt.Sprintf("failed to parse bootstrap peers %v", cfg.Net.Peers), err)
}
log.Debug(ctx, "Bootstrapping with peers", logging.NewKV("Addresses", addrs))
n.Boostrap(addrs)
n.Bootstrap(addrs)
}

if err := n.Start(); err != nil {
Expand All @@ -279,45 +263,6 @@ func start(ctx context.Context, cfg *config.Config) (*defraInstance, error) {
db.Close(ctx)
return nil, errors.Wrap("failed to start P2P listeners", err)
}

MtcpAddr, err := ma.NewMultiaddr(cfg.Net.TCPAddress)
if err != nil {
return nil, errors.Wrap("failed to parse multiaddress", err)
}
addr, err := netutils.TCPAddrFromMultiAddr(MtcpAddr)
if err != nil {
return nil, errors.Wrap("failed to parse TCP address", err)
}

rpcTimeoutDuration, err := cfg.Net.RPCTimeoutDuration()
if err != nil {
return nil, errors.Wrap("failed to parse RPC timeout duration", err)
}

server := grpc.NewServer(
grpc.UnaryInterceptor(
grpc_middleware.ChainUnaryServer(
grpc_recovery.UnaryServerInterceptor(),
),
),
grpc.KeepaliveParams(
keepalive.ServerParameters{
MaxConnectionIdle: rpcTimeoutDuration,
},
),
)
tcplistener, err := gonet.Listen("tcp", addr)
if err != nil {
return nil, errors.Wrap(fmt.Sprintf("failed to listen on TCP address %v", addr), err)
}

go func() {
log.FeedbackInfo(ctx, "Started RPC server", logging.NewKV("Address", addr))
netpb.RegisterCollectionServer(server, n.Peer)
if err := server.Serve(tcplistener); err != nil && !errors.Is(err, grpc.ErrServerStopped) {
log.FeedbackFatalE(ctx, "Failed to start RPC server", err)
}
}()
}

sOpt := []func(*httpapi.Server){
Expand Down
16 changes: 8 additions & 8 deletions client/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ type P2P interface {
// subscribed schemas.
GetAllReplicators(ctx context.Context) ([]Replicator, error)

// AddP2PCollection adds the given collection ID that the P2P system
// subscribes to to the the persisted list. It will error if the provided
// collection ID is invalid.
AddP2PCollection(ctx context.Context, collectionID string) error
// AddP2PCollections adds the given collection IDs to the P2P system and
// subscribes to their topics. It will error if any of the provided
// collection IDs are invalid.
AddP2PCollections(ctx context.Context, collectionIDs []string) error

// RemoveP2PCollection removes the given collection ID that the P2P system
// subscribes to from the the persisted list. It will error if the provided
// collection ID is invalid.
RemoveP2PCollection(ctx context.Context, collectionID string) error
// RemoveP2PCollections removes the given collection IDs from the P2P system and
// unsubscribes from their topics. It will error if the provided
// collection IDs are invalid.
RemoveP2PCollections(ctx context.Context, collectionIDs []string) error

// GetAllP2PCollections returns the list of persisted collection IDs that
// the P2P system subscribes to.
Expand Down
61 changes: 11 additions & 50 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ import (
"strconv"
"strings"
"text/template"
"time"

"github.com/mitchellh/mapstructure"
ma "github.com/multiformats/go-multiaddr"
Expand Down Expand Up @@ -350,48 +349,28 @@ func (apicfg *APIConfig) AddressToURL() string {

// NetConfig configures aspects of network and peer-to-peer.
type NetConfig struct {
P2PAddress string
P2PDisabled bool
Peers string
PubSubEnabled bool `mapstructure:"pubsub"`
RelayEnabled bool `mapstructure:"relay"`
RPCAddress string
RPCMaxConnectionIdle string
RPCTimeout string
TCPAddress string
P2PAddress string
P2PDisabled bool
Peers string
PubSubEnabled bool `mapstructure:"pubsub"`
RelayEnabled bool `mapstructure:"relay"`
}

func defaultNetConfig() *NetConfig {
return &NetConfig{
P2PAddress: "/ip4/0.0.0.0/tcp/9171",
P2PDisabled: false,
Peers: "",
PubSubEnabled: true,
RelayEnabled: false,
RPCAddress: "0.0.0.0:9161",
RPCMaxConnectionIdle: "5m",
RPCTimeout: "10s",
TCPAddress: "/ip4/0.0.0.0/tcp/9161",
P2PAddress: "/ip4/0.0.0.0/tcp/9171",
P2PDisabled: false,
Peers: "",
PubSubEnabled: true,
RelayEnabled: false,
}
}

func (netcfg *NetConfig) validate() error {
_, err := time.ParseDuration(netcfg.RPCTimeout)
if err != nil {
return NewErrInvalidRPCTimeout(err, netcfg.RPCTimeout)
}
_, err = time.ParseDuration(netcfg.RPCMaxConnectionIdle)
if err != nil {
return NewErrInvalidRPCMaxConnectionIdle(err, netcfg.RPCMaxConnectionIdle)
}
_, err = ma.NewMultiaddr(netcfg.P2PAddress)
_, err := ma.NewMultiaddr(netcfg.P2PAddress)
if err != nil {
return NewErrInvalidP2PAddress(err, netcfg.P2PAddress)
}
_, err = net.ResolveTCPAddr("tcp", netcfg.RPCAddress)
if err != nil {
return NewErrInvalidRPCAddress(err, netcfg.RPCAddress)
}
if len(netcfg.Peers) > 0 {
peers := strings.Split(netcfg.Peers, ",")
maddrs := make([]ma.Multiaddr, len(peers))
Expand All @@ -405,24 +384,6 @@ func (netcfg *NetConfig) validate() error {
return nil
}

// RPCTimeoutDuration gives the RPC timeout as a time.Duration.
func (netcfg *NetConfig) RPCTimeoutDuration() (time.Duration, error) {
d, err := time.ParseDuration(netcfg.RPCTimeout)
if err != nil {
return d, NewErrInvalidRPCTimeout(err, netcfg.RPCTimeout)
}
return d, nil
}

// RPCMaxConnectionIdleDuration gives the RPC MaxConnectionIdle as a time.Duration.
func (netcfg *NetConfig) RPCMaxConnectionIdleDuration() (time.Duration, error) {
d, err := time.ParseDuration(netcfg.RPCMaxConnectionIdle)
if err != nil {
return d, NewErrInvalidRPCMaxConnectionIdle(err, netcfg.RPCMaxConnectionIdle)
}
return d, nil
}

// LogConfig configures output and logger.
type LoggingConfig struct {
Level string
Expand Down
52 changes: 0 additions & 52 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"os"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/assert"
)
Expand All @@ -26,8 +25,6 @@ var envVarsDifferent = map[string]string{
"DEFRA_API_ADDRESS": "localhost:9999",
"DEFRA_NET_P2PDISABLED": "true",
"DEFRA_NET_P2PADDRESS": "/ip4/0.0.0.0/tcp/9876",
"DEFRA_NET_RPCADDRESS": "localhost:7777",
"DEFRA_NET_RPCTIMEOUT": "90s",
"DEFRA_NET_PUBSUB": "false",
"DEFRA_NET_RELAY": "false",
"DEFRA_LOG_LEVEL": "error",
Expand All @@ -41,8 +38,6 @@ var envVarsInvalid = map[string]string{
"DEFRA_API_ADDRESS": "^=+()&**()*(&))",
"DEFRA_NET_P2PDISABLED": "^=+()&**()*(&))",
"DEFRA_NET_P2PADDRESS": "^=+()&**()*(&))",
"DEFRA_NET_RPCADDRESS": "^=+()&**()*(&))",
"DEFRA_NET_RPCTIMEOUT": "^=+()&**()*(&))",
"DEFRA_NET_PUBSUB": "^=+()&**()*(&))",
"DEFRA_NET_RELAY": "^=+()&**()*(&))",
"DEFRA_LOG_LEVEL": "^=+()&**()*(&))",
Expand Down Expand Up @@ -178,8 +173,6 @@ func TestEnvVariablesAllConsidered(t *testing.T) {
assert.Equal(t, "memory", cfg.Datastore.Store)
assert.Equal(t, true, cfg.Net.P2PDisabled)
assert.Equal(t, "/ip4/0.0.0.0/tcp/9876", cfg.Net.P2PAddress)
assert.Equal(t, "localhost:7777", cfg.Net.RPCAddress)
assert.Equal(t, "90s", cfg.Net.RPCTimeout)
assert.Equal(t, false, cfg.Net.PubSubEnabled)
assert.Equal(t, false, cfg.Net.RelayEnabled)
assert.Equal(t, "error", cfg.Log.Level)
Expand Down Expand Up @@ -390,51 +383,6 @@ func TestValidationInvalidNetConfigPeers(t *testing.T) {
assert.ErrorIs(t, err, ErrFailedToValidateConfig)
}

func TestValidationInvalidRPCMaxConnectionIdle(t *testing.T) {
cfg := DefaultConfig()
cfg.Net.RPCMaxConnectionIdle = "123123"
err := cfg.validate()
assert.ErrorIs(t, err, ErrFailedToValidateConfig)
}

func TestValidationInvalidRPCTimeout(t *testing.T) {
cfg := DefaultConfig()
cfg.Net.RPCTimeout = "123123"
err := cfg.validate()
assert.ErrorIs(t, err, ErrFailedToValidateConfig)
}

func TestValidationRPCTimeoutDuration(t *testing.T) {
cfg := DefaultConfig()
cfg.Net.RPCTimeout = "1s"
err := cfg.validate()
assert.NoError(t, err)
}

func TestValidationInvalidRPCTimeoutDuration(t *testing.T) {
cfg := DefaultConfig()
cfg.Net.RPCTimeout = "123123"
err := cfg.validate()
assert.ErrorIs(t, err, ErrInvalidRPCTimeout)
}

func TestValidationRPCMaxConnectionIdleDuration(t *testing.T) {
cfg := DefaultConfig()
cfg.Net.RPCMaxConnectionIdle = "1s"
err := cfg.validate()
assert.NoError(t, err)
duration, err := cfg.Net.RPCMaxConnectionIdleDuration()
assert.NoError(t, err)
assert.Equal(t, duration, 1*time.Second)
}

func TestValidationInvalidMaxConnectionIdleDuration(t *testing.T) {
cfg := DefaultConfig()
cfg.Net.RPCMaxConnectionIdle = "*ˆ&%*&%"
err := cfg.validate()
assert.ErrorIs(t, err, ErrInvalidRPCMaxConnectionIdle)
}

func TestValidationInvalidLoggingConfig(t *testing.T) {
cfg := DefaultConfig()
cfg.Log.Level = "546578"
Expand Down
Loading

0 comments on commit bc4c704

Please sign in to comment.