Skip to content

Commit

Permalink
Improve handling of SACK
Browse files Browse the repository at this point in the history
Relates to #62
  • Loading branch information
enobufs committed Oct 21, 2019
1 parent d1633f9 commit c7cc441
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 36 deletions.
2 changes: 1 addition & 1 deletion ack_timer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (t *ackTimer) start() bool {
return false
}

// this is a noop if the timer is always running
// this is a noop if the timer is already running
if t.stopFunc != nil {
return false
}
Expand Down
72 changes: 43 additions & 29 deletions association.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ const (

// ack transmission state
const (
ackStateIdle int = iota
ackStateImmediate
ackStateDelay
ackStateIdle int = iota // ack timer is off
ackStateImmediate // ack timer is on (ack is being delayed)
ackStateDelay // will send ack immediately
)

// other constants
Expand Down Expand Up @@ -177,6 +177,10 @@ type Association struct {
// stats
stats *associationStats

// per inbound packet context
delayedAckTriggered bool
immediateAckTriggered bool

name string
log logging.LeveledLogger
}
Expand Down Expand Up @@ -485,13 +489,17 @@ func (a *Association) handleInbound(raw []byte) error {
return errors.Wrap(err, "failed validating packet")
}

a.handleChunkStart(p)

for _, c := range p.chunks {
err := a.handleChunk(p, c)
if err != nil {
return errors.Wrap(err, "failed handling chunk")
a.log.Warnf("[%s] %s", a.name, errors.Wrap(err, "failed handling chunk").Error())
}
}

a.handleChunkEnd(p)

return nil
}

Expand Down Expand Up @@ -622,6 +630,7 @@ func (a *Association) gatherOutbound() [][]byte {
if a.ackState == ackStateImmediate {
a.ackState = ackStateIdle
sack := a.createSelectiveAckChunk()
a.log.Debugf("[%s] sending SACK: %s", a.name, sack.String())
raw, err := a.createPacket([]chunk{sack}).marshal()
if err != nil {
a.log.Warnf("[%s] failed to serialize a SACK packet", a.name)
Expand Down Expand Up @@ -1006,14 +1015,13 @@ func (a *Association) handleData(d *chunkPayloadData) []*packet {
}

if (a.ackState != ackStateImmediate && !d.immediateSack && !hasPacketLoss && a.ackMode == ackModeNormal) || a.ackMode == ackModeAlwaysDelay {
// Will send delayed ack in the next ack timeout
a.ackState = ackStateDelay
a.ackTimer.start()
if a.ackState == ackStateIdle {
a.delayedAckTriggered = true
} else {
a.immediateAckTriggered = true
}
} else {
// Send SACK now!
a.ackState = ackStateImmediate
a.ackTimer.stop()
a.awakeWriteLoop()
a.immediateAckTriggered = true
}

return reply
Expand Down Expand Up @@ -1731,24 +1739,6 @@ func (a *Association) popPendingDataChunksToSend() ([]*chunkPayloadData, []uint1
chunks = append(chunks, c)
}
}

// Controlling DATA chunk's I-bit (SACK-IMMEDIATELY flag)
// RFC 7053 sec 4.2. Triggering at the SCTP Level
// Another case is where the sending of a DATA chunk fills the
// congestion or receiver window. Setting the I bit in these cases
// improves the throughput of the transfer.

if len(chunks) > 0 {
// IMPLETEMATION NOTE:
// It was learned that setting I-bit to the every last chunk that fills
// min(cwnd,rnwd) would end up with generating SACK, on the remote, for
// every single DATA chunks when sending large amount of data at once.
// In order to overcome this situation, here we set I-bit to true for
// the last chunk only.

lastChunk := chunks[len(chunks)-1]
lastChunk.immediateSack = true
}
}

return chunks, sisToReset
Expand Down Expand Up @@ -1903,6 +1893,30 @@ func pack(p *packet) []*packet {
return []*packet{p}
}

func (a *Association) handleChunkStart(p *packet) {
a.lock.Lock()
defer a.lock.Unlock()

a.delayedAckTriggered = false
a.immediateAckTriggered = false
}

func (a *Association) handleChunkEnd(p *packet) {
a.lock.Lock()
defer a.lock.Unlock()

if a.immediateAckTriggered {
// Send SACK now!
a.ackState = ackStateImmediate
a.ackTimer.stop()
a.awakeWriteLoop()
} else if a.delayedAckTriggered {
// Will send delayed ack in the next ack timeout
a.ackState = ackStateDelay
a.ackTimer.start()
}
}

func (a *Association) handleChunk(p *packet, c chunk) error {
a.lock.Lock()
defer a.lock.Unlock()
Expand Down
87 changes: 83 additions & 4 deletions association_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1786,7 +1786,7 @@ func TestAssocCongestionControl(t *testing.T) {
t.Logf("nT3Timeouts : %d\n", a0.stats.getNumT3Timeouts())

assert.Equal(t, uint64(nPacketsToSend), a1.stats.getNumDATAs(), "packet count mismatch")
assert.True(t, a0.stats.getNumSACKs() < nPacketsToSend/10, "too many sacks")
assert.True(t, a0.stats.getNumSACKs() <= nPacketsToSend/2, "too many sacks")
assert.Equal(t, uint64(0), a0.stats.getNumT3Timeouts(), "should be no retransmit")

closeAssociationPair(br, a0, a1)
Expand Down Expand Up @@ -1882,7 +1882,86 @@ func TestAssocCongestionControl(t *testing.T) {
}

func TestAssocDelayedAck(t *testing.T) {
t.Run("Ack all DATA chunks with one SACK", func(t *testing.T) {
t.Run("First DATA chunk gets acked with delay", func(t *testing.T) {
const si uint16 = 6
var n int
var nPacketsReceived int
var ppi PayloadProtocolIdentifier
sbuf := make([]byte, 1000) // size should be less than initial cwnd (4380)
rbuf := make([]byte, 1500)

_, err := cryptoRand.Read(sbuf)
if !assert.Nil(t, err, "failed to create associations") {
return
}

br := test.NewBridge()

a0, a1, err := createNewAssociationPair(br, ackModeAlwaysDelay, 0)
if !assert.Nil(t, err, "failed to create associations") {
assert.FailNow(t, "failed due to earlier error")
}

s0, s1, err := establishSessionPair(br, a0, a1, si)
assert.Nil(t, err, "failed to establish session pair")

a0.stats.reset()
a1.stats.reset()

// Writes data (will fragmented)
n, err = s0.WriteSCTP(sbuf, PayloadTypeWebRTCBinary)
assert.Nil(t, err, "WriteSCTP failed")
assert.Equal(t, n, len(sbuf), "unexpected length of received data")

// Repeat calling br.Tick() until the buffered amount becomes 0
since := time.Now()
for s0.BufferedAmount() > 0 {
for {
n = br.Tick()
if n == 0 {
break
}
}

for {
s1.lock.RLock()
readable := s1.reassemblyQueue.isReadable()
s1.lock.RUnlock()
if !readable {
break
}
n, ppi, err = s1.ReadSCTP(rbuf)
if !assert.Nil(t, err, "ReadSCTP failed") {
return
}
assert.Equal(t, len(sbuf), n, "unexpected length of received data")
assert.Equal(t, ppi, PayloadTypeWebRTCBinary, "unexpected ppi")

nPacketsReceived++
}
}
delay := time.Since(since).Seconds()
t.Logf("received in %.03f seconds", delay)
assert.True(t, delay >= 0.2, "should be >= 200msec")

br.Process()

assert.Equal(t, 1, nPacketsReceived, "should be one packet received")
assert.Equal(t, 0, s1.getNumBytesInReassemblyQueue(), "reassembly queue should be empty")

t.Logf("nDATAs : %d\n", a1.stats.getNumDATAs())
t.Logf("nSACKs : %d\n", a0.stats.getNumSACKs())
t.Logf("nAckTimeouts: %d\n", a1.stats.getNumAckTimeouts())

assert.Equal(t, uint64(1), a1.stats.getNumDATAs(), "DATA chunk count mismatch")
assert.Equal(t, a0.stats.getNumSACKs(), a1.stats.getNumDATAs(), "sack count should be equal to the number of data chunks")
assert.Equal(t, uint64(1), a1.stats.getNumAckTimeouts(), "ackTimeout count mismatch")
assert.Equal(t, uint64(0), a0.stats.getNumT3Timeouts(), "should be no retransmit")

closeAssociationPair(br, a0, a1)
})

t.Run("Second DATA chunk to generate SACK immedidately", func(t *testing.T) {
const si uint16 = 6
var n int
var nPacketsReceived int
Expand Down Expand Up @@ -1950,8 +2029,8 @@ func TestAssocDelayedAck(t *testing.T) {
t.Logf("nAckTimeouts: %d\n", a1.stats.getNumAckTimeouts())

assert.Equal(t, uint64(4), a1.stats.getNumDATAs(), "DATA chunk count mismatch")
assert.Equal(t, uint64(1), a0.stats.getNumSACKs(), "sack count should be one")
assert.Equal(t, uint64(1), a1.stats.getNumAckTimeouts(), "ackTimeout count mismatch")
assert.True(t, a0.stats.getNumSACKs() < a1.stats.getNumDATAs(), "sack count should less than data")
assert.Equal(t, uint64(0), a1.stats.getNumAckTimeouts(), "ackTimeout count mismatch")
assert.Equal(t, uint64(0), a0.stats.getNumT3Timeouts(), "should be no retransmit")

closeAssociationPair(br, a0, a1)
Expand Down
7 changes: 5 additions & 2 deletions chunk_selective_ack.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,13 @@ func (s *chunkSelectiveAck) check() (abort bool, err error) {

// String makes chunkSelectiveAck printable
func (s *chunkSelectiveAck) String() string {
res := fmt.Sprintf("%s\n%d", s.chunkHeader, s.cumulativeTSNAck)
res := fmt.Sprintf("SACK cumTsnAck=%d arwnd=%d dupTsn=%d",
s.cumulativeTSNAck,
s.advertisedReceiverWindowCredit,
s.duplicateTSN)

for _, gap := range s.gapAckBlocks {
res = fmt.Sprintf("\n gap ack: %s", gap)
res = fmt.Sprintf("%s\n gap ack: %s", res, gap)
}

return res
Expand Down

0 comments on commit c7cc441

Please sign in to comment.