Skip to content

Commit

Permalink
complete a stream that has frames in flight if it is canceled
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann committed Jan 17, 2021
1 parent 3154a3a commit 91d6167
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 13 deletions.
6 changes: 6 additions & 0 deletions send_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,10 @@ func (s *sendStream) queueRetransmission(f wire.Frame) {
sf := f.(*wire.StreamFrame)
sf.DataLenPresent = true
s.mutex.Lock()
if s.canceledWrite {
s.mutex.Unlock()
return
}
s.retransmissionQueue = append(s.retransmissionQueue, sf)
s.numOutstandingFrames--
if s.numOutstandingFrames < 0 {
Expand Down Expand Up @@ -413,6 +417,8 @@ func (s *sendStream) cancelWriteImpl(errorCode protocol.ApplicationErrorCode, wr
s.ctxCancel()
s.canceledWrite = true
s.cancelWriteErr = writeErr
s.numOutstandingFrames = 0
s.retransmissionQueue = nil
newlyCompleted := s.isNewlyCompleted()
s.mutex.Unlock()

Expand Down
37 changes: 24 additions & 13 deletions send_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,7 @@ var _ = Describe("Send Stream", func() {
waitForWrite()
frame, _ := str.popStreamFrame(50)
Expect(frame).ToNot(BeNil())
mockSender.EXPECT().onStreamCompleted(streamID)
str.CancelWrite(1234)
Eventually(writeReturned).Should(BeClosed())
Expect(n).To(BeEquivalentTo(frame.Frame.(*wire.StreamFrame).DataLen()))
Expand All @@ -758,6 +759,7 @@ var _ = Describe("Send Stream", func() {
frame, hasMoreData := str.popStreamFrame(50)
Expect(hasMoreData).To(BeTrue())
Expect(frame).ToNot(BeNil())
mockSender.EXPECT().onStreamCompleted(streamID)
str.CancelWrite(1234)
frame, hasMoreData = str.popStreamFrame(10)
Expect(frame).To(BeNil())
Expand All @@ -781,6 +783,7 @@ var _ = Describe("Send Stream", func() {
frame, hasMoreData := str.popStreamFrame(50)
Expect(hasMoreData).To(BeTrue())
Expect(frame).ToNot(BeNil())
mockSender.EXPECT().onStreamCompleted(streamID)
str.CancelWrite(1234)
frame, hasMoreData = str.popStreamFrame(10)
Expect(hasMoreData).To(BeFalse())
Expand Down Expand Up @@ -956,20 +959,28 @@ var _ = Describe("Send Stream", func() {
Expect(newFrame.Frame.(*wire.StreamFrame).Data).To(Equal([]byte("foobar")))
})

It("doesn't get a retransmission after a stream was canceled", func() {
str.numOutstandingFrames = 1
f := &wire.StreamFrame{
Data: []byte("foobar"),
Offset: 0x42,
DataLenPresent: false,
}
It("doesn't queue retransmissions for a stream that was canceled", func() {
mockSender.EXPECT().onHasStreamData(streamID)
str.queueRetransmission(f)
mockSender.EXPECT().queueControlFrame(gomock.Any())
str.CancelWrite(0)
frame, hasMoreData := str.popStreamFrame(protocol.MaxByteCount)
Expect(hasMoreData).To(BeFalse())
Expect(frame).To(BeNil())
mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount)
mockFC.EXPECT().AddBytesSent(protocol.ByteCount(6))
done := make(chan struct{})
go func() {
defer GinkgoRecover()
_, err := str.Write([]byte("foobar"))
Expect(err).ToNot(HaveOccurred())
close(done)
}()
waitForWrite()
f, _ := str.popStreamFrame(100)
Expect(f).ToNot(BeNil())
gomock.InOrder(
mockSender.EXPECT().queueControlFrame(gomock.Any()),
mockSender.EXPECT().onStreamCompleted(streamID),
)
str.CancelWrite(9876)
// don't EXPECT any calls to onHasStreamData
f.OnLost(f.Frame)
Expect(str.retransmissionQueue).To(BeEmpty())
})
})

Expand Down

0 comments on commit 91d6167

Please sign in to comment.