From 8912b9d4ca5240c5efda7cc4de34f4b0ce71f091 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Tue, 22 Oct 2019 03:56:31 -0700 Subject: [PATCH 1/3] fix(stream): avoid taking the receive lock while holding the state lock fixes #7 --- stream.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/stream.go b/stream.go index 74ffe20..29a5b56 100644 --- a/stream.go +++ b/stream.go @@ -77,22 +77,22 @@ func (s *Stream) Read(b []byte) (n int, err error) { defer asyncNotify(s.recvNotifyCh) START: s.stateLock.Lock() - switch s.state { + state := s.state + s.stateLock.Unlock() + + switch state { case streamRemoteClose: fallthrough case streamClosed: s.recvLock.Lock() - if s.recvBuf.Len() == 0 { - s.recvLock.Unlock() - s.stateLock.Unlock() + empty := s.recvBuf.Len() == 0 + s.recvLock.Unlock() + if empty { return 0, io.EOF } - s.recvLock.Unlock() case streamReset: - s.stateLock.Unlock() return 0, ErrConnectionReset } - s.stateLock.Unlock() // If there is no data available, block s.recvLock.Lock() From 1f87e6ecd405786b46b5f95f5d40c2f474510201 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Tue, 22 Oct 2019 03:57:15 -0700 Subject: [PATCH 2/3] chore: really, just avoid holding the state lock longer than we need to --- stream.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/stream.go b/stream.go index 29a5b56..c147fe8 100644 --- a/stream.go +++ b/stream.go @@ -143,17 +143,17 @@ func (s *Stream) write(b []byte) (n int, err error) { START: s.stateLock.Lock() - switch s.state { + state := s.state + s.stateLock.Unlock() + + switch state { case streamLocalClose: fallthrough case streamClosed: - s.stateLock.Unlock() return 0, ErrStreamClosed case streamReset: - s.stateLock.Unlock() return 0, ErrConnectionReset } - s.stateLock.Unlock() // If there is no data available, block window := atomic.LoadUint32(&s.sendWindow) From 62648031afca958c201831f55ba6f70ccc1daca0 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Tue, 22 Oct 2019 03:57:54 -0700 Subject: [PATCH 3/3] chore: avoid taking a lock while holding a lock --- stream.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/stream.go b/stream.go index c147fe8..5cb9399 100644 --- a/stream.go +++ b/stream.go @@ -208,14 +208,14 @@ func (s *Stream) sendFlags() uint16 { // sendWindowUpdate potentially sends a window update enabling // further writes to take place. Must be invoked with the lock. func (s *Stream) sendWindowUpdate() error { + // Determine the flags if any + flags := s.sendFlags() + // Determine the delta update max := s.session.config.MaxStreamWindowSize s.recvLock.Lock() delta := (max - uint32(s.recvBuf.Len())) - s.recvWindow - // Determine the flags if any - flags := s.sendFlags() - // Check if we can omit the update if delta < (max/2) && flags == 0 { s.recvLock.Unlock()