Skip to content

Commit

Permalink
Merge pull request #7 from libp2p/feat/fast-close
Browse files Browse the repository at this point in the history
don't hold the lock when closing
  • Loading branch information
Stebalien authored Dec 11, 2017
2 parents 709299b + 3b66fd3 commit d82125c
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 13 deletions.
6 changes: 0 additions & 6 deletions msgio.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,6 @@ func (s *writer) WriteMsg(msg []byte) (err error) {
}

func (s *writer) Close() error {
s.lock.Lock()
defer s.lock.Unlock()

if c, ok := s.W.(io.Closer); ok {
return c.Close()
}
Expand Down Expand Up @@ -216,9 +213,6 @@ func (s *reader) ReleaseMsg(msg []byte) {
}

func (s *reader) Close() error {
s.lock.Lock()
defer s.lock.Unlock()

if c, ok := s.R.(io.Closer); ok {
return c.Close()
}
Expand Down
51 changes: 50 additions & 1 deletion msgio_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ package msgio
import (
"bytes"
"fmt"
randbuf "github.com/jbenet/go-randbuf"
"io"
"math/rand"
"sync"
"testing"
"time"

randbuf "github.com/jbenet/go-randbuf"
)

func TestReadWrite(t *testing.T) {
Expand All @@ -32,6 +33,20 @@ func TestReadWriteMsgSync(t *testing.T) {
SubtestReadWriteMsgSync(t, writer, reader)
}

func TestReadClose(t *testing.T) {
r, w := io.Pipe()
writer := NewWriter(w)
reader := NewReader(r)
SubtestReadClose(t, writer, reader)
}

func TestWriteClose(t *testing.T) {
r, w := io.Pipe()
writer := NewWriter(w)
reader := NewReader(r)
SubtestWriteClose(t, writer, reader)
}

func SubtestReadWrite(t *testing.T, writer WriteCloser, reader ReadCloser) {
msgs := [1000][]byte{}

Expand Down Expand Up @@ -195,3 +210,37 @@ func TestBadSizes(t *testing.T) {
}
_ = msg
}

func SubtestReadClose(t *testing.T, writer WriteCloser, reader ReadCloser) {
defer writer.Close()

buf := [10]byte{}
done := make(chan struct{})
go func() {
defer close(done)
time.Sleep(10 * time.Millisecond)
reader.Close()
}()
n, err := reader.Read(buf[:])
if n != 0 || err == nil {
t.Error("expected to read nothing")
}
<-done
}

func SubtestWriteClose(t *testing.T, writer WriteCloser, reader ReadCloser) {
defer reader.Close()

buf := [10]byte{}
done := make(chan struct{})
go func() {
defer close(done)
time.Sleep(10 * time.Millisecond)
writer.Close()
}()
n, err := writer.Write(buf[:])
if n != 0 || err == nil {
t.Error("expected to read nothing")
}
<-done
}
6 changes: 0 additions & 6 deletions varint.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@ func (s *varintWriter) WriteMsg(msg []byte) error {
}

func (s *varintWriter) Close() error {
s.lock.Lock()
defer s.lock.Unlock()

if c, ok := s.W.(io.Closer); ok {
return c.Close()
}
Expand Down Expand Up @@ -162,9 +159,6 @@ func (s *varintReader) ReleaseMsg(msg []byte) {
}

func (s *varintReader) Close() error {
s.lock.Lock()
defer s.lock.Unlock()

if c, ok := s.R.(io.Closer); ok {
return c.Close()
}
Expand Down
15 changes: 15 additions & 0 deletions varint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package msgio
import (
"bytes"
"encoding/binary"
"io"
"testing"
)

Expand Down Expand Up @@ -64,3 +65,17 @@ func SubtestVarintWrite(t *testing.T, msg []byte) {
t.Fatalf("wrote incorrect number of bytes: %d != %d", len(bb), bblen)
}
}

func TestVarintReadClose(t *testing.T) {
r, w := io.Pipe()
writer := NewVarintWriter(w)
reader := NewVarintReader(r)
SubtestReadClose(t, writer, reader)
}

func TestVarintWriteClose(t *testing.T) {
r, w := io.Pipe()
writer := NewVarintWriter(w)
reader := NewVarintReader(r)
SubtestWriteClose(t, writer, reader)
}

0 comments on commit d82125c

Please sign in to comment.