Skip to content

Commit

Permalink
discovery: Support for joining other peers
Browse files Browse the repository at this point in the history
This will discover other peers using libp2p and dynamically add seeds.

It will also save the docker image used to run the network on IPFS so
other peers can fetch it.

Signed-off-by: Andrea Luzzardi <[email protected]>
  • Loading branch information
aluzzardi committed Dec 5, 2018
1 parent 4d1fe9a commit 8f460c1
Show file tree
Hide file tree
Showing 807 changed files with 285,037 additions and 3,298 deletions.
320 changes: 304 additions & 16 deletions Gopkg.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,7 @@
[[override]]
branch = "master"
name = "github.com/dgraph-io/badger"

[[override]]
name = "github.com/tendermint/tendermint"
version = "=0.25.0"
211 changes: 188 additions & 23 deletions cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,24 @@ package cmd
import (
"context"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"

"github.com/blocklayerhq/chainkit/discovery"
"github.com/blocklayerhq/chainkit/project"
"github.com/blocklayerhq/chainkit/ui"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"github.com/tendermint/tendermint/rpc/client"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
)

// ExplorerImage defines the container image to pull for running the Cosmos Explorer
Expand Down Expand Up @@ -38,69 +50,222 @@ func init() {
rootCmd.AddCommand(startCmd)
}

func startExplorer(ctx context.Context, p *project.Project) {
func startExplorer(ctx context.Context, p *project.Project) error {
cmd := []string{
"run", "--rm",
"--name", fmt.Sprint(p.Image + "-explorer"),
"-p", fmt.Sprintf("%d:8080", p.Ports.Explorer),
ExplorerImage,
}
if err := docker(ctx, p, cmd...); err != nil {
ui.Fatal("Failed to start the Explorer: %v", err)
return errors.Wrap(err, "failed to start the explorer")
}
return nil
}

func start(p *project.Project, join string) {
ctx, cancel := context.WithCancel(context.Background())
func startServer(ctx context.Context, p *project.Project) error {
if err := dockerRun(ctx, p, "start"); err != nil {
return errors.Wrap(err, "failed to start the application")
}
return nil
}

func start(p *project.Project, chainID string) {
ctx := context.Background()
ui.Info("Starting %s", p.Name)

// Initialize if needed.
if err := initialize(ctx, p); err != nil {
ui.Fatal("Initialization failed: %v", err)
}

s := discovery.New(p.IPFSDir(), p.Ports.IPFS)
if err := s.Start(ctx); err != nil {
d := discovery.New(p.IPFSDir(), p.Ports.IPFS)
if err := d.Start(ctx); err != nil {
ui.Fatal("%v", err)
}
defer s.Stop()
defer d.Stop()

for _, addr := range s.ListenAddresses() {
for _, addr := range d.ListenAddresses() {
ui.Verbose("IPFS Swarm listening on %s", addr)
}

for _, addr := range s.AnnounceAddresses() {
for _, addr := range d.AnnounceAddresses() {
ui.Verbose("IPFS Swarm announcing %s", addr)
}

// Start a network.
if join == "" {
chainID, err := s.Announce(ctx, p.GenesisPath())
if chainID == "" {
f, err := ioutil.TempFile(os.TempDir(), "chainkit-image")
if err != nil {
ui.Fatal("Unable to create temporary file: %v", err)
}
if err := runWithFD(ctx, p, os.Stdin, f, os.Stderr, "docker", "save", p.Image); err != nil {
ui.Fatal("Unable to save image: %v", err)
}
f.Close()

ui.Verbose("Image saved at %s", f.Name())

chainID, err = d.Publish(ctx, p.GenesisPath(), f.Name())
if err != nil {
ui.Fatal("%v", err)
}
ui.Success("Network is live at: %v", chainID)
} else {
ui.Info("Joining network %s", join)
genesisData, peerCh, err := s.Join(ctx, join)
ui.Info("Joining network %s", chainID)
genesis, image, err := d.Join(ctx, chainID)
if err != nil {
ui.Fatal("%v", err)
}
defer genesis.Close()
defer image.Close()

f, err := os.OpenFile(p.GenesisPath(), os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644)
if err != nil {
ui.Fatal("unable to overwrite genesis file: %v", err)
}

if _, err := io.Copy(f, genesis); err != nil {
ui.Fatal("unable to write genesis: %v", err)
}

ui.Success("Retrieved genesis data")

if err := ioutil.WriteFile(p.GenesisPath(), genesisData, 0644); err != nil {
ui.Fatal("Unable to write genesis file: %v", err)
if err := runWithFD(ctx, p, image, os.Stdout, os.Stderr, "docker", "load"); err != nil {
ui.Fatal("unable to load image: %v", err)
}
}

cctx, cancel := context.WithCancel(ctx)
wg := sync.WaitGroup{}
errCh := make(chan error, 2)

// Start the node.
wg.Add(1)
go func() {
defer wg.Done()
errCh <- startServer(cctx, p)
}()

// Start the explorer.
wg.Add(1)
go func() {
defer wg.Done()
errCh <- startExplorer(cctx, p)
}()

go func() {
rpc := client.NewHTTP(
fmt.Sprintf("http://localhost:%d", p.Ports.TendermintRPC),
fmt.Sprintf("http://localhost:%d/websocket", p.Ports.TendermintRPC),
)

peer := <-peerCh
ui.Info("Peer: %v", peer.Addrs)
var (
err error
status *ctypes.ResultStatus
)
// Wait for the node to come up.
for {
status, err = rpc.Status()
if err == nil {
break
}
time.Sleep(200 * time.Millisecond)
}

ui.Info("Node %q is up and running", status.NodeInfo.ID)
ui.Success("Application is live at: %s", ui.Emphasize(fmt.Sprintf("http://localhost:%d/", p.Ports.TendermintRPC)))
ui.Success("Cosmos Explorer is live at: %s", ui.Emphasize(fmt.Sprintf("http://localhost:%d/?rpc_port=%d", p.Ports.Explorer, p.Ports.TendermintRPC)))

peer := &discovery.PeerInfo{
NodeID: string(status.NodeInfo.ID),
TendermintP2PPort: p.Ports.TendermintP2P,
}

// Announce
go func() {
defer wg.Done()
for {
err := d.Announce(ctx, chainID, peer)
if err == nil {
ui.Success("Node successfully announced")
break
}
ui.Error("Failed to announce: %v", err)
time.Sleep(5 * time.Second)
}
}()

// Search Peers
go func() {
defer wg.Done()
for {
peerCh, err := d.SearchPeers(ctx, chainID)
if err != nil {
ui.Fatal("%v", err)
}
nodes := make(map[string]struct{})
for peer := range peerCh {
if _, ok := nodes[peer.NodeID]; ok {
continue
}
ui.Info("Discovered node %q", peer.NodeID)
if err := dialSeeds(ctx, p, peer); err != nil {
ui.Error("Failed to dial peer: %v", err)
continue
}

nodes[peer.NodeID] = struct{}{}
}

time.Sleep(5 * time.Second)
}
}()
}()

// Wait for the application to error out or the user to quit.
c := make(chan os.Signal, 1)
signal.Notify(c,
syscall.SIGINT,
syscall.SIGTERM,
)
select {
case err := <-errCh:
if err != nil {
ui.Error("%v", err)
}
case sig := <-c:
ui.Info("Received signal %v, exiting", sig)
}

ui.Success("Application is live at: %s", ui.Emphasize(fmt.Sprintf("http://localhost:%d/", p.Ports.TendermintRPC)))
ui.Success("Cosmos Explorer is live at: %s", ui.Emphasize(fmt.Sprintf("http://localhost:%d/?rpc_port=%d", p.Ports.Explorer, p.Ports.TendermintRPC)))
defer cancel()
go startExplorer(ctx, p)
if err := dockerRun(ctx, p, "start"); err != nil {
ui.Fatal("Failed to start the application: %v", err)
// Stop all processes and wait for completion.
cancel()
wg.Wait()
}

func dialSeeds(ctx context.Context, p *project.Project, peer *discovery.PeerInfo) error {
seeds := []string{}
for _, ip := range peer.IP {
seeds = append(seeds, fmt.Sprintf("\"%s@%s:%d\"", peer.NodeID, ip, peer.TendermintP2PPort))
}
seedString := fmt.Sprintf("[%s]", strings.Join(seeds, ","))

client := &http.Client{}
req, err := http.NewRequest("GET",
fmt.Sprintf("http://localhost:%d/dial_seeds?seeds=%s",
p.Ports.TendermintRPC,
url.QueryEscape(seedString),
),
nil)
if err != nil {
return err
}
resp, err := client.Do(req.WithContext(ctx))
if err != nil {
return err
}
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("requested failed with code %d", resp.StatusCode)
}
return nil
}
41 changes: 36 additions & 5 deletions cmd/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ package cmd
import (
"context"
"fmt"
"io"
"os"
"os/exec"
"path"
"path/filepath"
"strings"
"syscall"
"time"

"github.com/blocklayerhq/chainkit/project"
"github.com/blocklayerhq/chainkit/ui"
Expand Down Expand Up @@ -72,12 +75,40 @@ func docker(ctx context.Context, p *project.Project, args ...string) error {
}

func run(ctx context.Context, p *project.Project, command string, args ...string) error {
return runWithFD(ctx, p, os.Stdin, os.Stdout, os.Stderr, command, args...)
}

func runWithFD(ctx context.Context, p *project.Project, stdin io.Reader, stdout, stderr io.Writer, command string, args ...string) error {
ui.Verbose("$ %s %s", command, strings.Join(args, " "))
cmd := exec.CommandContext(ctx, command)
cmd := exec.Command(command)
cmd.Args = append([]string{command}, args...)
cmd.Stdin = os.Stdin
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
cmd.Stdin = stdin
cmd.Stdout = stdout
cmd.Stderr = stderr
cmd.Dir = p.RootDir
return cmd.Run()

if err := cmd.Start(); err != nil {
return err
}

// We don't use exec.CommandContext here because it will
// SIGKILL the process. Instead, we handle the context
// on our own and try to gracefully shutdown the command.
waitDone := make(chan struct{})
go func() {
select {
case <-ctx.Done():
cmd.Process.Signal(syscall.SIGTERM)
select {
case <-time.After(5 * time.Second):
cmd.Process.Kill()
case <-waitDone:
}
case <-waitDone:
}
}()

err := cmd.Wait()
close(waitDone)
return err
}
Loading

0 comments on commit 8f460c1

Please sign in to comment.