diff --git a/core/bootstrap.go b/core/bootstrap.go index bf95f351bb2..7015269ba17 100644 --- a/core/bootstrap.go +++ b/core/bootstrap.go @@ -211,8 +211,6 @@ func toPeerInfos(bpeers []config.BootstrapPeer) []peer.PeerInfo { } func toPeerInfo(bp config.BootstrapPeer) peer.PeerInfo { - // for now, we drop the "ipfs addr" part of the multiaddr. the rest - // of the codebase currently uses addresses without the peerid part. m := bp.Multiaddr() s := ma.Split(m) m = ma.Join(s[:len(s)-1]...) diff --git a/core/builder.go b/core/builder.go index 999f11a46b1..e06929c30dc 100644 --- a/core/builder.go +++ b/core/builder.go @@ -139,7 +139,7 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error { if err != nil { return err } - do := setupDiscoveryOption(rcfg.Discovery) + do := setupDiscoveryOptions(rcfg.Discovery) if err := n.startOnlineServices(ctx, cfg.Routing, cfg.Host, do); err != nil { return err } diff --git a/core/core.go b/core/core.go index fbbfc35f34f..7770f34a0f5 100644 --- a/core/core.go +++ b/core/core.go @@ -95,7 +95,7 @@ type IpfsNode struct { DAG merkledag.DAGService // the merkle dag service, get/add objects. Resolver *path.Resolver // the path resolution system Reporter metrics.Reporter - Discovery discovery.Service + Discovery []discovery.Service // Online PeerHost p2phost.Host // the network host (server+client) @@ -124,7 +124,7 @@ type Mounts struct { Ipns mount.Mount } -func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption RoutingOption, hostOption HostOption, do DiscoveryOption) error { +func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption RoutingOption, hostOption HostOption, discoveryOpts []DiscoveryOption) error { if n.PeerHost != nil { // already online. return errors.New("node already online") @@ -170,29 +170,37 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin go n.Reprovider.ProvideEvery(ctx, kReprovideFrequency) // setup local discovery - if do != nil { - service, err := do(n.PeerHost) - if err != nil { - log.Error("mdns error: ", err) + for _, opt := range discoveryOpts { + if service, err := opt(ctx, n.PeerHost); err != nil { + return err } else { service.RegisterNotifee(n) - n.Discovery = service + n.Discovery = append(n.Discovery, service) } } return n.Bootstrap(DefaultBootstrapConfig) } -func setupDiscoveryOption(d config.Discovery) DiscoveryOption { +func setupDiscoveryOptions(d config.Discovery) []DiscoveryOption { + opts := []DiscoveryOption{} + if d.MDNS.Enabled { - return func(h p2phost.Host) (discovery.Service, error) { + opt := func(_ context.Context, h p2phost.Host) (discovery.Service, error) { if d.MDNS.Interval == 0 { d.MDNS.Interval = 5 } return discovery.NewMdnsService(h, time.Duration(d.MDNS.Interval)*time.Second) } + opts = append(opts, opt) } - return nil + if d.Cjdns.Enabled { + opt := func(ctx, context.Context, h p2phost.Host) (discovery.Service, error) { + return discovery.NewCjdnsService(ctx, h, d.Cjdns) + } + opts = append(opts, opt) + } + return opts } func (n *IpfsNode) HandlePeerFound(p peer.PeerInfo) { diff --git a/p2p/crypto/secio/interface.go b/p2p/crypto/secio/interface.go index 3fc54875f53..01f607c0262 100644 --- a/p2p/crypto/secio/interface.go +++ b/p2p/crypto/secio/interface.go @@ -2,6 +2,7 @@ package secio import ( + "fmt" "io" ci "github.com/ipfs/go-ipfs/p2p/crypto" @@ -68,6 +69,7 @@ func (s *secureSession) LocalPrivateKey() ci.PrivKey { // RemotePeer retrieves the remote peer. func (s *secureSession) RemotePeer() peer.ID { if err := s.Handshake(); err != nil { + fmt.Printf("handshake error: %s\n", err) return "" } return s.remotePeer diff --git a/p2p/discovery/cjdns.go b/p2p/discovery/cjdns.go new file mode 100644 index 00000000000..f45e09a94ad --- /dev/null +++ b/p2p/discovery/cjdns.go @@ -0,0 +1,210 @@ +package discovery + +import ( + "fmt" + "net" + "strconv" + "strings" + "sync" + "time" + + ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" + manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net" + context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" + host "github.com/ipfs/go-ipfs/p2p/host" + swarm "github.com/ipfs/go-ipfs/p2p/net/swarm" + peer "github.com/ipfs/go-ipfs/p2p/peer" + // TODO: kill this dependency + config "github.com/ipfs/go-ipfs/repo/config" + + cjdns "github.com/ehmry/go-cjdns/admin" +) + +type cjdnsService struct { + host host.Host + dialed map[ma.Multiaddr]bool + ctx context.Context + lk sync.Mutex + notifees []Notifee +} + +func NewCjdnsService(ctx context.Context, host host.Host, cfg config.Cjdns) (Service, error) { + s := &cjdnsService{ + host: host, + dialed: map[ma.Multiaddr]bool{}, + ctx: ctx, + } + + admin, err := cjdnsAdmin(cfg) + if err != nil { + log.Errorf("cjdns admin error: %s", err) + } else { + s.Discover(admin) + } + + go func() { + s.Discover(admin) + ticker := time.NewTicker(time.Duration(cfg.Interval) * time.Second) + rticker := time.NewTicker(time.Duration(cfg.RefreshInterval) * time.Second) + select { + case <-ticker.C: + admin, err := cjdnsAdmin(cfg) + if err != nil { + log.Errorf("cjdns admin error: %s", err) + } else { + s.Discover(admin) + } + case <-rticker.C: + s.dialed = map[ma.Multiaddr]bool{} + case <-s.ctx.Done(): + ticker.Stop() + rticker.Stop() + return + } + }() + + return s, nil +} + +// TODO is this right? +func (s *cjdnsService) Close() error { + s.ctx.Done() + return nil +} + +func (s *cjdnsService) Discover(admin *cjdns.Conn) { + nodes, err := knownCjdnsNodes(admin) + if err != nil { + log.Errorf("known cjdns nodes error: %s", err) + return + } + + for _, maddr := range nodes { + if _, dialed := s.dialed[maddr]; dialed { + continue + } + id, err := s.dial(maddr) + if err != nil { + log.Debugf("dial error: %s", err) + continue + } + + str := maddr.String() + "/ipfs/" + id.Pretty() + maddrid, err := ma.NewMultiaddr(str) + if err != nil { + log.Errorf("multiaddr error: [%s] %s", str, err) + continue + } + + s.dialed[maddr] = true + log.Infof("discovered %s", str) + s.emit(id, maddrid) + } +} + +func (s *cjdnsService) dial(maddr ma.Multiaddr) (peer.ID, error) { + p2pnet := s.host.Network() + swnet := p2pnet.(*swarm.Network) + conn, err := swnet.Swarm().Dialer().Dial(s.ctx, maddr, "") + if err != nil { + return "", err + } + + id := conn.RemotePeer() + if len(id) == 0 { + return "", fmt.Debugf("handshake failed with %s", maddr.String()) + } + + return id, nil +} + +func (s *cjdnsService) emit(id peer.ID, maddr ma.Multiaddr) { + pi := peer.PeerInfo{ + ID: id, + Addrs: []ma.Multiaddr{maddr}, + } + + s.lk.Lock() + for _, n := range s.notifees { + n.HandlePeerFound(pi) + } + s.lk.Unlock() +} + +func knownCjdnsNodes(admin *cjdns.Conn) ([]ma.Multiaddr, error) { + nodes := []ma.Multiaddr{} + + peers, err := admin.InterfaceController_peerStats() + if err != nil { + return nil, err + } + for _, peer := range peers { + maddr, err := fromCjdnsIP(peer.PublicKey.IP()) + if err != nil { + return nil, err + } + nodes = append(nodes, maddr) + } + + nodestore, err := admin.NodeStore_dumpTable() + if err != nil { + return nil, err + } + for _, node := range nodestore { + maddr, err := fromCjdnsIP(*node.IP) + if err != nil { + return nil, err + } + nodes = append(nodes, maddr) + } + + return nodes, nil +} + +func fromCjdnsIP(ip net.IP) (ma.Multiaddr, error) { + return manet.FromNetAddr(&net.TCPAddr{IP: ip, Port: 4001}) +} + +func cjdnsAdmin(cfg config.Cjdns) (*cjdns.Conn, error) { + maddr, err := ma.NewMultiaddr(cfg.AdminAddress) + if err != nil { + panic(fmt.Errorf("invalid Cjdns.AdminAddress: %s", err)) + } + p := strings.Split(maddr.String(), "/")[1:] + if p[2] != "udp" { + panic(fmt.Errorf("non-udp Cjdns.AdminAddress: %s", p[2])) + } + + port, _ := strconv.ParseInt(p[3], 10, 16) + c := &cjdns.CjdnsAdminConfig{ + Addr: p[1], + Port: int(port), + Password: "NONE", + } + admin, err := cjdns.Connect(c) + if err != nil { + return nil, err + } + return admin, nil +} + +func (s *cjdnsService) RegisterNotifee(n Notifee) { + s.lk.Lock() + s.notifees = append(s.notifees, n) + s.lk.Unlock() +} + +func (s *cjdnsService) UnregisterNotifee(n Notifee) { + s.lk.Lock() + found := -1 + for i, notif := range s.notifees { + if notif == n { + found = i + break + } + } + if found != -1 { + s.notifees = append(s.notifees[:found], s.notifees[found+1:]...) + } + s.lk.Unlock() +} diff --git a/p2p/discovery/discovery.go b/p2p/discovery/discovery.go new file mode 100644 index 00000000000..e4d5d44c399 --- /dev/null +++ b/p2p/discovery/discovery.go @@ -0,0 +1,20 @@ +package discovery + +import ( + "io" + + "github.com/ipfs/go-ipfs/p2p/peer" + logging "github.com/ipfs/go-ipfs/vendor/QmQg1J6vikuXF9oDvm4wpdeAUvvkVEKW1EYDw9HhTMnP2b/go-log" +) + +var log = logging.Logger("discovery") + +type Service interface { + io.Closer + RegisterNotifee(Notifee) + UnregisterNotifee(Notifee) +} + +type Notifee interface { + HandlePeerFound(peer.PeerInfo) +} diff --git a/p2p/discovery/mdns.go b/p2p/discovery/mdns.go index 61312fefa5d..c10f57b9c46 100644 --- a/p2p/discovery/mdns.go +++ b/p2p/discovery/mdns.go @@ -2,7 +2,6 @@ package discovery import ( "errors" - "io" "io/ioutil" golog "log" "net" @@ -15,23 +14,10 @@ import ( "github.com/ipfs/go-ipfs/p2p/host" "github.com/ipfs/go-ipfs/p2p/peer" - logging "github.com/ipfs/go-ipfs/vendor/QmQg1J6vikuXF9oDvm4wpdeAUvvkVEKW1EYDw9HhTMnP2b/go-log" ) -var log = logging.Logger("mdns") - const ServiceTag = "discovery.ipfs.io" -type Service interface { - io.Closer - RegisterNotifee(Notifee) - UnregisterNotifee(Notifee) -} - -type Notifee interface { - HandlePeerFound(peer.PeerInfo) -} - type mdnsService struct { server *mdns.Server service *mdns.MDNSService diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index dabcf5368e9..33138875b00 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -170,6 +170,11 @@ func (s *Swarm) Close() error { return s.proc.Close() } +// Dialer returns the p2p/net dialer of this swarm +func (s *Swarm) Dialer() *conn.Dialer { + return s.dialer +} + // StreamSwarm returns the underlying peerstream.Swarm func (s *Swarm) StreamSwarm() *ps.Swarm { return s.swarm diff --git a/repo/config/discovery.go b/repo/config/discovery.go index 4fb8508f00a..4160f223516 100644 --- a/repo/config/discovery.go +++ b/repo/config/discovery.go @@ -1,7 +1,8 @@ package config type Discovery struct { - MDNS MDNS + MDNS MDNS + Cjdns Cjdns } type MDNS struct { @@ -10,3 +11,12 @@ type MDNS struct { // Time in seconds between discovery rounds Interval int } + +type Cjdns struct { + Enabled bool + DialTimeout int + Interval int // 10m0s + RefreshInterval int // 24h0m0s + AdminAddress string // /ip4/127.0.0.1/udp/11234 + Password string // NONE +} diff --git a/repo/config/init.go b/repo/config/init.go index eaa23d28528..ebc9670636f 100644 --- a/repo/config/init.go +++ b/repo/config/init.go @@ -49,10 +49,18 @@ func Init(out io.Writer, nBitsForKeypair int) (*Config, error) { SupernodeRouting: *snr, Datastore: *ds, Identity: identity, - Discovery: Discovery{MDNS{ - Enabled: true, - Interval: 10, - }}, + Discovery: Discovery{ + MDNS{ + Enabled: true, + Interval: 10, + }, + Cjdns{ + Enabled: true, + Interval: 3600, + RefreshInterval: 24 * 3600, + AdminAddress: "/ip4/127.0.0.1/udp/11234", + Password: "NONE", + }}, // setup the node mount points. Mounts: Mounts{