Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: discover peers from cjdns #1316

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions core/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,6 @@ func toPeerInfos(bpeers []config.BootstrapPeer) []peer.PeerInfo {
}

func toPeerInfo(bp config.BootstrapPeer) peer.PeerInfo {
// for now, we drop the "ipfs addr" part of the multiaddr. the rest
// of the codebase currently uses addresses without the peerid part.
m := bp.Multiaddr()
s := ma.Split(m)
m = ma.Join(s[:len(s)-1]...)
Expand Down
2 changes: 1 addition & 1 deletion core/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
if err != nil {
return err
}
do := setupDiscoveryOption(rcfg.Discovery)
do := setupDiscoveryOptions(rcfg.Discovery)
if err := n.startOnlineServices(ctx, cfg.Routing, cfg.Host, do); err != nil {
return err
}
Expand Down
28 changes: 18 additions & 10 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ type IpfsNode struct {
DAG merkledag.DAGService // the merkle dag service, get/add objects.
Resolver *path.Resolver // the path resolution system
Reporter metrics.Reporter
Discovery discovery.Service
Discovery []discovery.Service

// Online
PeerHost p2phost.Host // the network host (server+client)
Expand Down Expand Up @@ -124,7 +124,7 @@ type Mounts struct {
Ipns mount.Mount
}

func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption RoutingOption, hostOption HostOption, do DiscoveryOption) error {
func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption RoutingOption, hostOption HostOption, discoveryOpts []DiscoveryOption) error {

if n.PeerHost != nil { // already online.
return errors.New("node already online")
Expand Down Expand Up @@ -170,29 +170,37 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin
go n.Reprovider.ProvideEvery(ctx, kReprovideFrequency)

// setup local discovery
if do != nil {
service, err := do(n.PeerHost)
if err != nil {
log.Error("mdns error: ", err)
for _, opt := range discoveryOpts {
if service, err := opt(ctx, n.PeerHost); err != nil {
return err
} else {
service.RegisterNotifee(n)
n.Discovery = service
n.Discovery = append(n.Discovery, service)
}
}

return n.Bootstrap(DefaultBootstrapConfig)
}

func setupDiscoveryOption(d config.Discovery) DiscoveryOption {
func setupDiscoveryOptions(d config.Discovery) []DiscoveryOption {
opts := []DiscoveryOption{}

if d.MDNS.Enabled {
return func(h p2phost.Host) (discovery.Service, error) {
opt := func(_ context.Context, h p2phost.Host) (discovery.Service, error) {
if d.MDNS.Interval == 0 {
d.MDNS.Interval = 5
}
return discovery.NewMdnsService(h, time.Duration(d.MDNS.Interval)*time.Second)
}
opts = append(opts, opt)
}
return nil
if d.Cjdns.Enabled {
opt := func(ctx, context.Context, h p2phost.Host) (discovery.Service, error) {
return discovery.NewCjdnsService(ctx, h, d.Cjdns)
}
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setupDiscoveryOption will need a refactoring. Right now it can only use either MDNS or Cjdns. This is also a bad place for the default interval.

opts = append(opts, opt)
}
return opts
}

func (n *IpfsNode) HandlePeerFound(p peer.PeerInfo) {
Expand Down
2 changes: 2 additions & 0 deletions p2p/crypto/secio/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package secio

import (
"fmt"
"io"

ci "github.com/ipfs/go-ipfs/p2p/crypto"
Expand Down Expand Up @@ -68,6 +69,7 @@ func (s *secureSession) LocalPrivateKey() ci.PrivKey {
// RemotePeer retrieves the remote peer.
func (s *secureSession) RemotePeer() peer.ID {
if err := s.Handshake(); err != nil {
fmt.Printf("handshake error: %s\n", err)
return ""
}
return s.remotePeer
Expand Down
210 changes: 210 additions & 0 deletions p2p/discovery/cjdns.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
package discovery

import (
"fmt"
"net"
"strconv"
"strings"
"sync"
"time"

ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
host "github.com/ipfs/go-ipfs/p2p/host"
swarm "github.com/ipfs/go-ipfs/p2p/net/swarm"
peer "github.com/ipfs/go-ipfs/p2p/peer"
// TODO: kill this dependency
config "github.com/ipfs/go-ipfs/repo/config"

cjdns "github.com/ehmry/go-cjdns/admin"
)

type cjdnsService struct {
host host.Host
dialed map[ma.Multiaddr]bool
ctx context.Context
lk sync.Mutex
notifees []Notifee
}

func NewCjdnsService(ctx context.Context, host host.Host, cfg config.Cjdns) (Service, error) {
s := &cjdnsService{
host: host,
dialed: map[ma.Multiaddr]bool{},
ctx: ctx,
}

admin, err := cjdnsAdmin(cfg)
if err != nil {
log.Errorf("cjdns admin error: %s", err)
} else {
s.Discover(admin)
}

go func() {
s.Discover(admin)
ticker := time.NewTicker(time.Duration(cfg.Interval) * time.Second)
rticker := time.NewTicker(time.Duration(cfg.RefreshInterval) * time.Second)
select {
case <-ticker.C:
admin, err := cjdnsAdmin(cfg)
if err != nil {
log.Errorf("cjdns admin error: %s", err)
} else {
s.Discover(admin)
}
case <-rticker.C:
s.dialed = map[ma.Multiaddr]bool{}
case <-s.ctx.Done():
ticker.Stop()
rticker.Stop()
return
}
}()

return s, nil
}

// TODO is this right?
func (s *cjdnsService) Close() error {
s.ctx.Done()
return nil
}

func (s *cjdnsService) Discover(admin *cjdns.Conn) {
nodes, err := knownCjdnsNodes(admin)
if err != nil {
log.Errorf("known cjdns nodes error: %s", err)
return
}

for _, maddr := range nodes {
if _, dialed := s.dialed[maddr]; dialed {
continue
}
id, err := s.dial(maddr)
if err != nil {
log.Debugf("dial error: %s", err)
continue
}

str := maddr.String() + "/ipfs/" + id.Pretty()
maddrid, err := ma.NewMultiaddr(str)
if err != nil {
log.Errorf("multiaddr error: [%s] %s", str, err)
continue
}

s.dialed[maddr] = true
log.Infof("discovered %s", str)
s.emit(id, maddrid)
}
}

func (s *cjdnsService) dial(maddr ma.Multiaddr) (peer.ID, error) {
p2pnet := s.host.Network()
swnet := p2pnet.(*swarm.Network)
conn, err := swnet.Swarm().Dialer().Dial(s.ctx, maddr, "")
if err != nil {
return "", err
}

id := conn.RemotePeer()
if len(id) == 0 {
return "", fmt.Debugf("handshake failed with %s", maddr.String())
}

return id, nil
}

func (s *cjdnsService) emit(id peer.ID, maddr ma.Multiaddr) {
pi := peer.PeerInfo{
ID: id,
Addrs: []ma.Multiaddr{maddr},
}

s.lk.Lock()
for _, n := range s.notifees {
n.HandlePeerFound(pi)
}
s.lk.Unlock()
}

func knownCjdnsNodes(admin *cjdns.Conn) ([]ma.Multiaddr, error) {
nodes := []ma.Multiaddr{}

peers, err := admin.InterfaceController_peerStats()
if err != nil {
return nil, err
}
for _, peer := range peers {
maddr, err := fromCjdnsIP(peer.PublicKey.IP())
if err != nil {
return nil, err
}
nodes = append(nodes, maddr)
}

nodestore, err := admin.NodeStore_dumpTable()
if err != nil {
return nil, err
}
for _, node := range nodestore {
maddr, err := fromCjdnsIP(*node.IP)
if err != nil {
return nil, err
}
nodes = append(nodes, maddr)
}

return nodes, nil
}

func fromCjdnsIP(ip net.IP) (ma.Multiaddr, error) {
return manet.FromNetAddr(&net.TCPAddr{IP: ip, Port: 4001})
}

func cjdnsAdmin(cfg config.Cjdns) (*cjdns.Conn, error) {
maddr, err := ma.NewMultiaddr(cfg.AdminAddress)
if err != nil {
panic(fmt.Errorf("invalid Cjdns.AdminAddress: %s", err))
}
p := strings.Split(maddr.String(), "/")[1:]
if p[2] != "udp" {
panic(fmt.Errorf("non-udp Cjdns.AdminAddress: %s", p[2]))
}

port, _ := strconv.ParseInt(p[3], 10, 16)
c := &cjdns.CjdnsAdminConfig{
Addr: p[1],
Port: int(port),
Password: "NONE",
}
admin, err := cjdns.Connect(c)
if err != nil {
return nil, err
}
return admin, nil
}

func (s *cjdnsService) RegisterNotifee(n Notifee) {
s.lk.Lock()
s.notifees = append(s.notifees, n)
s.lk.Unlock()
}

func (s *cjdnsService) UnregisterNotifee(n Notifee) {
s.lk.Lock()
found := -1
for i, notif := range s.notifees {
if notif == n {
found = i
break
}
}
if found != -1 {
s.notifees = append(s.notifees[:found], s.notifees[found+1:]...)
}
s.lk.Unlock()
}
20 changes: 20 additions & 0 deletions p2p/discovery/discovery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package discovery

import (
"io"

"github.com/ipfs/go-ipfs/p2p/peer"
logging "github.com/ipfs/go-ipfs/vendor/QmQg1J6vikuXF9oDvm4wpdeAUvvkVEKW1EYDw9HhTMnP2b/go-log"
)

var log = logging.Logger("discovery")

type Service interface {
io.Closer
RegisterNotifee(Notifee)
UnregisterNotifee(Notifee)
}

type Notifee interface {
HandlePeerFound(peer.PeerInfo)
}
14 changes: 0 additions & 14 deletions p2p/discovery/mdns.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package discovery

import (
"errors"
"io"
"io/ioutil"
golog "log"
"net"
Expand All @@ -15,23 +14,10 @@ import (

"github.com/ipfs/go-ipfs/p2p/host"
"github.com/ipfs/go-ipfs/p2p/peer"
logging "github.com/ipfs/go-ipfs/vendor/QmQg1J6vikuXF9oDvm4wpdeAUvvkVEKW1EYDw9HhTMnP2b/go-log"
)

var log = logging.Logger("mdns")

const ServiceTag = "discovery.ipfs.io"

type Service interface {
io.Closer
RegisterNotifee(Notifee)
UnregisterNotifee(Notifee)
}

type Notifee interface {
HandlePeerFound(peer.PeerInfo)
}

type mdnsService struct {
server *mdns.Server
service *mdns.MDNSService
Expand Down
5 changes: 5 additions & 0 deletions p2p/net/swarm/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,11 @@ func (s *Swarm) Close() error {
return s.proc.Close()
}

// Dialer returns the p2p/net dialer of this swarm
func (s *Swarm) Dialer() *conn.Dialer {
return s.dialer
}

// StreamSwarm returns the underlying peerstream.Swarm
func (s *Swarm) StreamSwarm() *ps.Swarm {
return s.swarm
Expand Down
12 changes: 11 additions & 1 deletion repo/config/discovery.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package config

type Discovery struct {
MDNS MDNS
MDNS MDNS
Cjdns Cjdns
}

type MDNS struct {
Expand All @@ -10,3 +11,12 @@ type MDNS struct {
// Time in seconds between discovery rounds
Interval int
}

type Cjdns struct {
Enabled bool
DialTimeout int
Interval int // 10m0s
RefreshInterval int // 24h0m0s
AdminAddress string // /ip4/127.0.0.1/udp/11234
Password string // NONE
}
Loading