forked from moby/spdystream
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
attempt at testing sync stream creation
This is an attempt at causing a protocol race in stream creation, so STREAM_SYN frames out of order (not monotonically increasing) happen. This test does not succeed in doing that, but it does appear to deadlock. License: Apache 2 Signed-off-by: Juan Batiz-Benet <[email protected]>
- Loading branch information
Showing
1 changed file
with
133 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,10 +3,12 @@ package spdystream | |
import ( | ||
"bufio" | ||
"bytes" | ||
"fmt" | ||
"io" | ||
"net" | ||
"net/http" | ||
"net/http/httptest" | ||
"runtime" | ||
"sync" | ||
"testing" | ||
"time" | ||
|
@@ -877,6 +879,79 @@ func TestFramingAfterRemoteConnectionClosed(t *testing.T) { | |
conn.Close() | ||
} | ||
|
||
func TestMonotonicStreamCreate(t *testing.T) { | ||
var wg sync.WaitGroup | ||
var concurrency = 500 | ||
This comment has been minimized.
Sorry, something went wrong. |
||
var msgs = 10 | ||
|
||
streamHello := func(spdyConn *Connection, msgs int) { | ||
stream, err := spdyConn.CreateStream(http.Header{}, nil, false) | ||
if err != nil { | ||
t.Fatalf("error creating client stream: %s", err) | ||
} | ||
|
||
for j := 0; j < msgs; j++ { | ||
fmt.Printf("hello msg %d/%d\n", j, msgs) | ||
This comment has been minimized.
Sorry, something went wrong.
jbenet
Author
Owner
|
||
msg := make([]byte, 2048) | ||
if err := sendAndReceive(stream, msg); err != nil { | ||
t.Fatal(err) | ||
} | ||
} | ||
|
||
stream.Reset() | ||
} | ||
|
||
connHellos := func(spdyConn *Connection) { | ||
|
||
var wg2 sync.WaitGroup | ||
for i := 0; i < concurrency; i++ { | ||
wg2.Add(1) | ||
go func(i int) { | ||
runtime.Gosched() // mess with order to achieve concurrent opens. | ||
fmt.Printf("hello conn %d/%d\n", i, concurrency) | ||
streamHello(spdyConn, msgs) | ||
fmt.Printf("hello conn %d/%d\n", i, concurrency) | ||
streamHello(spdyConn, msgs) | ||
fmt.Printf("hello conn %d/%d\n", i, concurrency) | ||
streamHello(spdyConn, msgs) | ||
This comment has been minimized.
Sorry, something went wrong.
jbenet
Author
Owner
|
||
wg2.Done() | ||
}(i) | ||
} | ||
wg2.Wait() | ||
} | ||
|
||
server, listen, serverErr := runServerConnHandler(&wg, func(spdyConn *Connection) { | ||
go spdyConn.Serve(authStreamHandler) | ||
|
||
wg.Add(1) | ||
go func() { | ||
connHellos(spdyConn) | ||
wg.Done() | ||
}() | ||
}) | ||
if serverErr != nil { | ||
t.Fatalf("Error initializing server: %s", serverErr) | ||
} | ||
|
||
conn, dialErr := net.Dial("tcp", listen) | ||
if dialErr != nil { | ||
t.Fatalf("Error dialing server: %s", dialErr) | ||
} | ||
|
||
spdyConn, spdyErr := NewConnection(conn, false) | ||
if spdyErr != nil { | ||
t.Fatalf("Error creating spdy connection: %s", spdyErr) | ||
} | ||
go spdyConn.Serve(authStreamHandler) | ||
connHellos(spdyConn) | ||
|
||
closeErr := server.Close() | ||
if closeErr != nil { | ||
t.Fatalf("Error shutting down server: %s", closeErr) | ||
} | ||
wg.Wait() | ||
} | ||
|
||
var authenticated bool | ||
|
||
func authStreamHandler(stream *Stream) { | ||
|
@@ -886,7 +961,64 @@ func authStreamHandler(stream *Stream) { | |
MirrorStreamHandler(stream) | ||
} | ||
|
||
func sendAndReceive(stream *Stream, buf []byte) error { | ||
lbuf := len(buf) | ||
var rerr, werr error | ||
|
||
// write concurrently to avoid deadlock. | ||
var wg sync.WaitGroup | ||
wg.Add(1) | ||
go func() { | ||
defer wg.Done() | ||
|
||
n, err := stream.Write(buf) | ||
if err != nil { | ||
werr = fmt.Errorf("error writing to stream: %s", err) | ||
stream.Reset() // to unblock the reader | ||
return | ||
} | ||
if n != lbuf { | ||
werr = fmt.Errorf("Expected to write %d bytes, but actually wrote %d", lbuf, n) | ||
stream.Reset() // to unblock the reader | ||
return | ||
} | ||
}() | ||
|
||
wg.Add(1) | ||
go func() { | ||
defer wg.Done() | ||
|
||
stream.SetReadDeadline(time.Now().Add(time.Second)) | ||
buf2 := make([]byte, lbuf) | ||
n, err := io.ReadFull(stream, buf2) | ||
if err != nil { | ||
rerr = fmt.Errorf("error reading from stream: %s", err) | ||
return | ||
} | ||
if n != lbuf { | ||
rerr = fmt.Errorf("Expected to read %d bytes, but actually read %d", lbuf, n) | ||
return | ||
} | ||
if !bytes.Equal(buf, buf2) { | ||
rerr = fmt.Errorf("expected '%s', got '%s'", buf, buf2) | ||
return | ||
} | ||
}() | ||
|
||
wg.Wait() | ||
if werr != nil { | ||
return werr | ||
} | ||
return rerr | ||
} | ||
|
||
func runServer(wg *sync.WaitGroup) (io.Closer, string, error) { | ||
return runServerConnHandler(wg, func(spdyConn *Connection) { | ||
go spdyConn.Serve(authStreamHandler) | ||
}) | ||
} | ||
|
||
func runServerConnHandler(wg *sync.WaitGroup, handler func(*Connection)) (io.Closer, string, error) { | ||
listener, listenErr := net.Listen("tcp", "localhost:0") | ||
if listenErr != nil { | ||
return nil, "", listenErr | ||
|
@@ -900,8 +1032,7 @@ func runServer(wg *sync.WaitGroup) (io.Closer, string, error) { | |
} | ||
|
||
spdyConn, _ := NewConnection(conn, true) | ||
go spdyConn.Serve(authStreamHandler) | ||
|
||
handler(spdyConn) | ||
} | ||
wg.Done() | ||
}() | ||
|
1 comment
on commit be59d64
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is a crap commit. may need to run it a few times to see the deadlock. the readers all get stuck. Maybe i'm doing something stupid, please point it out if so.
may need to run this test a few times (sometimes it succeeds) and/or bump up the concurrency to 1000, 10000