diff --git a/p2p/net/gostream/addr.go b/p2p/net/gostream/addr.go new file mode 100644 index 0000000000..49d844f675 --- /dev/null +++ b/p2p/net/gostream/addr.go @@ -0,0 +1,14 @@ +package gostream + +import "github.com/libp2p/go-libp2p/core/peer" + +// addr implements net.Addr and holds a libp2p peer ID. +type addr struct{ id peer.ID } + +// Network returns the name of the network that this address belongs to +// (libp2p). +func (a *addr) Network() string { return Network } + +// String returns the peer ID of this address in string form +// (B58-encoded). +func (a *addr) String() string { return a.id.String() } diff --git a/p2p/net/gostream/conn.go b/p2p/net/gostream/conn.go new file mode 100644 index 0000000000..991dd2ff96 --- /dev/null +++ b/p2p/net/gostream/conn.go @@ -0,0 +1,43 @@ +package gostream + +import ( + "context" + "net" + + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/protocol" +) + +// conn is an implementation of net.Conn which wraps +// libp2p streams. +type conn struct { + network.Stream +} + +// newConn creates a conn given a libp2p stream +func newConn(s network.Stream) net.Conn { + return &conn{s} +} + +// LocalAddr returns the local network address. +func (c *conn) LocalAddr() net.Addr { + return &addr{c.Stream.Conn().LocalPeer()} +} + +// RemoteAddr returns the remote network address. +func (c *conn) RemoteAddr() net.Addr { + return &addr{c.Stream.Conn().RemotePeer()} +} + +// Dial opens a stream to the destination address +// (which should parseable to a peer ID) using the given +// host and returns it as a standard net.Conn. +func Dial(ctx context.Context, h host.Host, pid peer.ID, tag protocol.ID) (net.Conn, error) { + s, err := h.NewStream(ctx, pid, tag) + if err != nil { + return nil, err + } + return newConn(s), nil +} diff --git a/p2p/net/gostream/gostream.go b/p2p/net/gostream/gostream.go new file mode 100644 index 0000000000..a15125be32 --- /dev/null +++ b/p2p/net/gostream/gostream.go @@ -0,0 +1,19 @@ +// Package gostream allows to replace the standard net stack in Go +// with [LibP2P](https://github.com/libp2p/libp2p) streams. +// +// Given a libp2p.Host, gostream provides Dial() and Listen() methods which +// return implementations of net.Conn and net.Listener. +// +// Instead of the regular "host:port" addressing, `gostream` uses a Peer ID, +// and rather than a raw TCP connection, gostream will use libp2p's net.Stream. +// This means your connections will take advantage of LibP2P's multi-routes, +// NAT transversal and stream multiplexing. +// +// Note that LibP2P hosts cannot dial to themselves, so there is no possibility +// of using the same Host as server and as client. +package gostream + +// Network is the "net.Addr.Network()" name returned by +// addresses used by gostream connections. In turn, the "net.Addr.String()" will +// be a peer ID. +var Network = "libp2p" diff --git a/p2p/net/gostream/gostream_test.go b/p2p/net/gostream/gostream_test.go new file mode 100644 index 0000000000..e961546ed6 --- /dev/null +++ b/p2p/net/gostream/gostream_test.go @@ -0,0 +1,141 @@ +package gostream + +import ( + "bufio" + "context" + "io" + "testing" + "time" + + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/libp2p/go-libp2p/core/protocol" + "github.com/multiformats/go-multiaddr" +) + +// newHost illustrates how to build a libp2p host with secio using +// a randomly generated key-pair +func newHost(t *testing.T, listen multiaddr.Multiaddr) host.Host { + h, err := libp2p.New( + libp2p.ListenAddrs(listen), + ) + if err != nil { + t.Fatal(err) + } + return h +} + +func TestServerClient(t *testing.T) { + m1, _ := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/10000") + m2, _ := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/10001") + srvHost := newHost(t, m1) + clientHost := newHost(t, m2) + defer srvHost.Close() + defer clientHost.Close() + + srvHost.Peerstore().AddAddrs(clientHost.ID(), clientHost.Addrs(), peerstore.PermanentAddrTTL) + clientHost.Peerstore().AddAddrs(srvHost.ID(), srvHost.Addrs(), peerstore.PermanentAddrTTL) + + var tag protocol.ID = "/testitytest" + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + done := make(chan struct{}) + go func() { + defer close(done) + listener, err := Listen(srvHost, tag) + if err != nil { + t.Error(err) + return + } + defer listener.Close() + + if listener.Addr().String() != srvHost.ID().Pretty() { + t.Error("bad listener address") + return + } + + servConn, err := listener.Accept() + if err != nil { + t.Error(err) + return + } + defer servConn.Close() + + reader := bufio.NewReader(servConn) + for { + msg, err := reader.ReadString('\n') + if err == io.EOF { + break + } + if err != nil { + t.Error(err) + return + } + if msg != "is libp2p awesome?\n" { + t.Errorf("Bad incoming message: %s", msg) + return + } + + _, err = servConn.Write([]byte("yes it is\n")) + if err != nil { + t.Error(err) + return + } + } + }() + + clientConn, err := Dial(ctx, clientHost, srvHost.ID(), tag) + if err != nil { + t.Fatal(err) + } + + if clientConn.LocalAddr().String() != clientHost.ID().Pretty() { + t.Fatal("Bad LocalAddr") + } + + if clientConn.RemoteAddr().String() != srvHost.ID().Pretty() { + t.Fatal("Bad RemoteAddr") + } + + if clientConn.LocalAddr().Network() != Network { + t.Fatal("Bad Network()") + } + + err = clientConn.SetDeadline(time.Now().Add(time.Second)) + if err != nil { + t.Fatal(err) + } + + err = clientConn.SetReadDeadline(time.Now().Add(time.Second)) + if err != nil { + t.Fatal(err) + } + + err = clientConn.SetWriteDeadline(time.Now().Add(time.Second)) + if err != nil { + t.Fatal(err) + } + + _, err = clientConn.Write([]byte("is libp2p awesome?\n")) + if err != nil { + t.Fatal(err) + } + + reader := bufio.NewReader(clientConn) + resp, err := reader.ReadString('\n') + if err != nil { + t.Fatal(err) + } + + if string(resp) != "yes it is\n" { + t.Errorf("Bad response: %s", resp) + } + + err = clientConn.Close() + if err != nil { + t.Fatal(err) + } + <-done +} diff --git a/p2p/net/gostream/listener.go b/p2p/net/gostream/listener.go new file mode 100644 index 0000000000..250e688050 --- /dev/null +++ b/p2p/net/gostream/listener.go @@ -0,0 +1,71 @@ +package gostream + +import ( + "context" + "net" + + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/protocol" +) + +// listener is an implementation of net.Listener which handles +// http-tagged streams from a libp2p connection. +// A listener can be built with Listen() +type listener struct { + host host.Host + ctx context.Context + tag protocol.ID + cancel func() + streamCh chan network.Stream +} + +// Accept returns the next a connection to this listener. +// It blocks if there are no connections. Under the hood, +// connections are libp2p streams. +func (l *listener) Accept() (net.Conn, error) { + select { + case s := <-l.streamCh: + return newConn(s), nil + case <-l.ctx.Done(): + return nil, l.ctx.Err() + } +} + +// Close terminates this listener. It will no longer handle any +// incoming streams +func (l *listener) Close() error { + l.cancel() + l.host.RemoveStreamHandler(l.tag) + return nil +} + +// Addr returns the address for this listener, which is its libp2p Peer ID. +func (l *listener) Addr() net.Addr { + return &addr{l.host.ID()} +} + +// Listen provides a standard net.Listener ready to accept "connections". +// Under the hood, these connections are libp2p streams tagged with the +// given protocol.ID. +func Listen(h host.Host, tag protocol.ID) (net.Listener, error) { + ctx, cancel := context.WithCancel(context.Background()) + + l := &listener{ + host: h, + ctx: ctx, + cancel: cancel, + tag: tag, + streamCh: make(chan network.Stream), + } + + h.SetStreamHandler(tag, func(s network.Stream) { + select { + case l.streamCh <- s: + case <-ctx.Done(): + s.Reset() + } + }) + + return l, nil +}