Skip to content

Commit

Permalink
Move Simulcast SDP Modification into helper
Browse files Browse the repository at this point in the history
Will be used by Simulcast renegotation tests as well

Relates to #1345
  • Loading branch information
Sean-Der committed Aug 30, 2021
1 parent ec3d65e commit a630d62
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 87 deletions.
90 changes: 42 additions & 48 deletions peerconnection_media_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1104,6 +1104,21 @@ func TestPeerConnection_Simulcast(t *testing.T) {
var ridMapLock sync.RWMutex
ridMap := map[string]int{}

// Enable Extension Headers needed for Simulcast
m := &MediaEngine{}
if err := m.RegisterDefaultCodecs(); err != nil {
panic(err)
}
for _, extension := range []string{
"urn:ietf:params:rtp-hdrext:sdes:mid",
"urn:ietf:params:rtp-hdrext:sdes:rtp-stream-id",
"urn:ietf:params:rtp-hdrext:sdes:repaired-rtp-stream-id",
} {
if err := m.RegisterHeaderExtension(RTPHeaderExtensionCapability{URI: extension}, RTPCodecTypeVideo); err != nil {
panic(err)
}
}

assertRidCorrect := func(t *testing.T) {
ridMapLock.Lock()
defer ridMapLock.Unlock()
Expand All @@ -1122,22 +1137,13 @@ func TestPeerConnection_Simulcast(t *testing.T) {
return ridCount == 3
}

signalWithModifications := func(t *testing.T, modificationFunc func(string) string) (*PeerConnection, *PeerConnection, *TrackLocalStaticRTP) {
// Enable Extension Headers needed for Simulcast
m := &MediaEngine{}
if err := m.RegisterDefaultCodecs(); err != nil {
panic(err)
}
for _, extension := range []string{
"urn:ietf:params:rtp-hdrext:sdes:mid",
"urn:ietf:params:rtp-hdrext:sdes:rtp-stream-id",
"urn:ietf:params:rtp-hdrext:sdes:repaired-rtp-stream-id",
} {
if err := m.RegisterHeaderExtension(RTPHeaderExtensionCapability{URI: extension}, RTPCodecTypeVideo); err != nil {
panic(err)
}
}
onTrackHandler := func(trackRemote *TrackRemote, _ *RTPReceiver) {
ridMapLock.Lock()
defer ridMapLock.Unlock()
ridMap[trackRemote.RID()] = ridMap[trackRemote.RID()] + 1
}

t.Run("RTP Extension Based", func(t *testing.T) {
pcOffer, pcAnswer, err := NewAPI(WithMediaEngine(m)).newPair(Configuration{})
assert.NoError(t, err)

Expand All @@ -1147,43 +1153,17 @@ func TestPeerConnection_Simulcast(t *testing.T) {
_, err = pcOffer.AddTrack(vp8Writer)
assert.NoError(t, err)

pcAnswer.OnTrack(func(trackRemote *TrackRemote, _ *RTPReceiver) {
ridMapLock.Lock()
defer ridMapLock.Unlock()
ridMap[trackRemote.RID()] = ridMap[trackRemote.RID()] + 1
})

offer, err := pcOffer.CreateOffer(nil)
assert.NoError(t, err)

offerGatheringComplete := GatheringCompletePromise(pcOffer)
assert.NoError(t, pcOffer.SetLocalDescription(offer))
<-offerGatheringComplete

offer.SDP = modificationFunc(pcOffer.LocalDescription().SDP)

assert.NoError(t, pcAnswer.SetRemoteDescription(offer))
ridMap = map[string]int{}
pcAnswer.OnTrack(onTrackHandler)

answer, err := pcAnswer.CreateAnswer(nil)
assert.NoError(t, err)

answerGatheringComplete := GatheringCompletePromise(pcAnswer)
assert.NoError(t, pcAnswer.SetLocalDescription(answer))
<-answerGatheringComplete

assert.NoError(t, pcOffer.SetRemoteDescription(*pcAnswer.LocalDescription()))

return pcOffer, pcAnswer, vp8Writer
}

t.Run("RTP Extension Based", func(t *testing.T) {
pcOffer, pcAnswer, vp8Writer := signalWithModifications(t, func(sessionDescription string) string {
assert.NoError(t, signalPairWithModification(pcOffer, pcAnswer, func(sessionDescription string) string {
sessionDescription = strings.Split(sessionDescription, "a=end-of-candidates\r\n")[0]
sessionDescription = filterSsrc(sessionDescription)
for _, rid := range rids {
sessionDescription += "a=" + sdpAttributeRid + ":" + rid + " send\r\n"
}
return sessionDescription + "a=simulcast:send " + strings.Join(rids, ";") + "\r\n"
})
}))

for sequenceNumber := uint16(0); !ridsFullfilled(); sequenceNumber++ {
time.Sleep(20 * time.Millisecond)
Expand All @@ -1208,8 +1188,22 @@ func TestPeerConnection_Simulcast(t *testing.T) {
})

t.Run("SSRC Based", func(t *testing.T) {
pcOffer, pcAnswer, vp8Writer := signalWithModifications(t, func(sessionDescription string) string {
pcOffer, pcAnswer, err := NewAPI(WithMediaEngine(m)).newPair(Configuration{})
assert.NoError(t, err)

vp8Writer, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2")
assert.NoError(t, err)

_, err = pcOffer.AddTrack(vp8Writer)
assert.NoError(t, err)

ridMap = map[string]int{}
pcAnswer.OnTrack(onTrackHandler)

assert.NoError(t, signalPairWithModification(pcOffer, pcAnswer, func(sessionDescription string) string {
sessionDescription = strings.Split(sessionDescription, "a=end-of-candidates\r\n")[0]
sessionDescription = filterSsrc(sessionDescription)

for _, rid := range rids {
sessionDescription += "a=" + sdpAttributeRid + ":" + rid + " send\r\n"
}
Expand All @@ -1225,7 +1219,7 @@ a=ssrc-group:FID 5000 5001
a=ssrc-group:FID 5002 5003
a=ssrc-group:FID 5004 5005
`
})
}))

for sequenceNumber := uint16(0); !ridsFullfilled(); sequenceNumber++ {
time.Sleep(20 * time.Millisecond)
Expand Down
10 changes: 8 additions & 2 deletions peerconnection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func newPair() (pcOffer *PeerConnection, pcAnswer *PeerConnection, err error) {
return pca, pcb, nil
}

func signalPair(pcOffer *PeerConnection, pcAnswer *PeerConnection) error {
func signalPairWithModification(pcOffer *PeerConnection, pcAnswer *PeerConnection, modificationFunc func(string) string) error {
// Note(albrow): We need to create a data channel in order to trigger ICE
// candidate gathering in the background for the JavaScript/Wasm bindings. If
// we don't do this, the complete offer including ICE candidates will never be
Expand All @@ -46,7 +46,9 @@ func signalPair(pcOffer *PeerConnection, pcAnswer *PeerConnection) error {
return err
}
<-offerGatheringComplete
if err = pcAnswer.SetRemoteDescription(*pcOffer.LocalDescription()); err != nil {

offer.SDP = modificationFunc(pcOffer.LocalDescription().SDP)
if err = pcAnswer.SetRemoteDescription(offer); err != nil {
return err
}

Expand All @@ -62,6 +64,10 @@ func signalPair(pcOffer *PeerConnection, pcAnswer *PeerConnection) error {
return pcOffer.SetRemoteDescription(*pcAnswer.LocalDescription())
}

func signalPair(pcOffer *PeerConnection, pcAnswer *PeerConnection) error {
return signalPairWithModification(pcOffer, pcAnswer, func(sessionDescription string) string { return sessionDescription })
}

func offerMediaHasDirection(offer SessionDescription, kind RTPCodecType, direction RTPTransceiverDirection) bool {
parsed := &sdp.SessionDescription{}
if err := parsed.Unmarshal([]byte(offer.SDP)); err != nil {
Expand Down
39 changes: 15 additions & 24 deletions rtpreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,40 +127,31 @@ func (r *RTPReceiver) Receive(parameters RTPReceiveParameters) error {
}
defer close(r.received)

if len(parameters.Encodings) == 1 && parameters.Encodings[0].SSRC != 0 {
globalParams := r.getParameters()
codec := RTPCodecCapability{}
if len(globalParams.Codecs) != 0 {
codec = globalParams.Codecs[0].RTPCodecCapability
}

for i := range parameters.Encodings {
t := trackStreams{
track: newTrackRemote(
r.kind,
parameters.Encodings[0].SSRC,
parameters.Encodings[0].RID,
parameters.Encodings[i].SSRC,
parameters.Encodings[i].RID,
r,
),
}

globalParams := r.getParameters()
codec := RTPCodecCapability{}
if len(globalParams.Codecs) != 0 {
codec = globalParams.Codecs[0].RTPCodecCapability
}

t.streamInfo = createStreamInfo("", parameters.Encodings[0].SSRC, 0, codec, globalParams.HeaderExtensions)
var err error
if t.rtpReadStream, t.rtpInterceptor, t.rtcpReadStream, t.rtcpInterceptor, err = r.streamsForSSRC(parameters.Encodings[0].SSRC, t.streamInfo); err != nil {
return err
if parameters.Encodings[i].SSRC != 0 {
t.streamInfo = createStreamInfo("", parameters.Encodings[i].SSRC, 0, codec, globalParams.HeaderExtensions)
var err error
if t.rtpReadStream, t.rtpInterceptor, t.rtcpReadStream, t.rtcpInterceptor, err = r.streamsForSSRC(parameters.Encodings[i].SSRC, t.streamInfo); err != nil {
return err
}
}

r.tracks = append(r.tracks, t)
} else {
for _, encoding := range parameters.Encodings {
r.tracks = append(r.tracks, trackStreams{
track: newTrackRemote(
r.kind,
encoding.SSRC,
encoding.RID,
r,
),
})
}
}

return nil
Expand Down
34 changes: 21 additions & 13 deletions sdp.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ func filterTrackWithSSRC(incomingTracks []trackDetails, ssrc SSRC) []trackDetail
}

// extract all trackDetails from an SDP.
func trackDetailsFromSDP(log logging.LeveledLogger, s *sdp.SessionDescription) []trackDetails { // nolint:gocognit
incomingTracks := []trackDetails{}
rtxRepairFlows := map[uint32]bool{}

func trackDetailsFromSDP(log logging.LeveledLogger, s *sdp.SessionDescription) (incomingTracks []trackDetails) { // nolint:gocognit
for _, media := range s.MediaDescriptions {
tracksInMediaSection := []trackDetails{}
rtxRepairFlows := map[uint32]bool{}

// Plan B can have multiple tracks in a signle media section
streamID := ""
trackID := ""
Expand Down Expand Up @@ -98,7 +98,7 @@ func trackDetailsFromSDP(log logging.LeveledLogger, s *sdp.SessionDescription) [
continue
}
rtxRepairFlows[uint32(rtxRepairFlow)] = true
incomingTracks = filterTrackWithSSRC(incomingTracks, SSRC(rtxRepairFlow)) // Remove if rtx was added as track before
tracksInMediaSection = filterTrackWithSSRC(tracksInMediaSection, SSRC(rtxRepairFlow)) // Remove if rtx was added as track before
}
}

Expand Down Expand Up @@ -131,10 +131,10 @@ func trackDetailsFromSDP(log logging.LeveledLogger, s *sdp.SessionDescription) [

isNewTrack := true
trackDetails := &trackDetails{}
for i := range incomingTracks {
for j := range incomingTracks[i].ssrcs {
if incomingTracks[i].ssrcs[j] == SSRC(ssrc) {
trackDetails = &incomingTracks[i]
for i := range tracksInMediaSection {
for j := range tracksInMediaSection[i].ssrcs {
if tracksInMediaSection[i].ssrcs[j] == SSRC(ssrc) {
trackDetails = &tracksInMediaSection[i]
isNewTrack = false
}
}
Expand All @@ -147,26 +147,34 @@ func trackDetailsFromSDP(log logging.LeveledLogger, s *sdp.SessionDescription) [
trackDetails.ssrcs = []SSRC{SSRC(ssrc)}

if isNewTrack {
incomingTracks = append(incomingTracks, *trackDetails)
tracksInMediaSection = append(tracksInMediaSection, *trackDetails)
}
}
}

if rids := getRids(media); len(rids) != 0 && trackID != "" && streamID != "" {
newTrack := trackDetails{
simulcastTrack := trackDetails{
mid: midValue,
kind: codecType,
streamID: streamID,
id: trackID,
rids: []string{},
}
for rid := range rids {
newTrack.rids = append(newTrack.rids, rid)
simulcastTrack.rids = append(simulcastTrack.rids, rid)
}
if len(simulcastTrack.rids) == len(tracksInMediaSection) {
for i := range tracksInMediaSection {
simulcastTrack.ssrcs = append(simulcastTrack.ssrcs, tracksInMediaSection[i].ssrcs...)
}
}

incomingTracks = append(incomingTracks, newTrack)
tracksInMediaSection = []trackDetails{simulcastTrack}
}

incomingTracks = append(incomingTracks, tracksInMediaSection...)
}

return incomingTracks
}

Expand Down

0 comments on commit a630d62

Please sign in to comment.