Skip to content

Commit

Permalink
add todo for peekable
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt committed Oct 5, 2024
1 parent 5e3f55a commit 6f21a3f
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 44 deletions.
12 changes: 8 additions & 4 deletions p2p/transport/tcpreuse/demultiplex.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ func ConnTypeFromConn(c net.Conn) (DemultiplexedConnType, manet.Conn, error) {
// ReadSampleFromConn reads a sample and returns a reader which still includes the sample, so it can be kept undamaged.
// If an error occurs it only returns the error.
func ReadSampleFromConn(c net.Conn) (Sample, manet.Conn, error) {
// TODO: Should we remove this? This is only implemented by bufio.Reader.
// This made sense for magiselect: https://github.com/libp2p/go-libp2p/pull/2737 as it deals with a wrapped
// ReadWriteCloser from multistream which does use a buffered reader underneath.
// For our present purpose, we have a net.Conn and no net.Conn implementation offers peeking.
if peekAble, ok := c.(peekAble); ok {
b, err := peekAble.Peek(len(Sample{}))
switch {
Expand Down Expand Up @@ -122,7 +126,7 @@ func ReadSampleFromConn(c net.Conn) (Sample, manet.Conn, error) {
return sc.s, sc, nil
}

// Try out best to mimic a TCPConn's functions
// tcpConnInterface is the interface for TCPConn's functions
// Note: Skipping `SyscallConn() (syscall.RawConn, error)` since it can be misused given we've read a few bytes from the connection.
type tcpConnInterface interface {
net.Conn
Expand Down Expand Up @@ -179,12 +183,12 @@ func (sc *sampledConn) Read(b []byte) (int, error) {
return sc.tcpConnInterface.Read(b)
}

// forward optimizations
// TODO: Do we need these?

func (sc *sampledConn) ReadFrom(r io.Reader) (int64, error) {
return io.Copy(sc.tcpConnInterface, r)
}

// forward optimizations
func (sc *sampledConn) WriteTo(w io.Writer) (total int64, err error) {
if int(sc.readFromSample) != len(sc.s) {
b := sc.s[sc.readFromSample:]
Expand All @@ -211,7 +215,7 @@ type Matcher interface {
Match(s Sample) bool
}

// Sample might evolve over time.
// Sample is the byte sequence we use to demultiplex.
type Sample [3]byte

// Matchers are implemented here instead of in the transports so we can easily fuzz them together.
Expand Down
95 changes: 55 additions & 40 deletions p2p/transport/tcpreuse/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func TestListenerMultiplexed(t *testing.T) {
}
lconn := multistream.NewMSSelect(conn, "a")
buf := make([]byte, 10)
_, err = lconn.Write([]byte("hello"))
_, err = lconn.Write([]byte("multistream"))
if err != nil {
t.Error(err)
}
Expand All @@ -271,7 +271,7 @@ func TestListenerMultiplexed(t *testing.T) {
t.Error("failed to dial", err, i)
return
}
err = conn.WriteMessage(websocket.TextMessage, []byte("hello"))
err = conn.WriteMessage(websocket.TextMessage, []byte("websocket"))
if err != nil {
t.Error(err)
}
Expand All @@ -295,7 +295,7 @@ func TestListenerMultiplexed(t *testing.T) {
t.Error("failed to dial", err, i)
return
}
err = conn.WriteMessage(websocket.TextMessage, []byte("hello"))
err = conn.WriteMessage(websocket.TextMessage, []byte("websocket-tls"))
if err != nil {
t.Error(err)
}
Expand All @@ -306,45 +306,60 @@ func TestListenerMultiplexed(t *testing.T) {
}()
}
}()

var wg sync.WaitGroup
for i := 0; i < N; i++ {
c, err := msl.Accept()
require.NoError(t, err)
wg.Add(1)
go func() {
defer wg.Done()
cc := multistream.NewMSSelect(c, "a")
buf := make([]byte, 10)
n, err := cc.Read(buf)
require.NoError(t, err)
require.Equal(t, "hello", string(buf[:n]))
c.Close()
}()
}
for i := 0; i < N; i++ {
c := <-wh.conns
wg.Add(1)
go func() {
defer wg.Done()
msgType, buf, err := c.ReadMessage()
require.NoError(t, err)
require.Equal(t, msgType, websocket.TextMessage)
require.Equal(t, "hello", string(buf))
c.Close()
}()
}
for i := 0; i < N; i++ {
c := <-whs.conns
wg.Add(1)
go func() {
defer wg.Done()
msgType, buf, err := c.ReadMessage()
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < N; i++ {
c, err := msl.Accept()
require.NoError(t, err)
require.Equal(t, msgType, websocket.TextMessage)
require.Equal(t, "hello", string(buf))
c.Close()
}()
}
wg.Add(1)
go func() {
defer wg.Done()
cc := multistream.NewMSSelect(c, "a")
buf := make([]byte, 20)
n, err := cc.Read(buf)
require.NoError(t, err)
require.Equal(t, "multistream", string(buf[:n]))
cc.Close()
}()
}
}()

wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < N; i++ {
c := <-wh.conns
wg.Add(1)
go func() {
defer wg.Done()
msgType, buf, err := c.ReadMessage()
require.NoError(t, err)
require.Equal(t, msgType, websocket.TextMessage)
require.Equal(t, "websocket", string(buf))
c.Close()
}()
}
}()

wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < N; i++ {
c := <-whs.conns
wg.Add(1)
go func() {
defer wg.Done()
msgType, buf, err := c.ReadMessage()
require.NoError(t, err)
require.Equal(t, msgType, websocket.TextMessage)
require.Equal(t, "websocket-tls", string(buf))
c.Close()
}()
}
}()
wg.Wait()
}
}

0 comments on commit 6f21a3f

Please sign in to comment.