Skip to content
This repository was archived by the owner on Dec 2, 2023. It is now read-only.

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
improve.
Browse files Browse the repository at this point in the history
cloudwebrtc committed Mar 11, 2021
1 parent 1aa9caf commit b36eb9a
Showing 23 changed files with 835 additions and 829 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -12,10 +12,12 @@ require (
github.com/improbable-eng/grpc-web v0.13.0
github.com/nats-io/nats-server/v2 v2.1.9
github.com/nats-io/nats.go v1.10.0
github.com/notedit/sdp v0.0.4
github.com/pion/ion-avp v1.8.1
github.com/pion/ion-log v1.0.0
github.com/pion/ion-sfu v1.9.3
github.com/pion/webrtc/v3 v3.0.10
github.com/pixelbender/go-sdp v1.1.0
github.com/soheilhy/cmux v0.1.4
github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.7.1
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -14,6 +14,8 @@ dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/Jeffail/gabs v1.1.1 h1:V0uzR08Hj22EX8+8QMhyI9sX2hwRu+/RJhJUmnwda/E=
github.com/Jeffail/gabs v1.1.1/go.mod h1:6xMvQMK4k33lb7GUUpaAPh6nKMmemQeg5d4gn7/bOXc=
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
@@ -270,6 +272,8 @@ github.com/nats-io/nkeys v0.1.4 h1:aEsHIssIk6ETN5m2/MD8Y4B2X7FfXrBAUdkyRvbVYzA=
github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/notedit/sdp v0.0.4 h1:P4L8HbZ8SfzrRDE2m3zPnkHhcSdr/0sZkapKo0lyDJs=
github.com/notedit/sdp v0.0.4/go.mod h1:v7SdJxYpW6sY8RhA2KX14mmIHXKC0Kl/XrEQwaQJ7lM=
github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/oklog/oklog v0.3.2/go.mod h1:FCV+B7mhrz4o+ueLpx+KqkyXRGMWOYEvfiXtdGtbWGs=
@@ -355,6 +359,8 @@ github.com/pion/udp v0.1.0/go.mod h1:BPELIjbwE9PRbd/zxI/KYBnbo7B6+oA6YuEaNE8lths
github.com/pion/webrtc/v3 v3.0.4/go.mod h1:1TmFSLpPYFTFXFHPtoq9eGP1ASTa9LC6FBh7sUY8cd4=
github.com/pion/webrtc/v3 v3.0.10 h1:hti6k0DeN4tbQmAZiA8v6OvdkANQGw+R3nyqk9+dnz0=
github.com/pion/webrtc/v3 v3.0.10/go.mod h1:KdEZWLmBnxB2Qj4FtUb9vi1sIpqsHOisI7L6ggQBD0A=
github.com/pixelbender/go-sdp v1.1.0 h1:rkm9aFBNKrnB+YGfhLmAkal3pC8XYXb9h+172PlrCBU=
github.com/pixelbender/go-sdp v1.1.0/go.mod h1:6IBlz9+BrUHoFTea7gcp4S54khtOhjCW/nVDLhmZBAs=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
68 changes: 36 additions & 32 deletions pkg/grpc/avp/avp.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

148 changes: 66 additions & 82 deletions pkg/grpc/biz/biz.pb.go
582 changes: 265 additions & 317 deletions pkg/grpc/ion/ion.pb.go

Large diffs are not rendered by default.

201 changes: 117 additions & 84 deletions pkg/grpc/islb/islb.pb.go
58 changes: 29 additions & 29 deletions pkg/grpc/islb/islb_grpc.pb.go
51 changes: 48 additions & 3 deletions pkg/ion/node.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package ion

import (
"sync"

nd "github.com/cloudwebrtc/nats-discovery/pkg/client"
"github.com/cloudwebrtc/nats-discovery/pkg/discovery"
"github.com/cloudwebrtc/nats-grpc/pkg/rpc"
@@ -10,6 +12,7 @@ import (
"google.golang.org/grpc"
)

//Node .
type Node struct {
// Node ID
NID string
@@ -19,8 +22,21 @@ type Node struct {
nrpc *rpc.Server
// Service discovery client
nd *nd.Client

nodeLock sync.RWMutex
//neighbor nodes
neighborNodes map[string]*discovery.Node
}

//NewNode .
func NewNode(nid string) Node {
return Node{
NID: nid,
neighborNodes: make(map[string]*discovery.Node),
}
}

//Start .
func (n *Node) Start(natURL string) error {
var err error
n.nc, err = util.NewNatsConn(natURL)
@@ -39,31 +55,60 @@ func (n *Node) Start(natURL string) error {
return nil
}

//NatsConn .
func (n *Node) NatsConn() *nats.Conn {
return n.nc
}

//KeepAlive Upload your node info to registry.
func (n *Node) KeepAlive(node discovery.Node) error {
return n.nd.KeepAlive(node)
}

func (n *Node) Watch(service string, onStateChange nd.NodeStateChangeCallback) error {
//Watch the neighbor nodes
func (n *Node) Watch(service string) error {
resp, err := n.nd.Get(service)
if err != nil {
log.Errorf("Watch service %v error %v", service, err)
return err
}
for _, node := range resp.Nodes {
onStateChange(discovery.NodeUp, &node)
n.handleNeighborNodes(discovery.NodeUp, &node)
}

return n.nd.Watch(service, onStateChange)
return n.nd.Watch(service, n.handleNeighborNodes)
}

// GetNeighborNodes get neighbor nodes.
func (n *Node) GetNeighborNodes() map[string]*discovery.Node {
n.nodeLock.Lock()
defer n.nodeLock.Unlock()
return n.neighborNodes
}

// handleNeighborNodes handle nodes up/down
func (n *Node) handleNeighborNodes(state discovery.NodeState, node *discovery.Node) {
n.nodeLock.Lock()
defer n.nodeLock.Unlock()
id := node.NID
service := node.Service
if state == discovery.NodeUp {
log.Infof("Service up: "+service+" node id => [%v], rpc => %v", id, node.RPC.Protocol)
if _, found := n.neighborNodes[id]; !found {
n.neighborNodes[id] = node
}
} else if state == discovery.NodeDown {
log.Infof("Service down: "+service+" node id => [%v]", id)
delete(n.neighborNodes, id)
}
}

//ServiceRegistrar return grpc.ServiceRegistrar of this node, used to create grpc services
func (n *Node) ServiceRegistrar() grpc.ServiceRegistrar {
return n.nrpc
}

//Close .
func (n *Node) Close() {
if n.nrpc != nil {
n.nrpc.Stop()
4 changes: 2 additions & 2 deletions pkg/node/avp/avp.go
Original file line number Diff line number Diff line change
@@ -51,7 +51,7 @@ type AVP struct {

// NewAVP create a avp node instance
func NewAVP(nid string) *AVP {
return &AVP{Node: ion.Node{NID: nid}}
return &AVP{Node: ion.NewNode(nid)}
}

// Start avp node
@@ -106,7 +106,7 @@ func (a *AVP) Start(conf Config) error {
pb.RegisterAVPServer(a.Node.ServiceRegistrar(), a.s)

//Watch ISLB nodes.
go a.Node.Watch(proto.ServiceISLB, a.s.watchIslbNodes)
go a.Node.Watch(proto.ServiceISLB)

return nil
}
6 changes: 3 additions & 3 deletions pkg/node/biz/biz.go
Original file line number Diff line number Diff line change
@@ -58,7 +58,7 @@ type BIZ struct {
// NewBIZ create a biz node instance
func NewBIZ(nid string) *BIZ {
return &BIZ{
Node: ion.Node{NID: nid},
Node: ion.NewNode(nid),
}
}

@@ -95,10 +95,10 @@ func (b *BIZ) Start(conf Config) error {

go b.Node.KeepAlive(node)

s := newBizServer(conf.Global.Dc, b.NID, conf.Avp.Elements, b.NatsConn())
s := newBizServer(b, conf.Global.Dc, b.NID, conf.Avp.Elements, b.NatsConn())

//Watch ISLB nodes.
go b.Node.Watch(proto.ServiceISLB, s.watchNodes)
go b.Node.Watch(proto.ServiceISLB)

b.s = s

28 changes: 15 additions & 13 deletions pkg/node/biz/biz_test.go
Original file line number Diff line number Diff line change
@@ -7,7 +7,6 @@ import (
"testing"
"time"

"github.com/cloudwebrtc/nats-discovery/pkg/discovery"
"github.com/nats-io/nats.go"
log "github.com/pion/ion-log"
pb "github.com/pion/ion/pkg/grpc/biz"
@@ -51,18 +50,18 @@ func init() {

s := grpc.NewServer()

bs = newBizServer(dc, nid, []string{}, nc)
bn := NewBIZ(nid)

bs.nodes["islb00"] = &discovery.Node{
Service: proto.ServiceISLB,
NID: "islb00",
DC: "dc1",
RPC: discovery.RPC{
Protocol: discovery.NGRPC,
Addr: natsURL,
},
err = bn.Node.Start(natsURL)
if err != nil {
log.Panicf("failed to start biz node: %v", err)
}

bs = newBizServer(bn, dc, nid, []string{}, nc)

//Watch ISLB nodes.
go bn.Node.Watch(proto.ServiceISLB)

pb.RegisterBizServer(s, bs)

go func() {
@@ -94,8 +93,10 @@ func TestJBizJoin(t *testing.T) {
stream.Send(&pb.SignalRequest{
Payload: &pb.SignalRequest_Join{
Join: &pb.Join{
Sid: sid,
Uid: uid,
Peer: &ion.Peer{
Sid: sid,
Uid: uid,
},
},
},
})
@@ -105,13 +106,14 @@ func TestJBizJoin(t *testing.T) {
t.Error(err)
}

log.Infof("join reply %v", reply)

r := bs.getRoom(sid)
assert.EqualValues(t, sid, r.sid)

p := r.getPeer(uid)
assert.EqualValues(t, uid, p.UID())

log.Infof("join reply %v", reply)
log.Infof("TestJoin done")
}

36 changes: 28 additions & 8 deletions pkg/node/biz/room.go
Original file line number Diff line number Diff line change
@@ -11,15 +11,15 @@ import (
type Room struct {
sync.RWMutex
sid string
sfuNID string
sfunid string
peers map[string]*Peer
}

// newRoom creates a new room instance
func newRoom(sid string, nid string) *Room {
func newRoom(sid string, sfunid string) *Room {
r := &Room{
sid: sid,
sfuNID: nid,
sfunid: sfunid,
peers: make(map[string]*Peer),
}
return r
@@ -33,8 +33,18 @@ func (r *Room) SID() string {
// addPeer add a peer to room
func (r *Room) addPeer(p *Peer) {
r.Lock()
defer r.Unlock()
r.peers[p.uid] = p
r.Unlock()

event := &ion.PeerEvent{
State: ion.PeerEvent_JOIN,
Peer: &ion.Peer{
Sid: r.sid,
Uid: p.uid,
},
}

r.sendPeerEvent(event)
}

// getPeer get a peer by peer id
@@ -54,8 +64,18 @@ func (r *Room) getPeers() map[string]*Peer {
// delPeer delete a peer in the room
func (r *Room) delPeer(uid string) int {
r.Lock()
defer r.Unlock()
delete(r.peers, uid)
r.Unlock()

event := &ion.PeerEvent{
State: ion.PeerEvent_LEAVE,
Peer: &ion.Peer{
Sid: r.sid,
Uid: uid,
},
}
r.sendPeerEvent(event)

return len(r.peers)
}

@@ -66,7 +86,7 @@ func (r *Room) count() int {
return len(r.peers)
}

func (r *Room) broadcastPeerEvent(event *ion.PeerEvent) {
func (r *Room) sendPeerEvent(event *ion.PeerEvent) {
peers := r.getPeers()
for _, p := range peers {
if err := p.sendPeerEvent(event); err != nil {
@@ -75,7 +95,7 @@ func (r *Room) broadcastPeerEvent(event *ion.PeerEvent) {
}
}

func (r *Room) broadcastStreamEvent(event *ion.StreamEvent) {
func (r *Room) sendStreamEvent(event *ion.StreamEvent) {
peers := r.getPeers()
for _, p := range peers {
if err := p.sendStreamEvent(event); err != nil {
@@ -84,7 +104,7 @@ func (r *Room) broadcastStreamEvent(event *ion.StreamEvent) {
}
}

func (r *Room) broadcastMessage(msg *ion.Message) {
func (r *Room) sendMessage(msg *ion.Message) {
from := msg.From
to := msg.To
data := msg.Data
66 changes: 12 additions & 54 deletions pkg/node/biz/server.go
Original file line number Diff line number Diff line change
@@ -7,12 +7,10 @@ import (
"sync"
"time"

"github.com/cloudwebrtc/nats-discovery/pkg/discovery"
nrpc "github.com/cloudwebrtc/nats-grpc/pkg/rpc"
"github.com/nats-io/nats.go"
log "github.com/pion/ion-log"
biz "github.com/pion/ion/pkg/grpc/biz"
"github.com/pion/ion/pkg/grpc/ion"
islb "github.com/pion/ion/pkg/grpc/islb"
"github.com/pion/ion/pkg/proto"
"github.com/pion/ion/pkg/util"
@@ -27,18 +25,17 @@ type BizServer struct {
rooms map[string]*Room
closed chan bool
islbcli islb.ISLBClient
nodeLock sync.RWMutex
nodes map[string]*discovery.Node
bn *BIZ
}

// newBizServer creates a new avp server instance
func newBizServer(dc string, nid string, elements []string, nc *nats.Conn) *BizServer {
func newBizServer(bn *BIZ, c string, nid string, elements []string, nc *nats.Conn) *BizServer {
return &BizServer{
bn: bn,
nc: nc,
elements: elements,
rooms: make(map[string]*Room),
closed: make(chan bool),
nodes: make(map[string]*discovery.Node),
}
}

@@ -151,16 +148,15 @@ func (s *BizServer) Signal(stream biz.Biz_SignalServer) error {

switch payload := req.Payload.(type) {
case *biz.SignalRequest_Join:
sid := payload.Join.Sid
uid := payload.Join.Uid
sid := payload.Join.Peer.Sid
uid := payload.Join.Peer.Uid

success := false
reason := "unkown error."

if s.islbcli == nil {
s.nodeLock.Lock()
defer s.nodeLock.Unlock()
for _, node := range s.nodes {
nodes := s.bn.GetNeighborNodes()
for _, node := range nodes {
if node.Service == proto.ServiceISLB {
ncli := nrpc.NewClient(s.nc, node.NID)
s.islbcli = islb.NewISLBClient(ncli)
@@ -172,34 +168,23 @@ func (s *BizServer) Signal(stream biz.Biz_SignalServer) error {
if s.islbcli != nil {
r = s.getRoom(sid)
if r == nil {
reason = fmt.Sprintf("room sid = %v not found", sid)
resp, err := s.islbcli.FindNode(context.TODO(), &islb.FindNodeRequest{
Service: proto.ServiceSFU,
Sid: sid,
})

if err == nil && len(resp.Nodes) > 0 {
r = s.createRoom(sid, resp.GetNodes()[0].Nid)
} else {
reason = fmt.Sprintf("islbcli.FindNode(serivce = sfu, sid = %v) err %v", sid, err)
}
}
if r != nil {
peer = NewPeer(sid, uid, payload.Join.Info, repCh)
peer = NewPeer(sid, uid, payload.Join.Peer.Info, repCh)
r.addPeer(peer)

peerEvent := &ion.PeerEvent{
State: ion.PeerEvent_JOIN,
Peer: &ion.Peer{
Uid: uid,
//TODO: Parse the sdp to get the stream parameter set.
Streams: []*ion.Stream{},
},
}
r.broadcastPeerEvent(peerEvent)

success = true
reason = "join success."
} else {
reason = fmt.Sprintf("room sid = %v not found", sid)
}
} else {
reason = fmt.Sprintf("islb node not found")
@@ -221,16 +206,6 @@ func (s *BizServer) Signal(stream biz.Biz_SignalServer) error {
peer.Close()
peer = nil

peerEvent := &ion.PeerEvent{
State: ion.PeerEvent_LEAVE,
Peer: &ion.Peer{
Uid: uid,
//TODO: Parse the sdp to get the stream parameter set.
Streams: []*ion.Stream{},
},
}
r.broadcastPeerEvent(peerEvent)

if r.count() == 0 {
s.delRoom(r.SID())
r = nil
@@ -247,9 +222,9 @@ func (s *BizServer) Signal(stream biz.Biz_SignalServer) error {
to := payload.Msg.To
data := payload.Msg.Data
log.Debugf("Msg request %v => %v, data: %v", from, to, data)

// message broadcast
r.broadcastMessage(payload.Msg)
r.sendMessage(payload.Msg)
default:
break
}
@@ -258,23 +233,6 @@ func (s *BizServer) Signal(stream biz.Biz_SignalServer) error {
}
}

// watchNodes watch islb nodes up/down
func (s *BizServer) watchNodes(state discovery.NodeState, node *discovery.Node) {
s.nodeLock.Lock()
defer s.nodeLock.Unlock()
id := node.NID
service := node.Service
if state == discovery.NodeUp {
log.Infof("Service up: "+service+" node id => [%v], rpc => %v", id, node.RPC.Protocol)
if _, found := s.nodes[id]; !found {
s.nodes[id] = node
}
} else if state == discovery.NodeDown {
log.Infof("Service down: "+service+" node id => [%v]", id)
delete(s.nodes, id)
}
}

// stat peers
func (s *BizServer) stat() {
t := time.NewTicker(util.DefaultStatCycle)
2 changes: 1 addition & 1 deletion pkg/node/biz/sfu.go
Original file line number Diff line number Diff line change
@@ -69,7 +69,7 @@ func (s *SFUSignalBridge) Signal(sstream sfu.SFU_SignalServer) error {
if peer != nil {
// Use nats-grpc or grpc
// TODO: change to util.NewGRPCClientConnForNode.
cli := sfu.NewSFUClient(nrpc.NewClient(s.BizServer.nc, r.sfuNID))
cli := sfu.NewSFUClient(nrpc.NewClient(s.BizServer.nc, r.sfunid))
var err error
cstream, err = cli.Signal(context.Background())
if err != nil {
6 changes: 3 additions & 3 deletions pkg/node/islb/islb.go
Original file line number Diff line number Diff line change
@@ -49,7 +49,7 @@ type ISLB struct {

// NewISLB create a islb node instance
func NewISLB(nid string) *ISLB {
return &ISLB{Node: ion.Node{NID: nid}}
return &ISLB{Node: ion.NewNode(nid)}
}

// Start islb node
@@ -84,10 +84,10 @@ func (i *ISLB) Start(conf Config) error {
return errors.New("new redis error")
}

i.s = &islbServer{Redis: i.redis, nodes: make(map[string]discovery.Node)}
i.s = newISLBServer(i, i.redis)
pb.RegisterISLBServer(i.Node.ServiceRegistrar(), i.s)

i.registry.Listen(i.s.watchAllNodes)
i.registry.Listen(i.s.handleNodeDiscovery)

node := discovery.Node{
DC: conf.Global.Dc,
153 changes: 27 additions & 126 deletions pkg/node/islb/server.go
Original file line number Diff line number Diff line change
@@ -18,11 +18,23 @@ type islbServer struct {
Redis *db.Redis
nodeLock sync.Mutex
nodes map[string]discovery.Node
in *ISLB
}

// handle Node from service discovery.
func (s *islbServer) watchAllNodes(action string, node discovery.Node) {
log.Debugf("handleNode:service %v, action %v => id %v, RPC %v", node.Service, action, node.ID(), node.RPC)
func newISLBServer(in *ISLB, redis *db.Redis) *islbServer {
return &islbServer{
in: in,
Redis: redis,
nodes: make(map[string]discovery.Node),
}
}

// handleNodeDiscovery handle all Node from service discovery.
// This callback can observe all nodes in the ion cluster,
// TODO: Upload all node information to redis DB so that info
// can be shared when there are more than one ISLB in the later.
func (s *islbServer) handleNodeDiscovery(action string, node discovery.Node) {
log.Debugf("handleNode: service %v, action %v => id %v, RPC %v", node.Service, action, node.ID(), node.RPC)
s.nodeLock.Lock()
defer s.nodeLock.Unlock()
switch action {
@@ -35,12 +47,13 @@ func (s *islbServer) watchAllNodes(action string, node discovery.Node) {
}
}

// FindNode find service nodes by service name|nid|sid, such as sfu|avp|sip-gateway|rtmp-gateway
func (s *islbServer) FindNode(ctx context.Context, req *proto.FindNodeRequest) (*proto.FindNodeReply, error) {
nid := req.GetNid()
sid := req.GetSid()
service := req.GetService()

log.Infof("nid => %v, sid => %v, service => %v", nid, sid, service)
log.Infof("islb.FindNode: nid => %v, sid => %v, service => %v", nid, sid, service)

nodes := []*ion.Node{}

@@ -55,6 +68,7 @@ func (s *islbServer) FindNode(ctx context.Context, req *proto.FindNodeRequest) (
s.nodeLock.Lock()
defer s.nodeLock.Unlock()
// find node by nid or service
//TODO: Add load balancing algorithm to select SFU nodes
for _, node := range s.nodes {
if nid == node.NID || service == node.Service {
nodes = append(nodes, &ion.Node{
@@ -98,35 +112,23 @@ func (s *islbServer) HandleSessionState(ctx context.Context, state *ion.SessionR
}
*/

func (s *islbServer) PostEvent(context.Context, *proto.ISLBEvent) (*ion.Empty, error) {
//PostISLBEvent Receive ISLBEvent(stream or session events) from ion-SFU, ion-AVP and ion-SIP
//the stream and session event will be save to redis db, which is used to create the
//global location of the media stream
// key = dc/ion-sfu-1/room1/uid
// value = [...stream/track info ...]
func (s *islbServer) PostISLBEvent(context.Context, *proto.ISLBEvent) (*ion.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "method PostEvent not implemented")
}

func (s *islbServer) HandleEvent(*ion.Empty, proto.ISLB_HandleEventServer) error {
//WatchISLBEvent broadcast ISLBEvent to ion-biz node.
//The stream metadata is forwarded to biz node and coupled with the peer in the client through UID
func (s *islbServer) WatchISLBEvent(*ion.Empty, proto.ISLB_WatchISLBEventServer) error {
return status.Errorf(codes.Unimplemented, "method HandleEvent not implemented")
}

/*
func (s *islbServer) handle(msg interface{}) (interface{}, error) {
log.Infof("handleRequest: %T, %+v", msg, msg)
switch v := msg.(type) {
case *proto.ToIslbFindNodeMsg:
return s.findNode(v)
case *proto.ToIslbPeerJoinMsg:
return s.peerJoin(v)
case *proto.IslbPeerLeaveMsg:
return s.peerLeave(v)
case *proto.ToIslbStreamAddMsg:
return s.streamAdd(v)
case *proto.IslbBroadcastMsg:
return s.broadcast(v)
default:
return nil, errors.New("unkonw message")
}
}
// Find service nodes by name, such as sfu|avp|sip-gateway|rtmp-gateway
func (s *islbServer) findNode(msg *proto.ToIslbFindNodeMsg) (interface{}, error) {
service := msg.Service
nodes := s.getNodes()
@@ -230,105 +232,4 @@ func (s *islbServer) streamAdd(msg *proto.ToIslbStreamAddMsg) (interface{}, erro
return nil, err
}
func (s *islbServer) peerJoin(msg *proto.ToIslbPeerJoinMsg) (interface{}, error) {
ukey := proto.UserInfo{
DC: s.dc,
SID: msg.SID,
UID: msg.UID,
}.BuildKey()
log.Infof("clientJoin: set %s => %v", ukey, string(msg.Info))
// Tell everyone about the new peer.
if err := s.nrpc.Publish(s.bid, proto.ToClientPeerJoinMsg{
UID: msg.UID, SID: msg.SID, Info: msg.Info,
}); err != nil {
log.Errorf("broadcast peer-join error: %v", err)
return nil, err
}
// Tell the new peer about everyone currently in the room.
searchKey := proto.UserInfo{
DC: s.dc,
SID: msg.SID,
}.BuildKey()
keys := s.redis.Keys(searchKey)
peers := make([]proto.Peer, 0)
streams := make([]proto.Stream, 0)
for _, key := range keys {
fields := s.redis.HGetAll(key)
parsedUserKey, err := proto.ParseUserInfo(key)
if err != nil {
log.Errorf("redis.HGetAll err = %v", err)
continue
}
if info, ok := fields["info"]; ok {
peers = append(peers, proto.Peer{
UID: parsedUserKey.UID,
Info: json.RawMessage(info),
})
} else {
log.Warnf("No info found for %v", key)
}
mkey := proto.MediaInfo{
DC: s.dc,
SID: msg.SID,
UID: parsedUserKey.UID,
}.BuildKey()
mediaKeys := s.redis.Keys(mkey)
for _, mediaKey := range mediaKeys {
mediaFields := s.redis.HGetAll(mediaKey)
for mediaField := range mediaFields {
log.Warnf("Received media field %s for key %s", mediaField, mediaKey)
if len(mediaField) > 6 && mediaField[:6] == "track/" {
streams = append(streams, proto.Stream{
UID: parsedUserKey.UID,
StreamID: proto.StreamID(mediaField[6:]),
})
}
}
}
}
// Write the user info to redis.
err := s.redis.HSetTTL(ukey, "info", string(msg.Info), redisLongKeyTTL)
if err != nil {
log.Errorf("redis.HSetTTL err = %v", err)
}
return proto.FromIslbPeerJoinMsg{
Peers: peers,
Streams: streams,
}, nil
}
func (s *islbServer) peerLeave(msg *proto.IslbPeerLeaveMsg) (interface{}, error) {
ukey := proto.UserInfo{
DC: s.dc,
SID: msg.SID,
UID: msg.UID,
}.BuildKey()
log.Infof("clientLeave: remove key => %s", ukey)
err := s.redis.Del(ukey)
if err != nil {
log.Errorf("redis.Del err = %v", err)
}
if err := s.nrpc.Publish(s.bid, msg); err != nil {
log.Errorf("broadcast peer-leave error: %v", err)
return nil, err
}
return nil, nil
}
func (s *islbServer) broadcast(msg *proto.IslbBroadcastMsg) (interface{}, error) {
if err := s.nrpc.Publish(s.bid, msg); err != nil {
log.Errorf("broadcast message error: %v", err)
}
return nil, nil
}
*/
80 changes: 53 additions & 27 deletions pkg/node/sfu/server.go
Original file line number Diff line number Diff line change
@@ -1,49 +1,59 @@
package sfu

import (
"context"
"encoding/json"
"fmt"
"io"
"sync"

"github.com/cloudwebrtc/nats-discovery/pkg/discovery"
nrpc "github.com/cloudwebrtc/nats-grpc/pkg/rpc"
"github.com/nats-io/nats.go"
log "github.com/pion/ion-log"
sfu "github.com/pion/ion-sfu/pkg/sfu"
isfu "github.com/pion/ion-sfu/pkg/sfu"
"github.com/pion/ion/pkg/grpc/ion"
"github.com/pion/ion/pkg/grpc/islb"
pb "github.com/pion/ion/pkg/grpc/sfu"
"github.com/pion/ion/pkg/proto"
"github.com/pion/ion/pkg/util"
"github.com/pion/webrtc/v3"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type sfuServer struct {
pb.UnimplementedSFUServer
sfu *sfu.SFU
nodeLock sync.RWMutex
nodes map[string]*discovery.Node
nc *nats.Conn
sfu *isfu.SFU
islbcli islb.ISLBClient
sn *SFU
}

func newSFUServer(sfu *sfu.SFU) *sfuServer {
return &sfuServer{sfu: sfu, nodes: make(map[string]*discovery.Node)}
func newSFUServer(sn *SFU, sfu *isfu.SFU, nc *nats.Conn) *sfuServer {
return &sfuServer{sn: sn, sfu: sfu, nc: nc}
}

// watchIslbNodes watch islb nodes up/down
func (s *sfuServer) watchIslbNodes(state discovery.NodeState, node *discovery.Node) {
s.nodeLock.Lock()
defer s.nodeLock.Unlock()
id := node.NID
if state == discovery.NodeUp {
log.Infof("islb node %v up", id)
if _, found := s.nodes[id]; !found {
s.nodes[id] = node
func (s *sfuServer) postISLBEvent(event *islb.ISLBEvent) {
if s.islbcli == nil {
nodes := s.sn.GetNeighborNodes()
for _, node := range nodes {
if node.Service == proto.ServiceISLB {
ncli := nrpc.NewClient(s.nc, node.NID)
s.islbcli = islb.NewISLBClient(ncli)
break
}
}
}

if s.islbcli != nil {
_, err := s.islbcli.PostISLBEvent(context.Background(), event)
if err != nil {
log.Errorf("PostISLBEvent err %v", err)
}
} else if state == discovery.NodeDown {
log.Infof("islb node %v down", id)
delete(s.nodes, id)
}
}

func (s *sfuServer) Signal(stream pb.SFU_SignalServer) error {
peer := sfu.NewPeer(s.sfu)
peer := isfu.NewPeer(s.sfu)
for {
in, err := stream.Recv()

@@ -141,9 +151,9 @@ func (s *sfuServer) Signal(stream pb.SFU_SignalServer) error {
err = peer.Join(payload.Join.Sid, payload.Join.Uid)
if err != nil {
switch err {
case sfu.ErrTransportExists:
case isfu.ErrTransportExists:
fallthrough
case sfu.ErrOfferIgnored:
case isfu.ErrOfferIgnored:
err = stream.Send(&pb.SignalReply{
Payload: &pb.SignalReply_Error{
Error: fmt.Errorf("join error: %w", err).Error(),
@@ -198,13 +208,29 @@ func (s *sfuServer) Signal(stream pb.SFU_SignalServer) error {
}
}

streams, err := util.ParseSDP(sdp.SDP)
if err != nil {
log.Errorf("util.ParseSDP error: %v", err)
}

s.postISLBEvent(&islb.ISLBEvent{
Payload: &islb.ISLBEvent_Stream{
Stream: &ion.StreamEvent{
Sid: peer.Session().ID(),
Uid: peer.ID(),
Streams: streams,
State: ion.StreamEvent_NEW,
},
},
})

if sdp.Type == webrtc.SDPTypeOffer {
answer, err := peer.Answer(sdp)
if err != nil {
switch err {
case sfu.ErrNoTransportEstablished:
case isfu.ErrNoTransportEstablished:
fallthrough
case sfu.ErrOfferIgnored:
case isfu.ErrOfferIgnored:
err = stream.Send(&pb.SignalReply{
Payload: &pb.SignalReply_Error{
Error: fmt.Errorf("negotiate answer error: %w", err).Error(),
@@ -247,7 +273,7 @@ func (s *sfuServer) Signal(stream pb.SFU_SignalServer) error {
err := peer.SetRemoteDescription(sdp)
if err != nil {
switch err {
case sfu.ErrNoTransportEstablished:
case isfu.ErrNoTransportEstablished:
err = stream.Send(&pb.SignalReply{
Payload: &pb.SignalReply_Error{
Error: fmt.Errorf("set remote description error: %w", err).Error(),
@@ -283,7 +309,7 @@ func (s *sfuServer) Signal(stream pb.SFU_SignalServer) error {
err = peer.Trickle(candidate, int(payload.Trickle.Target))
if err != nil {
switch err {
case sfu.ErrNoTransportEstablished:
case isfu.ErrNoTransportEstablished:
log.Errorf("peer hasn't joined, error -> %v", err)
err = stream.Send(&pb.SignalReply{
Payload: &pb.SignalReply_Error{
6 changes: 3 additions & 3 deletions pkg/node/sfu/sfu.go
Original file line number Diff line number Diff line change
@@ -36,7 +36,7 @@ type SFU struct {
// NewSFU create a sfu node instance
func NewSFU(nid string) *SFU {
s := &SFU{
Node: ion.Node{NID: nid},
Node: ion.NewNode(nid),
}
return s
}
@@ -74,12 +74,12 @@ func (s *SFU) Start(conf Config) error {

go s.Node.KeepAlive(node)

s.s = newSFUServer(isfu.NewSFU(conf.Config))
s.s = newSFUServer(s, isfu.NewSFU(conf.Config), s.NatsConn())
//grpc service
psfu.RegisterSFUServer(s.Node.ServiceRegistrar(), s.s)

//Watch ISLB nodes.
go s.Node.Watch(proto.ServiceISLB, s.s.watchIslbNodes)
go s.Node.Watch(proto.ServiceISLB)
return nil
}

80 changes: 80 additions & 0 deletions pkg/util/sdp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package util

import (
"strings"

log "github.com/pion/ion-log"
"github.com/pion/ion/pkg/grpc/ion"
"github.com/pixelbender/go-sdp/sdp"
)

//ParseSDP .
func ParseSDP(sdpstr string) ([]*ion.Stream, error) {
sess, err := sdp.ParseString(sdpstr)

if err != nil {
log.Errorf("sdp.Parse erro %v", err)
return nil, err
}

streams := make(map[string]*ion.Stream)
for _, m := range sess.Media {
//fmt.Printf("type = %v\n", m.Type)

if m.Type == "audio" || m.Type == "video" {
msid := m.Attributes.Get("msid")
//fmt.Printf("msid id = %v\n", msid)

strs := strings.Split(msid, " ")
streamID := strs[0]
trackID := msid
trackLabel := strs[1]

track := &ion.Track{
Kind: m.Type,
Id: trackID,
Label: trackLabel,
}

simulcast := make(map[string]string)
for _, attr := range m.Attributes {
//fmt.Printf("attr name = %v, value = %v\n", attr.Name, attr.Value)

if attr.Name == "rid" {
strs := strings.Split(attr.Value, " ")
rid := strs[0]
dir := strs[1]
//fmt.Printf("rid: rid = %v, dir = %v\n", rid, dir)
simulcast[rid] = dir
}
/*
if attr.Name == "simulcast" {
strs := strings.Split(attr.Value, " ")
dir := strs[0]
rids := strs[1]
fmt.Printf("simulcast: rids = %v, dir = %v\n", rids, dir)
}
*/
}
track.Simulcast = simulcast

if stream, ok := streams[streamID]; ok {
stream.Tracks = append(stream.Tracks, track)
} else {
stream = &ion.Stream{
Id: streamID,
}
stream.Tracks = append(stream.Tracks, track)
streams[streamID] = stream
}
}
}

var list []*ion.Stream
for _, stream := range streams {
//fmt.Printf("%v\n", stream.String())
list = append(list, stream)
}

return list, nil
}
2 changes: 1 addition & 1 deletion protos/avp.proto
Original file line number Diff line number Diff line change
@@ -33,7 +33,7 @@ message Element {
// webmsaver/mixer/opencv/play/record/ml
string name = 1;
// params for element.
repeated ion.Parameter parameters = 2;
map<string, string> params = 2;
}

message AVPRequest {
6 changes: 2 additions & 4 deletions protos/biz.proto
Original file line number Diff line number Diff line change
@@ -11,10 +11,8 @@ service Biz {
}

message Join {
string sid = 1;
string uid = 2;
bytes info = 3;
string token = 4;
ion.Peer peer = 1;
string token = 2;
}

message JoinReply {
60 changes: 28 additions & 32 deletions protos/ion.proto
Original file line number Diff line number Diff line change
@@ -8,10 +8,9 @@ message Empty {}

message Track {
string id = 1;
string kind = 2;
string rid = 3;
uint32 ssrc = 4;
string codec = 5;
string label = 2;
string kind = 3;
map<string, string> simulcast = 4;
}

message Stream {
@@ -20,14 +19,9 @@ message Stream {
}

message Peer {
string uid = 1;
repeated Stream streams = 2;
}

message Session {
Node node = 1;
string sid = 3;
repeated Peer peers = 4;
string sid = 1;
string uid = 2;
bytes info = 3;
}

// Describe the basic media info in the session of sfu.
@@ -39,34 +33,36 @@ message Session {
// ${msid}: [{id: ${trackId}, kind:audio}, {id: ${trackId}, kind:video}]
// ]

message SessionReport {
enum State {
NEW = 0;
UPDATE = 1;
DELETE = 2;
}
State state = 3;
ion.Session session = 4;
}

message PeerEvent {
message SessionEvent {
enum State {
JOIN = 0;
LEAVE = 1;
NEW = 0;
DELETE = 1;
}
State state = 2;
ion.Peer peer = 3;
string nid = 3;
string sid = 4;
}

message StreamEvent {
enum State {
ADD = 0;
NEW = 0;
DELETE = 2;
}
State state = 3;
string nid = 4;
string sid = 5;
string uid = 6;
repeated ion.Stream streams = 7;
}

message PeerEvent {
enum State {
JOIN = 0;
UPDATE = 1;
REMOVE = 2;
LEAVE = 2;
}
string uid = 3;
State state = 4;
repeated ion.Stream streams = 5;
State state = 3;
ion.Peer peer = 4;
}

message Message {
@@ -83,7 +79,7 @@ message Parameter {
message RPC {
string protocol = 1;
string addr = 2;
repeated Parameter params = 3;
map<string, string> params = 3;
}

message Node {
13 changes: 8 additions & 5 deletions protos/islb.proto
Original file line number Diff line number Diff line change
@@ -9,9 +9,9 @@ package islb;
service ISLB {
rpc FindNode(FindNodeRequest) returns (FindNodeReply) {}

rpc PostEvent(ISLBEvent) returns (ion.Empty){}
rpc PostISLBEvent(ISLBEvent) returns (ion.Empty) {}

rpc HandleEvent(ion.Empty) returns (stream ISLBEvent) {}
rpc WatchISLBEvent(ion.Empty) returns (stream ISLBEvent) {}
}

message FindNodeRequest {
@@ -24,10 +24,13 @@ message FindNodeReply {
repeated ion.Node nodes = 1;
}

message WatchRequest {

}

message ISLBEvent {
oneof payload {
ion.PeerEvent peerEvent = 2;
ion.StreamEvent streamEvent = 3;
ion.Message msg = 4;
ion.SessionEvent session = 1;
ion.StreamEvent stream = 2;
}
}

0 comments on commit b36eb9a

Please sign in to comment.