Skip to content

Commit

Permalink
Addressing pr comments
Browse files Browse the repository at this point in the history
  • Loading branch information
dreamerns committed Apr 30, 2021
1 parent 65dfe4a commit ddec8b0
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 24 deletions.
24 changes: 15 additions & 9 deletions pkg/middlewares/datachannel/subscriberapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ import (
)

const (
highValue = "high"
mediumValue = "medium"
lowValue = "low"
mutedValue = "none"
highValue = "high"
mediumValue = "medium"
lowValue = "low"
mutedValue = "none"
ActiveLayerMethod = "activeLayer"
)

type setRemoteMedia struct {
Expand Down Expand Up @@ -71,16 +72,21 @@ func transformLayers(layers []string) ([]uint16, error) {

func sendMessage(streamID string, peer *sfu.Peer, layers []string, activeLayer int) {
al, _ := layerIntToStr(activeLayer)
msg := activeLayerMessage{
payload := activeLayerMessage{
StreamID: streamID,
ActiveLayer: al,
AvailableLayers: layers,
}
alMsg := sfu.ChannelAPIMessage{
Method: sfu.ActiveLayerMethod,
Params: msg,
msg := sfu.ChannelAPIMessage{
Method: ActiveLayerMethod,
Params: payload,
}
if err := peer.SendAPIChannelMessage(alMsg); err != nil {
bytes, err := json.Marshal(msg)
if err != nil {
sfu.Logger.Error(err, "unable to marshal active layer message")
}

if err := peer.SendAPIChannelMessage(&bytes); err != nil {
sfu.Logger.Error(err, "unable to send ActiveLayerMessage to peer", "peer_id", peer.ID())
}
}
Expand Down
14 changes: 2 additions & 12 deletions pkg/sfu/peer.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package sfu

import (
"encoding/json"
"errors"
"fmt"
"sync"
Expand Down Expand Up @@ -41,11 +40,6 @@ type SessionProvider interface {
GetSession(sid string) (*Session, WebRTCTransportConfig)
}

const (
AudioLevelsMethod = "audioLevels"
ActiveLayerMethod = "activeLayer"
)

type ChannelAPIMessage struct {
Method string `json:"method"`
Params interface{} `json:"params,omitempty"`
Expand Down Expand Up @@ -246,17 +240,13 @@ func (p *Peer) Trickle(candidate webrtc.ICECandidateInit, target int) error {
return nil
}

func (p *Peer) SendAPIChannelMessage(msg ChannelAPIMessage) error {
func (p *Peer) SendAPIChannelMessage(msg *[]byte) error {
if p.subscriber == nil {
return fmt.Errorf("No subscriber for this peer")
}
dc := p.subscriber.DataChannel(APIChannelLabel)

bytes, err := json.Marshal(msg)
if err != nil {
return fmt.Errorf("Failed to send message, json error: %v", err)
}
if err = dc.SendText(string(bytes)); err != nil {
if err := dc.SendText(string(*msg)); err != nil {
return fmt.Errorf("Failed to send message: %v", err)
}
return nil
Expand Down
16 changes: 13 additions & 3 deletions pkg/sfu/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sfu

import (
"context"
"encoding/json"
"sync"
"time"

Expand All @@ -23,6 +24,10 @@ type Session struct {
onCloseHandler func()
}

const (
AudioLevelsMethod = "audioLevels"
)

// NewSession creates a new session
func NewSession(id string, bf *buffer.Factory, dcs []*Datachannel, cfg WebRTCTransportConfig) *Session {
s := &Session{
Expand Down Expand Up @@ -209,11 +214,16 @@ func (s *Session) setRelayedDatachannel(peerID string, datachannel *webrtc.DataC
})
}

func (s *Session) BroadcastAPIChannelMessage(msg ChannelAPIMessage, origin string) {
func (s *Session) BroadcastAPIChannelMessage(msg *ChannelAPIMessage, origin string) {
peers := s.Peers()
bytes, err := json.Marshal(*msg)
if err != nil {
Logger.Error(err, "Failed to send message")
return
}
for _, peer := range peers {
if origin != peer.id {
peer.SendAPIChannelMessage(msg)
peer.SendAPIChannelMessage(&bytes)
}
}
}
Expand All @@ -240,7 +250,7 @@ func (s *Session) audioLevelObserver(audioLevelInterval int) {
Method: AudioLevelsMethod,
Params: levels,
}
s.BroadcastAPIChannelMessage(msg, "")
s.BroadcastAPIChannelMessage(&msg, "")
}
}

Expand Down

0 comments on commit ddec8b0

Please sign in to comment.