Skip to content

Commit

Permalink
change cli start flow
Browse files Browse the repository at this point in the history
  • Loading branch information
fredcarle committed Aug 5, 2022
1 parent 51c4026 commit 100d6d9
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 139 deletions.
15 changes: 13 additions & 2 deletions cli/peerid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package cli

import (
"bytes"
"context"
"encoding/json"
"io"
"testing"
Expand All @@ -20,11 +21,21 @@ import (
)

func TestGetPeerIDCmd(t *testing.T) {
dir := t.TempDir()
ctx := context.Background()
cfg.Datastore.Store = "memory"
cfg.Datastore.Badger.Path = dir
n, err := start(ctx)
if err != nil {
t.Fatal(err)
}

b := bytes.NewBufferString("")
rootCmd.SetOut(b)

rootCmd.SetArgs([]string{"client", "peerid"})

err := rootCmd.Execute()
err = rootCmd.Execute()
if err != nil {
t.Fatal(err)
}
Expand All @@ -45,5 +56,5 @@ func TestGetPeerIDCmd(t *testing.T) {
t.Fatal(err)
}

assert.NotEmpty(t, r.Data.PeerID)
assert.Equal(t, n.PeerID().String(), r.Data.PeerID)
}
291 changes: 154 additions & 137 deletions cli/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,147 +71,13 @@ var startCmd = &cobra.Command{
return nil
},
RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background()
log.FeedbackInfo(ctx, "Starting DefraDB service...")

// setup signal handlers
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt)

var rootstore ds.Batching

var err error
if cfg.Datastore.Store == badgerDatastoreName {
log.FeedbackInfo(
cmd.Context(),
"Opening badger store",
logging.NewKV("Path", cfg.Datastore.Badger.Path),
)
rootstore, err = badgerds.NewDatastore(
cfg.Datastore.Badger.Path,
cfg.Datastore.Badger.Options,
)
} else if cfg.Datastore.Store == "memory" {
log.FeedbackInfo(cmd.Context(), "Building new memory store")
opts := badgerds.Options{Options: badger.DefaultOptions("").WithInMemory(true)}
rootstore, err = badgerds.NewDatastore("", &opts)
}

if err != nil {
return fmt.Errorf("failed to open datastore: %w", err)
}

var options []db.Option

// check for p2p
var bs *broadcast.Broadcaster
if !cfg.Net.P2PDisabled {
bs = broadcast.NewBroadcaster(busBufferSize)
options = append(options, db.WithBroadcaster(bs))
}

db, err := db.NewDB(cmd.Context(), rootstore, options...)
n, err := start(cmd.Context())
if err != nil {
return fmt.Errorf("failed to create database: %w", err)
}

// init the p2p node
var n *node.Node
if !cfg.Net.P2PDisabled {
log.FeedbackInfo(cmd.Context(), "Starting P2P node", logging.NewKV("P2P address", cfg.Net.P2PAddress))
n, err = node.NewNode(
cmd.Context(),
db,
bs,
cfg.NodeConfig(),
)
if err != nil {
n.Close() //nolint:errcheck
db.Close(cmd.Context())
return fmt.Errorf("failed to start P2P node: %w", err)
}

// parse peers and bootstrap
if len(cfg.Net.Peers) != 0 {
log.Debug(cmd.Context(), "Parsing bootstrap peers", logging.NewKV("Peers", cfg.Net.Peers))
addrs, err := netutils.ParsePeers(strings.Split(cfg.Net.Peers, ","))
if err != nil {
return fmt.Errorf("failed to parse bootstrap peers %v: %w", cfg.Net.Peers, err)
}
log.Debug(cmd.Context(), "Bootstrapping with peers", logging.NewKV("Addresses", addrs))
n.Boostrap(addrs)
}

if err := n.Start(); err != nil {
n.Close() //nolint:errcheck
db.Close(cmd.Context())
return fmt.Errorf("failed to start P2P listeners: %w", err)
}

MtcpAddr, err := ma.NewMultiaddr(cfg.Net.TCPAddress)
if err != nil {
return fmt.Errorf("failed to parse multiaddress: %w", err)
}
addr, err := netutils.TCPAddrFromMultiAddr(MtcpAddr)
if err != nil {
return fmt.Errorf("failed to parse TCP address: %w", err)
}

rpcTimeoutDuration, err := cfg.Net.RPCTimeoutDuration()
if err != nil {
return fmt.Errorf("failed to parse RPC timeout duration: %w", err)
}

server := grpc.NewServer(grpc.KeepaliveParams(keepalive.ServerParameters{
MaxConnectionIdle: rpcTimeoutDuration,
}))
tcplistener, err := gonet.Listen("tcp", addr)
if err != nil {
return fmt.Errorf("failed to listen on TCP address %v: %w", addr, err)
}

netService := netapi.NewService(n.Peer)

go func() {
log.FeedbackInfo(cmd.Context(), "Started RPC server", logging.NewKV("Address", addr))
netpb.RegisterServiceServer(server, netService)
if err := server.Serve(tcplistener); err != nil && !errors.Is(err, grpc.ErrServerStopped) {
log.FeedbackFatalE(cmd.Context(), "failed to start RPC server", err)
}
}()
return err
}

// run the server listener in a separate goroutine
go func() {
log.FeedbackInfo(
cmd.Context(),
fmt.Sprintf(
"Providing HTTP API at %s%s. Use the GraphQL query endpoint at %s%s/graphql ",
cfg.API.AddressToURL(),
httpapi.RootPath,
cfg.API.AddressToURL(),
httpapi.RootPath,
),
)
s := http.NewServer(db, http.WithAddress(cfg.API.Address), http.WithPeerID(n.PeerID().String()))
if err := s.Listen(); err != nil {
log.FeedbackErrorE(cmd.Context(), "Failed to start HTTP API listener", err)
if n != nil {
n.Close() //nolint:errcheck
}
db.Close(cmd.Context())
os.Exit(1)
}
}()
wait(cmd.Context(), n)

// wait for shutdown signal
<-signalCh
log.FeedbackInfo(cmd.Context(), "Received interrupt; closing database...")
if n != nil {
n.Close() //nolint:errcheck
}
db.Close(cmd.Context())
os.Exit(0)
return nil
},
}
Expand Down Expand Up @@ -264,3 +130,154 @@ func init() {

rootCmd.AddCommand(startCmd)
}

func start(ctx context.Context) (*node.Node, error) {
log.FeedbackInfo(ctx, "Starting DefraDB service...")

var rootstore ds.Batching

var err error
if cfg.Datastore.Store == badgerDatastoreName {
log.FeedbackInfo(
ctx,
"Opening badger store",
logging.NewKV("Path", cfg.Datastore.Badger.Path),
)
rootstore, err = badgerds.NewDatastore(
cfg.Datastore.Badger.Path,
cfg.Datastore.Badger.Options,
)
} else if cfg.Datastore.Store == "memory" {
log.FeedbackInfo(ctx, "Building new memory store")
opts := badgerds.Options{Options: badger.DefaultOptions("").WithInMemory(true)}
rootstore, err = badgerds.NewDatastore("", &opts)
}

if err != nil {
return nil, fmt.Errorf("failed to open datastore: %w", err)
}

var options []db.Option

// check for p2p
var bs *broadcast.Broadcaster
if !cfg.Net.P2PDisabled {
bs = broadcast.NewBroadcaster(busBufferSize)
options = append(options, db.WithBroadcaster(bs))
}

db, err := db.NewDB(ctx, rootstore, options...)
if err != nil {
return nil, fmt.Errorf("failed to create database: %w", err)
}

// init the p2p node
var n *node.Node
if !cfg.Net.P2PDisabled {
log.FeedbackInfo(ctx, "Starting P2P node", logging.NewKV("P2P address", cfg.Net.P2PAddress))
n, err = node.NewNode(
ctx,
db,
bs,
cfg.NodeConfig(),
)
if err != nil {
n.Close() //nolint:errcheck
db.Close(ctx)
return nil, fmt.Errorf("failed to start P2P node: %w", err)
}

// parse peers and bootstrap
if len(cfg.Net.Peers) != 0 {
log.Debug(ctx, "Parsing bootstrap peers", logging.NewKV("Peers", cfg.Net.Peers))
addrs, err := netutils.ParsePeers(strings.Split(cfg.Net.Peers, ","))
if err != nil {
return nil, fmt.Errorf("failed to parse bootstrap peers %v: %w", cfg.Net.Peers, err)
}
log.Debug(ctx, "Bootstrapping with peers", logging.NewKV("Addresses", addrs))
n.Boostrap(addrs)
}

if err := n.Start(); err != nil {
n.Close() //nolint:errcheck
db.Close(ctx)
return nil, fmt.Errorf("failed to start P2P listeners: %w", err)
}

MtcpAddr, err := ma.NewMultiaddr(cfg.Net.TCPAddress)
if err != nil {
return nil, fmt.Errorf("failed to parse multiaddress: %w", err)
}
addr, err := netutils.TCPAddrFromMultiAddr(MtcpAddr)
if err != nil {
return nil, fmt.Errorf("failed to parse TCP address: %w", err)
}

rpcTimeoutDuration, err := cfg.Net.RPCTimeoutDuration()
if err != nil {
return nil, fmt.Errorf("failed to parse RPC timeout duration: %w", err)
}

server := grpc.NewServer(grpc.KeepaliveParams(keepalive.ServerParameters{
MaxConnectionIdle: rpcTimeoutDuration,
}))
tcplistener, err := gonet.Listen("tcp", addr)
if err != nil {
return nil, fmt.Errorf("failed to listen on TCP address %v: %w", addr, err)
}

netService := netapi.NewService(n.Peer)

go func() {
log.FeedbackInfo(ctx, "Started RPC server", logging.NewKV("Address", addr))
netpb.RegisterServiceServer(server, netService)
if err := server.Serve(tcplistener); err != nil && !errors.Is(err, grpc.ErrServerStopped) {
log.FeedbackFatalE(ctx, "failed to start RPC server", err)
}
}()
}

s := http.NewServer(db, http.WithAddress(cfg.API.Address), http.WithPeerID(n.PeerID().String()))
if err := s.Listen(ctx); err != nil {
return nil, fmt.Errorf("failed to listen on TCP address %v: %w", s.Addr, err)
}

// run the server in a separate goroutine
go func() {
log.FeedbackInfo(
ctx,
fmt.Sprintf(
"Providing HTTP API at %s%s. Use the GraphQL query endpoint at %s%s/graphql ",
cfg.API.AddressToURL(),
httpapi.RootPath,
cfg.API.AddressToURL(),
httpapi.RootPath,
),
)
if err := s.Run(); err != nil {
log.FeedbackErrorE(ctx, "Failed to run the HTTP server", err)
if n != nil {
n.Close() //nolint:errcheck
}
db.Close(ctx)
os.Exit(1)
}
}()

return n, nil
}

// wait waits for an interrupt signal to close the program.
func wait(ctx context.Context, n *node.Node) {
// setup signal handlers
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt)

<-signalCh
log.FeedbackInfo(ctx, "Received interrupt; closing database...")
if n != nil {
n.Close() //nolint:errcheck
}
n.DB.Close(ctx)
os.Exit(0)
}

0 comments on commit 100d6d9

Please sign in to comment.