Skip to content

Commit

Permalink
move go-libp2p-mplex here
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann committed Apr 26, 2022
2 parents 94ce988 + d6fc4be commit d69f1fc
Show file tree
Hide file tree
Showing 4 changed files with 185 additions and 0 deletions.
43 changes: 43 additions & 0 deletions p2p/muxer/mplex/conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package peerstream_multiplex

import (
"context"

"github.com/libp2p/go-libp2p-core/network"

mp "github.com/libp2p/go-mplex"
)

type conn mp.Multiplex

var _ network.MuxedConn = &conn{}

func (c *conn) Close() error {
return c.mplex().Close()
}

func (c *conn) IsClosed() bool {
return c.mplex().IsClosed()
}

// OpenStream creates a new stream.
func (c *conn) OpenStream(ctx context.Context) (network.MuxedStream, error) {
s, err := c.mplex().NewStream(ctx)
if err != nil {
return nil, err
}
return (*stream)(s), nil
}

// AcceptStream accepts a stream opened by the other side.
func (c *conn) AcceptStream() (network.MuxedStream, error) {
s, err := c.mplex().Accept()
if err != nil {
return nil, err
}
return (*stream)(s), nil
}

func (c *conn) mplex() *mp.Multiplex {
return (*mp.Multiplex)(c)
}
64 changes: 64 additions & 0 deletions p2p/muxer/mplex/stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package peerstream_multiplex

import (
"time"

"github.com/libp2p/go-libp2p-core/network"

mp "github.com/libp2p/go-mplex"
)

// stream implements network.MuxedStream over mplex.Stream.
type stream mp.Stream

var _ network.MuxedStream = &stream{}

func (s *stream) Read(b []byte) (n int, err error) {
n, err = s.mplex().Read(b)
if err == mp.ErrStreamReset {
err = network.ErrReset
}

return n, err
}

func (s *stream) Write(b []byte) (n int, err error) {
n, err = s.mplex().Write(b)
if err == mp.ErrStreamReset {
err = network.ErrReset
}

return n, err
}

func (s *stream) Close() error {
return s.mplex().Close()
}

func (s *stream) CloseWrite() error {
return s.mplex().CloseWrite()
}

func (s *stream) CloseRead() error {
return s.mplex().CloseRead()
}

func (s *stream) Reset() error {
return s.mplex().Reset()
}

func (s *stream) SetDeadline(t time.Time) error {
return s.mplex().SetDeadline(t)
}

func (s *stream) SetReadDeadline(t time.Time) error {
return s.mplex().SetReadDeadline(t)
}

func (s *stream) SetWriteDeadline(t time.Time) error {
return s.mplex().SetWriteDeadline(t)
}

func (s *stream) mplex() *mp.Stream {
return (*mp.Stream)(s)
}
26 changes: 26 additions & 0 deletions p2p/muxer/mplex/transport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package peerstream_multiplex

import (
"net"

"github.com/libp2p/go-libp2p-core/network"

mp "github.com/libp2p/go-mplex"
)

// DefaultTransport has default settings for Transport
var DefaultTransport = &Transport{}

var _ network.Multiplexer = &Transport{}

// Transport implements mux.Multiplexer that constructs
// mplex-backed muxed connections.
type Transport struct{}

func (t *Transport) NewConn(nc net.Conn, isServer bool, scope network.PeerScope) (network.MuxedConn, error) {
m, err := mp.NewMultiplex(nc, isServer, scope)
if err != nil {
return nil, err
}
return (*conn)(m), nil
}
52 changes: 52 additions & 0 deletions p2p/muxer/mplex/transport_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package peerstream_multiplex

import (
"errors"
"net"
"testing"

"github.com/libp2p/go-libp2p-core/network"
test "github.com/libp2p/go-libp2p-testing/suites/mux"
)

func TestDefaultTransport(t *testing.T) {
test.SubtestAll(t, DefaultTransport)
}

type memoryScope struct {
network.PeerScope
limit int
reserved int
}

func (m *memoryScope) ReserveMemory(size int, prio uint8) error {
if m.reserved+size > m.limit {
return errors.New("too much")
}
m.reserved += size
return nil
}

func (m *memoryScope) ReleaseMemory(size int) {
m.reserved -= size
if m.reserved < 0 {
panic("too much memory released")
}
}

type memoryLimitedTransport struct {
Transport
}

func (t *memoryLimitedTransport) NewConn(nc net.Conn, isServer bool, scope network.PeerScope) (network.MuxedConn, error) {
return t.Transport.NewConn(nc, isServer, &memoryScope{
limit: 3 * 1 << 20,
PeerScope: scope,
})
}

func TestDefaultTransportWithMemoryLimit(t *testing.T) {
test.SubtestAll(t, &memoryLimitedTransport{
Transport: *DefaultTransport,
})
}

0 comments on commit d69f1fc

Please sign in to comment.