Skip to content

Commit

Permalink
Add stream handler function on Listen
Browse files Browse the repository at this point in the history
Stream handler allows greater control of the stream acceptance process. This allows the listener to also control the headers it returns on stream creation.

Signed-off-by: Derek McGowan <[email protected]> (github: dmcgowan)
  • Loading branch information
dmcgowan committed Sep 15, 2015
1 parent c3e5648 commit f02a9e7
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 12 deletions.
47 changes: 36 additions & 11 deletions spdy/spdy.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type spdyStream struct {

type spdyStreamListener struct {
listenChan <-chan *spdyStream
handler streams.StreamHandler
}

type spdyStreamProvider struct {
Expand Down Expand Up @@ -54,16 +55,14 @@ func (p *spdyStreamProvider) newStreamHandler(stream *spdystream.Stream) {
s := &spdyStream{
stream: stream,
}
returnHeaders := http.Header{}
var finish bool

select {
case <-p.closeChan:
returnHeaders.Set(":status", "502")
finish = true
// Do not set handlers on closed providers, new streams
// should not be handled by the provider
stream.SendReply(http.Header{}, true)
case p.listenChan <- s:
returnHeaders.Set(":status", "200")
}
stream.SendReply(returnHeaders, finish)
}

func (p *spdyStreamProvider) NewStream(headers http.Header) (streams.Stream, error) {
Expand All @@ -79,18 +78,44 @@ func (p *spdyStreamProvider) Close() error {
return p.conn.Close()
}

func (p *spdyStreamProvider) Listen() streams.Listener {
func nilHandler(http.Header) (http.Header, bool, error) {
return http.Header{}, true, nil
}

func (p *spdyStreamProvider) Listen(handler streams.StreamHandler) streams.Listener {
if handler == nil {
handler = nilHandler
}
return &spdyStreamListener{
listenChan: p.listenChan,
handler: handler,
}
}

func (l *spdyStreamListener) Accept() (streams.Stream, error) {
stream := <-l.listenChan
if stream == nil {
return nil, io.EOF
for {
s := <-l.listenChan
if s == nil {
return nil, io.EOF
}
replyHeaders, accept, err := l.handler(s.Headers())
if err != nil {
// Send reply but only return original handler error
s.stream.SendReply(replyHeaders, !accept)
return nil, err
}
if !accept {
if err := s.stream.SendReply(replyHeaders, true); err != nil {
return nil, err
}
continue
} else {
if err := s.stream.SendReply(replyHeaders, false); err != nil {
return nil, err
}
}
return s, nil
}
return stream, nil
}

func (s *spdyStream) Read(b []byte) (int, error) {
Expand Down
10 changes: 9 additions & 1 deletion streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,13 @@ type Listener interface {
type StreamProvider interface {
NewStream(http.Header) (Stream, error)
Close() error
Listen() Listener
Listen(StreamHandler) Listener
}

// StreamHandler is a function to handle a new stream
// by adding response headers and whether the stream
// should be accepted. If a stream is not accepted, it
// will not be returned by an accept on the listener.
// Any error returned by the handler should be returned
// on accept.
type StreamHandler func(http.Header) (http.Header, bool, error)

0 comments on commit f02a9e7

Please sign in to comment.