Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move libp2p/go-libp2p-gostream to p2p/net/gostream #2535

Merged
merged 25 commits into from
Aug 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
bedd5ad
Import and adapt code from go-libp2p-http
hsanjuan Oct 18, 2017
8b11532
Increase coverage
hsanjuan Oct 18, 2017
5fc7f01
Make Listener, Conn and Addr private.
hsanjuan Oct 19, 2017
9b46b6e
gx publish 0.0.10
hsanjuan Jun 27, 2018
28997d8
Merge pull request #7 from hsanjuan/update/libp2p
hsanjuan Jun 27, 2018
61782c9
Reset streams when no reading is going to be done anymore
hsanjuan Jun 27, 2018
d33d7e8
Reset streams: do not close server while reading
hsanjuan Jun 27, 2018
d0d288b
Do not reset streams on close immediately. Close first. The reset aft…
hsanjuan Jun 28, 2018
2e9c129
Merge pull request #8 from hsanjuan/reset/streams
hsanjuan Jun 28, 2018
9e002de
migrate to consolidated types.
raulk May 26, 2019
6fc01ef
Merge pull request #26 from raulk/migrate-types
hsanjuan May 27, 2019
85e37a9
Update README links and copyright
hsanjuan Jun 6, 2019
32a8937
Pass context to dial and do not hardcode it
hsanjuan Sep 27, 2019
6114641
Merge pull request #27 from libp2p/fix/timeout
hsanjuan Sep 27, 2019
051ed9b
feat: use go-libp2p-core 0.7.0 stream interfaces
Stebalien Nov 2, 2020
5835f56
Merge pull request #60 from libp2p/feat/rw-close
Stebalien Nov 12, 2020
63b7324
fix staticcheck
marten-seemann May 19, 2021
47c0c5c
Merge pull request #61 from libp2p/fix-staticcheck
marten-seemann May 19, 2021
d49ac8b
test: wait for inner goroutine to finish
Stebalien Jul 21, 2021
38a5d17
Merge pull request #62 from libp2p/web3-bot/sync
Stebalien Jul 21, 2021
f49059e
Update to latest libp2p
hsanjuan Jun 20, 2022
99b1ee7
Merge pull request #69 from libp2p/update-libp2p
hsanjuan Jun 20, 2022
c435f99
Expose some read-only methods on the underlying Stream interface (#67)
dirkmc Aug 18, 2022
477c485
update go-libp2p to v0.22.0 (#73)
nisainan Sep 2, 2022
8e0d111
move libp2p/go-libp2p-gostream to p2p/net/gostream
marten-seemann Aug 29, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions p2p/net/gostream/addr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package gostream

import "github.com/libp2p/go-libp2p/core/peer"

// addr implements net.Addr and holds a libp2p peer ID.
type addr struct{ id peer.ID }

// Network returns the name of the network that this address belongs to
// (libp2p).
func (a *addr) Network() string { return Network }

// String returns the peer ID of this address in string form
// (B58-encoded).
func (a *addr) String() string { return a.id.String() }
43 changes: 43 additions & 0 deletions p2p/net/gostream/conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package gostream

import (
"context"
"net"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
)

// conn is an implementation of net.Conn which wraps
// libp2p streams.
type conn struct {
network.Stream
}

// newConn creates a conn given a libp2p stream
func newConn(s network.Stream) net.Conn {
return &conn{s}
}

// LocalAddr returns the local network address.
func (c *conn) LocalAddr() net.Addr {
return &addr{c.Stream.Conn().LocalPeer()}
}

// RemoteAddr returns the remote network address.
func (c *conn) RemoteAddr() net.Addr {
return &addr{c.Stream.Conn().RemotePeer()}
}

// Dial opens a stream to the destination address
// (which should parseable to a peer ID) using the given
// host and returns it as a standard net.Conn.
func Dial(ctx context.Context, h host.Host, pid peer.ID, tag protocol.ID) (net.Conn, error) {
s, err := h.NewStream(ctx, pid, tag)
if err != nil {
return nil, err
}
return newConn(s), nil
}
19 changes: 19 additions & 0 deletions p2p/net/gostream/gostream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Package gostream allows to replace the standard net stack in Go
// with [LibP2P](https://github.com/libp2p/libp2p) streams.
//
// Given a libp2p.Host, gostream provides Dial() and Listen() methods which
// return implementations of net.Conn and net.Listener.
//
// Instead of the regular "host:port" addressing, `gostream` uses a Peer ID,
// and rather than a raw TCP connection, gostream will use libp2p's net.Stream.
// This means your connections will take advantage of LibP2P's multi-routes,
// NAT transversal and stream multiplexing.
//
// Note that LibP2P hosts cannot dial to themselves, so there is no possibility
// of using the same Host as server and as client.
package gostream

// Network is the "net.Addr.Network()" name returned by
// addresses used by gostream connections. In turn, the "net.Addr.String()" will
// be a peer ID.
var Network = "libp2p"
141 changes: 141 additions & 0 deletions p2p/net/gostream/gostream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package gostream

import (
"bufio"
"context"
"io"
"testing"
"time"

"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/multiformats/go-multiaddr"
)

// newHost illustrates how to build a libp2p host with secio using
// a randomly generated key-pair
func newHost(t *testing.T, listen multiaddr.Multiaddr) host.Host {
h, err := libp2p.New(
libp2p.ListenAddrs(listen),
)
if err != nil {
t.Fatal(err)
}
return h
}

func TestServerClient(t *testing.T) {
m1, _ := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/10000")
m2, _ := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/10001")
srvHost := newHost(t, m1)
clientHost := newHost(t, m2)
defer srvHost.Close()
defer clientHost.Close()

srvHost.Peerstore().AddAddrs(clientHost.ID(), clientHost.Addrs(), peerstore.PermanentAddrTTL)
clientHost.Peerstore().AddAddrs(srvHost.ID(), srvHost.Addrs(), peerstore.PermanentAddrTTL)

var tag protocol.ID = "/testitytest"
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

done := make(chan struct{})
go func() {
defer close(done)
listener, err := Listen(srvHost, tag)
if err != nil {
t.Error(err)
return
}
defer listener.Close()

if listener.Addr().String() != srvHost.ID().Pretty() {
t.Error("bad listener address")
return
}

servConn, err := listener.Accept()
if err != nil {
t.Error(err)
return
}
defer servConn.Close()

reader := bufio.NewReader(servConn)
for {
msg, err := reader.ReadString('\n')
if err == io.EOF {
break
}
if err != nil {
t.Error(err)
return
}
if msg != "is libp2p awesome?\n" {
t.Errorf("Bad incoming message: %s", msg)
return
}

_, err = servConn.Write([]byte("yes it is\n"))
if err != nil {
t.Error(err)
return
}
}
}()

clientConn, err := Dial(ctx, clientHost, srvHost.ID(), tag)
if err != nil {
t.Fatal(err)
}

if clientConn.LocalAddr().String() != clientHost.ID().Pretty() {
t.Fatal("Bad LocalAddr")
}

if clientConn.RemoteAddr().String() != srvHost.ID().Pretty() {
t.Fatal("Bad RemoteAddr")
}

if clientConn.LocalAddr().Network() != Network {
t.Fatal("Bad Network()")
}

err = clientConn.SetDeadline(time.Now().Add(time.Second))
if err != nil {
t.Fatal(err)
}

err = clientConn.SetReadDeadline(time.Now().Add(time.Second))
if err != nil {
t.Fatal(err)
}

err = clientConn.SetWriteDeadline(time.Now().Add(time.Second))
if err != nil {
t.Fatal(err)
}

_, err = clientConn.Write([]byte("is libp2p awesome?\n"))
if err != nil {
t.Fatal(err)
}

reader := bufio.NewReader(clientConn)
resp, err := reader.ReadString('\n')
if err != nil {
t.Fatal(err)
}

if string(resp) != "yes it is\n" {
t.Errorf("Bad response: %s", resp)
}

err = clientConn.Close()
if err != nil {
t.Fatal(err)
}
<-done
}
71 changes: 71 additions & 0 deletions p2p/net/gostream/listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package gostream

import (
"context"
"net"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/protocol"
)

// listener is an implementation of net.Listener which handles
// http-tagged streams from a libp2p connection.
// A listener can be built with Listen()
type listener struct {
host host.Host
ctx context.Context
tag protocol.ID
cancel func()
streamCh chan network.Stream
}

// Accept returns the next a connection to this listener.
// It blocks if there are no connections. Under the hood,
// connections are libp2p streams.
func (l *listener) Accept() (net.Conn, error) {
select {
case s := <-l.streamCh:
return newConn(s), nil
case <-l.ctx.Done():
return nil, l.ctx.Err()
}
}

// Close terminates this listener. It will no longer handle any
// incoming streams
func (l *listener) Close() error {
l.cancel()
l.host.RemoveStreamHandler(l.tag)
return nil
}

// Addr returns the address for this listener, which is its libp2p Peer ID.
func (l *listener) Addr() net.Addr {
return &addr{l.host.ID()}
}

// Listen provides a standard net.Listener ready to accept "connections".
// Under the hood, these connections are libp2p streams tagged with the
// given protocol.ID.
func Listen(h host.Host, tag protocol.ID) (net.Listener, error) {
ctx, cancel := context.WithCancel(context.Background())

l := &listener{
host: h,
ctx: ctx,
cancel: cancel,
tag: tag,
streamCh: make(chan network.Stream),
}

h.SetStreamHandler(tag, func(s network.Stream) {
select {
case l.streamCh <- s:
case <-ctx.Done():
s.Reset()
}
})

return l, nil
}