Improve throughput over network with delay
Relates to #62
enobufs committed Oct 7, 2019
1 parent cae91aa commit f99b15c
Showing 6 changed files with 112 additions and 73 deletions.
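The substantive change: the receive buffer size, previously the fixed package constant maxReceiveBufferSize, becomes a per-association setting exposed through the new Config.MaxReceiveBufferSize field and defaults to initialRecvBufSize (1 MiB). A minimal usage sketch, assuming the package's exported Client constructor and pion's default logger factory (neither appears in this diff):

package example

import (
	"net"

	"github.com/pion/logging"
	"github.com/pion/sctp"
)

// dial is a hypothetical helper; it assumes conn (e.g. an established DTLS
// connection) is already available.
func dial(conn net.Conn) (*sctp.Association, error) {
	return sctp.Client(sctp.Config{
		NetConn:              conn,
		MaxReceiveBufferSize: 4 * 1024 * 1024, // advertise a larger a_rwnd than the 1 MiB default on high-latency paths
		LoggerFactory:        logging.NewDefaultLoggerFactory(),
	})
}
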
88 changes: 44 additions & 44 deletions association.go
@@ -16,11 +16,11 @@ import (
)

const (
receiveMTU uint32 = 8192 // MTU for inbound packet (from DTLS)
initialMTU uint32 = 1228 // initial MTU for outgoing packets (to DTLS)
maxReceiveBufferSize uint32 = 64 * 1024
commonHeaderSize uint32 = 12
dataChunkHeaderSize uint32 = 16
receiveMTU uint32 = 8192 // MTU for inbound packet (from DTLS)
initialMTU uint32 = 1228 // initial MTU for outgoing packets (to DTLS)
initialRecvBufSize uint32 = 1024 * 1024
commonHeaderSize uint32 = 12
dataChunkHeaderSize uint32 = 16
)

// association state enums
@@ -109,15 +109,14 @@ type Association struct {

netConn net.Conn

peerVerificationTag uint32
myVerificationTag uint32
state uint32
myNextTSN uint32 // nextTSN
peerLastTSN uint32 // lastRcvdTSN
minTSN2MeasureRTT uint32 // for RTT measurement
willRetransmitDataChunks bool
willSendForwardTSN bool
willRetransmitFast bool
peerVerificationTag uint32
myVerificationTag uint32
state uint32
myNextTSN uint32 // nextTSN
peerLastTSN uint32 // lastRcvdTSN
minTSN2MeasureRTT uint32 // for RTT measurement
willSendForwardTSN bool
willRetransmitFast bool

// Reconfig
myNextRSN uint32
@@ -141,6 +140,7 @@ type Association struct {
useForwardTSN bool

// Congestion control parameters
maxReceiveBufferSize uint32
cwnd uint32 // my congestion window size
rwnd uint32 // calculated peer's receiver windows size
ssthresh uint32 // slow start threshold
@@ -184,8 +184,9 @@ type Association struct {
// Config collects the arguments to createAssociation construction into
// a single structure
type Config struct {
NetConn net.Conn
LoggerFactory logging.LoggerFactory
NetConn net.Conn
MaxReceiveBufferSize uint32
LoggerFactory logging.LoggerFactory
}

// Server accepts a SCTP stream over a conn
@@ -224,9 +225,17 @@ func createAssociation(config Config) *Association {
rs := rand.NewSource(time.Now().UnixNano())
r := rand.New(rs)

var maxReceiveBufferSize uint32
if config.MaxReceiveBufferSize == 0 {
maxReceiveBufferSize = initialRecvBufSize
} else {
maxReceiveBufferSize = config.MaxReceiveBufferSize
}

tsn := r.Uint32()
a := &Association{
netConn: config.NetConn,
maxReceiveBufferSize: maxReceiveBufferSize,
myMaxNumOutboundStreams: math.MaxUint16,
myMaxNumInboundStreams: math.MaxUint16,
payloadQueue: newPayloadQueue(),
@@ -288,7 +297,7 @@ func (a *Association) init(isClient bool) {
init.numOutboundStreams = a.myMaxNumOutboundStreams
init.numInboundStreams = a.myMaxNumInboundStreams
init.initiateTag = a.myVerificationTag
init.advertisedReceiverWindowCredit = maxReceiveBufferSize
init.advertisedReceiverWindowCredit = a.maxReceiveBufferSize
setSupportedExtensions(&init.chunkInitCommon)
a.storedInit = init

@@ -507,17 +516,13 @@ func (a *Association) gatherOutbound() [][]byte {
state := a.getState()

if state == established {
if a.willRetransmitDataChunks {
a.willRetransmitDataChunks = false
for _, p := range a.getDataPacketsToRetransmit() {
raw, err := p.marshal()
if err != nil {
a.log.Warnf("[%s] failed to serialize a DATA packet to be retransmitted", a.name)
continue
}
a.log.Debugf("[%s] retransmitting %d bytes", a.name, len(raw))
rawPackets = append(rawPackets, raw)
for _, p := range a.getDataPacketsToRetransmit() {
raw, err := p.marshal()
if err != nil {
a.log.Warnf("[%s] failed to serialize a DATA packet to be retransmitted", a.name)
continue
}
rawPackets = append(rawPackets, raw)
}

// Pop unsent data chunks from the pending queue to send as much as
@@ -789,7 +794,7 @@ func (a *Association) handleInit(p *packet, i *chunkInit) ([]*packet, error) {
initAck.numOutboundStreams = a.myMaxNumOutboundStreams
initAck.numInboundStreams = a.myMaxNumInboundStreams
initAck.initiateTag = a.myVerificationTag
initAck.advertisedReceiverWindowCredit = maxReceiveBufferSize
initAck.advertisedReceiverWindowCredit = a.maxReceiveBufferSize

if a.myCookie == nil {
a.myCookie = newRandomStateCookie()
@@ -1021,10 +1026,10 @@ func (a *Association) getMyReceiverWindowCredit() uint32 {
bytesQueued += uint32(s.getNumBytesInReassemblyQueue())
}

if bytesQueued >= maxReceiveBufferSize {
if bytesQueued >= a.maxReceiveBufferSize {
return 0
}
return maxReceiveBufferSize - bytesQueued
return a.maxReceiveBufferSize - bytesQueued
}

// OpenStream opens a stream
@@ -1222,8 +1227,8 @@ func (a *Association) onCumulativeTSNAckPointAdvanced(totalBytesAcked int) {
// path MTU.
if !a.inFastRecovery &&
a.pendingQueue.size() > 0 {
//a.cwnd += min32(uint32(totalBytesAcked), a.cwnd)
a.cwnd += min32(uint32(totalBytesAcked), a.mtu)
a.cwnd += min32(uint32(totalBytesAcked), a.cwnd) // TCP way
//a.cwnd += min32(uint32(totalBytesAcked), a.mtu) // SCTP way (slow)
a.log.Tracef("[%s] updated cwnd=%d ssthresh=%d acked=%d (SS)",
a.name, a.cwnd, a.ssthresh, totalBytesAcked)
} else {
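
For context on the hunk above: RFC 4960's slow-start rule grows cwnd by at most one path MTU per SACK, which ramps up slowly when the round-trip time is large; the TCP-style rule adopted here grows cwnd by the number of bytes newly acked, capped at the current cwnd, so the window can roughly double per round trip. An illustrative sketch of the two rules with hypothetical helper names (min32 mirrors the package's unexported helper):

// growCwndRFC4960 and growCwndTCPStyle are hypothetical helpers that contrast
// the old and new slow-start growth rules from the hunk above.
func growCwndRFC4960(cwnd, mtu, bytesAcked uint32) uint32 {
	return cwnd + min32(bytesAcked, mtu) // at most one MTU per SACK: slow on long-RTT paths
}

func growCwndTCPStyle(cwnd, bytesAcked uint32) uint32 {
	return cwnd + min32(bytesAcked, cwnd) // can roughly double cwnd each round trip
}

func min32(a, b uint32) uint32 {
	if a < b {
		return a
	}
	return b
}
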
@@ -1687,12 +1692,9 @@ func (a *Association) popPendingDataChunksToSend() ([]*chunkPayloadData, []uint1
// is 0), the data sender can always have one DATA chunk in flight to
// the receiver if allowed by cwnd (see rule B, below).

usingFullWindow := (a.inflightQueue.getNumBytes() == 0)

for {
c := a.pendingQueue.peek()
if c == nil {
usingFullWindow = false
break // no more pending data
}

@@ -1742,15 +1744,10 @@ func (a *Association) popPendingDataChunksToSend() ([]*chunkPayloadData, []uint1
// min(cwnd, rwnd) would end up generating a SACK, on the remote, for
// every single DATA chunk when sending a large amount of data at once.
// In order to overcome this situation, here we set I-bit to true for
// the last chunk only when the following condition is met:
//
// - When sending full size of windlow (= min(cwnd, rwnd))
// the last chunk only.

lastChunk := chunks[len(chunks)-1]
if usingFullWindow {
//a.log.Debugf("sending tsn=%d with immediateSack: total_chunks=%d", lastChunk.tsn, len(chunks))
lastChunk.immediateSack = true
}
lastChunk.immediateSack = true
}
}

@@ -1849,7 +1846,7 @@ func (a *Association) getDataPacketsToRetransmit() []*packet {
break // end of pending data
}

if c.acked || c.abandoned {
if !c.retransmit {
continue
}

@@ -1860,6 +1857,9 @@ func (a *Association) getDataPacketsToRetransmit() []*packet {
break
}

// reset the retransmit flag so the chunk is not retransmitted again before
// the next T3-rtx timer fires
c.retransmit = false
bytesToSend += len(c.userData)

c.nSent++
@@ -2013,7 +2013,7 @@ func (a *Association) onRetransmissionTimeout(id int, nRtos uint) {

a.log.Debugf("[%s] T3-rtx timed out: nRtos=%d cwnd=%d ssthresh=%d", a.name, nRtos, a.cwnd, a.ssthresh)

a.willRetransmitDataChunks = true
a.inflightQueue.markAllToRetrasmit()
a.awakeWriteLoop()

return
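
Taken together, the retransmission hunks replace the association-wide willRetransmitDataChunks flag with a per-chunk retransmit flag: a T3-rtx timeout marks every in-flight chunk via inflightQueue.markAllToRetrasmit(), and getDataPacketsToRetransmit clears the flag as it queues each chunk, so a chunk is resent at most once per timeout. A simplified, hypothetical sketch of that mechanism (types and method names are illustrative, not the package's actual structures):

// chunkState stands in for the package's chunkPayloadData.
type chunkState struct {
	tsn        uint32
	retransmit bool
	userData   []byte
}

// flightQueue stands in for the package's inflightQueue.
type flightQueue struct {
	chunks []*chunkState
}

// markAllToRetransmit flags every in-flight chunk when the T3-rtx timer fires.
func (q *flightQueue) markAllToRetransmit() {
	for _, c := range q.chunks {
		c.retransmit = true
	}
}

// popToRetransmit collects flagged chunks and clears the flag so each chunk is
// retransmitted at most once per timeout.
func (q *flightQueue) popToRetransmit() []*chunkState {
	var out []*chunkState
	for _, c := range q.chunks {
		if !c.retransmit {
			continue
		}
		c.retransmit = false
		out = append(out, c)
	}
	return out
}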