diff --git a/cmd/biz/grpc/main.go b/cmd/biz/grpc/main.go index 525a8dca1..e78430768 100644 --- a/cmd/biz/grpc/main.go +++ b/cmd/biz/grpc/main.go @@ -102,7 +102,7 @@ func main() { s := server.NewWrapperedGRPCWebServer(options) - node := biz.NewBIZ("biz1") + node := biz.NewBIZ("biz01") if err := node.Start(conf); err != nil { log.Errorf("biz init start: %v", err) os.Exit(-1) diff --git a/cmd/islb/main.go b/cmd/islb/main.go index d712a6438..0581a9719 100644 --- a/cmd/islb/main.go +++ b/cmd/islb/main.go @@ -74,7 +74,7 @@ func main() { log.Infof("--- starting islb node ---") - node := islb.NewISLB("islb") + node := islb.NewISLB("islb00") if err := node.Start(conf); err != nil { log.Errorf("islb start error: %v", err) os.Exit(-1) diff --git a/cmd/sfu/main.go b/cmd/sfu/main.go index 10659bdcc..292a3ce30 100644 --- a/cmd/sfu/main.go +++ b/cmd/sfu/main.go @@ -96,7 +96,7 @@ func main() { log.Infof("--- starting sfu node ---") - node := sfu.NewSFU("sfu") + node := sfu.NewSFU("sfu01") if err := node.Start(conf); err != nil { log.Errorf("sfu init start: %v", err) os.Exit(-1) diff --git a/go.mod b/go.mod index 41c6fdc0b..03542a7ff 100644 --- a/go.mod +++ b/go.mod @@ -3,17 +3,16 @@ module github.com/pion/ion go 1.13 require ( - github.com/cloudwebrtc/nats-discovery v0.0.0-20210206133643-dda3fdf282f6 + github.com/cloudwebrtc/nats-discovery v0.0.0-20210307133055-af64b19e13c2 github.com/cloudwebrtc/nats-grpc v0.1.3-0.20210206060332-3cf62da1f1ff github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/go-redis/redis/v7 v7.4.0 github.com/golang/protobuf v1.4.3 github.com/google/go-cmp v0.5.4 // indirect - github.com/google/uuid v1.2.0 github.com/gorilla/websocket v1.4.2 github.com/improbable-eng/grpc-web v0.13.0 + github.com/nats-io/nats-server/v2 v2.1.9 github.com/nats-io/nats.go v1.10.0 - github.com/notedit/sdp v0.0.4 github.com/pion/ion-avp v1.8.1 github.com/pion/ion-log v1.0.0 github.com/pion/ion-sfu v1.9.3 @@ -27,5 +26,4 @@ require ( google.golang.org/grpc v1.35.0 google.golang.org/grpc/examples v0.0.0-20210205041354-b753f4903c1b // indirect google.golang.org/protobuf v1.25.0 - grpc.go4.org v0.0.0-20170609214715-11d0a25b4919 ) diff --git a/go.sum b/go.sum index 9123345a6..4a1219840 100644 --- a/go.sum +++ b/go.sum @@ -14,8 +14,6 @@ dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7 github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= -github.com/Jeffail/gabs v1.1.1 h1:V0uzR08Hj22EX8+8QMhyI9sX2hwRu+/RJhJUmnwda/E= -github.com/Jeffail/gabs v1.1.1/go.mod h1:6xMvQMK4k33lb7GUUpaAPh6nKMmemQeg5d4gn7/bOXc= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= @@ -55,8 +53,8 @@ github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+ github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/clbanning/x2j v0.0.0-20191024224557-825249438eec/go.mod h1:jMjuTZXRI4dUb/I5gc9Hdhagfvm9+RyrPryS/auMzxE= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= -github.com/cloudwebrtc/nats-discovery v0.0.0-20210206133643-dda3fdf282f6 h1:bowKs/bZFeF/RJIZpGYf4eyylnl3K8fiXCrGLJi0Zo0= -github.com/cloudwebrtc/nats-discovery v0.0.0-20210206133643-dda3fdf282f6/go.mod h1:d9ie3jvSVKQU/JHOojIml74wM0uO4ZoZK/XNQryGz5s= +github.com/cloudwebrtc/nats-discovery v0.0.0-20210307133055-af64b19e13c2 h1:KD+9+9kFAL4wP0LkJILvtA6Kk/b5qUIX5UieAB6vq6A= +github.com/cloudwebrtc/nats-discovery v0.0.0-20210307133055-af64b19e13c2/go.mod h1:d9ie3jvSVKQU/JHOojIml74wM0uO4ZoZK/XNQryGz5s= github.com/cloudwebrtc/nats-grpc v0.1.3-0.20210206060332-3cf62da1f1ff h1:ut0VfTIpLCX+9Fd1SfhL05ncrX4vryDjpMOCRQz2MAU= github.com/cloudwebrtc/nats-grpc v0.1.3-0.20210206060332-3cf62da1f1ff/go.mod h1:bVnBCB2hk1b00GGR6xMZ7cI61dsCXzVpSFWkv5wLEys= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= @@ -272,8 +270,6 @@ github.com/nats-io/nkeys v0.1.4 h1:aEsHIssIk6ETN5m2/MD8Y4B2X7FfXrBAUdkyRvbVYzA= github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= -github.com/notedit/sdp v0.0.4 h1:P4L8HbZ8SfzrRDE2m3zPnkHhcSdr/0sZkapKo0lyDJs= -github.com/notedit/sdp v0.0.4/go.mod h1:v7SdJxYpW6sY8RhA2KX14mmIHXKC0Kl/XrEQwaQJ7lM= github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/oklog/oklog v0.3.2/go.mod h1:FCV+B7mhrz4o+ueLpx+KqkyXRGMWOYEvfiXtdGtbWGs= @@ -708,8 +704,6 @@ gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c h1:grhR+C34yXImVGp7EzNk+DTIk+323eIUWOmEevy6bDo= gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -grpc.go4.org v0.0.0-20170609214715-11d0a25b4919 h1:tmXTu+dfa+d9Evp8NpJdgOy6+rt8/x4yG7qPBrtNfLY= -grpc.go4.org v0.0.0-20170609214715-11d0a25b4919/go.mod h1:77eQGdRu53HpSqPFJFmuJdjuHRquDANNeA4x7B8WQ9o= honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/pkg/node/biz/biz.go b/pkg/node/biz/biz.go index 049a39b30..e066bc21c 100644 --- a/pkg/node/biz/biz.go +++ b/pkg/node/biz/biz.go @@ -95,10 +95,12 @@ func (b *BIZ) Start(conf Config) error { go b.Node.KeepAlive(node) - b.s = newBizServer(conf.Global.Dc, b.NID, conf.Avp.Elements) + s := newBizServer(conf.Global.Dc, b.NID, conf.Avp.Elements, b.NatsConn()) //Watch ISLB nodes. - go b.Node.Watch(proto.ServiceISLB, b.s.watchIslbNodes) + go b.Node.Watch(proto.ServiceISLB, s.watchNodes) + + b.s = s return nil } diff --git a/pkg/node/biz/peer.go b/pkg/node/biz/peer.go index a21ecc6ef..4b40dbe6e 100644 --- a/pkg/node/biz/peer.go +++ b/pkg/node/biz/peer.go @@ -3,6 +3,7 @@ package biz import ( "sync" + "github.com/pion/ion/pkg/grpc/biz" "github.com/pion/ion/pkg/util" ) @@ -49,12 +50,6 @@ func (p *Peer) SID() string { return p.sid } -/* -func (p *Peer) sfu() (string, error) { - return p.s.getNode(proto.ServiceSFU, p.uid, p.sid, p.mid) +func (p *Peer) handleBizSignal(req *biz.JoinRequest) (*biz.JoinReply, error) { + return nil, nil } - -func (p *Peer) avp() (string, error) { - return p.s.getNode(proto.ServiceAVP, p.uid, p.sid, p.mid) -} -*/ diff --git a/pkg/node/biz/server.go b/pkg/node/biz/server.go index cd0a2ec6d..12a71ecb6 100644 --- a/pkg/node/biz/server.go +++ b/pkg/node/biz/server.go @@ -7,10 +7,13 @@ import ( "time" "github.com/cloudwebrtc/nats-discovery/pkg/discovery" + nrpc "github.com/cloudwebrtc/nats-grpc/pkg/rpc" + "github.com/nats-io/nats.go" log "github.com/pion/ion-log" biz "github.com/pion/ion/pkg/grpc/biz" islb "github.com/pion/ion/pkg/grpc/islb" sfu "github.com/pion/ion/pkg/grpc/sfu" + "github.com/pion/ion/pkg/proto" ) const ( @@ -21,6 +24,7 @@ const ( type BizServer struct { biz.UnimplementedBizServer sfu.UnimplementedSFUServer + nc *nats.Conn elements []string roomLock sync.RWMutex rooms map[string]*room @@ -31,8 +35,9 @@ type BizServer struct { } // newBizServer creates a new avp server instance -func newBizServer(dc string, nid string, elements []string) *BizServer { +func newBizServer(dc string, nid string, elements []string, nc *nats.Conn) *BizServer { return &BizServer{ + nc: nc, elements: elements, rooms: make(map[string]*room), closed: make(chan bool), @@ -59,42 +64,131 @@ func (s *BizServer) getRoom(id string) *room { //Signal forward to sfu node. func (s *BizServer) Signal(stream sfu.SFU_SignalServer) error { + var peer *Peer = nil for { - payload, err := stream.Recv() + req, err := stream.Recv() if err != nil { log.Errorf("err: %v", err) return err } - log.Infof("req => %v", payload.String()) + switch payload := req.Payload.(type) { + case *sfu.SignalRequest_Join: + sid := payload.Join.Sid + uid := payload.Join.Uid + room := s.getRoom(sid) + if room != nil { + peer = room.getPeer(uid) + if peer != nil { + + } + } + } + + log.Infof("req => %v", req.String()) } } //Join for biz request. func (s *BizServer) Join(stream biz.Biz_JoinServer) error { + var r *room = nil + var peer *Peer = nil + defer func() { + if peer != nil && r != nil { + r.delPeer(peer.UID()) + } + }() + for { - payload, err := stream.Recv() + req, err := stream.Recv() if err != nil { log.Errorf("err: %v", err) return err } - log.Infof("req => %v", payload.String()) + switch payload := req.Payload.(type) { + case *biz.JoinRequest_Join: + sid := payload.Join.Sid + uid := payload.Join.Uid + info := payload.Join.Info + r = s.getRoom(sid) + + if r == nil { + r = newRoom(sid) + s.roomLock.RLock() + s.rooms[sid] = r + s.roomLock.RUnlock() + } + + if s.islbcli == nil { + ncli := nrpc.NewClient(s.nc, s.nodes["islb"].NID) + s.islbcli = islb.NewISLBClient(ncli) + } + + resp, err := s.islbcli.FindNode(context.TODO(), &islb.FindNodeRequest{ + Service: proto.ServiceSFU, + Sid: sid, + }) + log.Infof("resp => %v", resp) + + if err != nil { + stream.Send(&biz.JoinReply{ + Payload: &biz.JoinReply_Result{ + Result: &biz.JoinResult{ + Success: false, + Reason: "islb " + sid + " not found.", + }, + }, + }) + break + } + + if r != nil { + peer = NewPeer(sid, uid, info) + r.addPeer(peer) + stream.Send(&biz.JoinReply{ + Payload: &biz.JoinReply_Result{ + Result: &biz.JoinResult{ + Success: true, + Reason: "join success.", + }, + }, + }) + } else { + stream.Send(&biz.JoinReply{ + Payload: &biz.JoinReply_Result{ + Result: &biz.JoinResult{ + Success: false, + Reason: "sid " + sid + " not found.", + }, + }, + }) + } + case *biz.JoinRequest_Leave: + //uid := payload.Leave.Uid + break + + case *biz.JoinRequest_Msg: + //msg := payload.Msg + //broadcast massge to room. + } + log.Infof("req => %v", req.String()) } } // watchNodes watch islb nodes up/down -func (s *BizServer) watchIslbNodes(state discovery.NodeState, node *discovery.Node) { +func (s *BizServer) watchNodes(state discovery.NodeState, node *discovery.Node) { s.nodeLock.Lock() defer s.nodeLock.Unlock() id := node.NID + service := node.Service if state == discovery.NodeUp { - log.Infof("islb node %v up", id) + log.Infof("Service up: "+service+" node id => [%v], rpc => %v", id, node.RPC.Protocol) if _, found := s.nodes[id]; !found { s.nodes[id] = node } } else if state == discovery.NodeDown { - log.Infof("islb node %v down", id) + log.Infof("Service down: "+service+" node id => [%v]", id) delete(s.nodes, id) } } diff --git a/pkg/node/islb/server.go b/pkg/node/islb/server.go index 1c5461cfa..88a2635f6 100644 --- a/pkg/node/islb/server.go +++ b/pkg/node/islb/server.go @@ -23,10 +23,8 @@ type islbServer struct { // handle Node from service discovery. func (s *islbServer) handleNode(action string, node discovery.Node) { log.Infof("handleNode:service %v, action %v => id %v, RPC %v", node.Service, action, node.ID(), node.RPC) - s.nodeLock.Lock() defer s.nodeLock.Unlock() - switch action { case discovery.Save: fallthrough @@ -38,24 +36,33 @@ func (s *islbServer) handleNode(action string, node discovery.Node) { } func (s *islbServer) FindNode(ctx context.Context, req *proto.FindNodeRequest) (*proto.FindNodeReply, error) { - log.Infof("nid => %v", req.GetNid()) - nodes := []*ion.Node{ - { - Nid: "avp-01", - Service: "avp", - }, - { - Nid: "sfu-01", - Service: "sfu", - }, - } + nid := req.GetNid() + sid := req.GetSid() + service := req.GetService() + + log.Infof("nid => %v, sid => %v, service => %v", nid, sid, service) + + nodes := []*ion.Node{} - mkey := "*" + "." + req.GetNid() + "." + req.GetSid() + ".*" + // find node by sid + mkey := "*" + ".*." + sid + ".*" for _, key := range s.Redis.Keys(mkey) { fields := s.Redis.HGetAll(key) log.Debugf("key: %v, fields: %v", key, fields) } + if len(nodes) == 0 { + // find node by nid or service + for _, node := range s.nodes { + if nid == node.NID || service == node.Service { + nodes = append(nodes, &ion.Node{ + Nid: node.NID, + Service: node.Service, + }) + } + } + } + return &proto.FindNodeReply{ Nodes: nodes, }, nil diff --git a/pkg/node/sfu/server.go b/pkg/node/sfu/server.go index 18361f8ec..76997d070 100644 --- a/pkg/node/sfu/server.go +++ b/pkg/node/sfu/server.go @@ -6,6 +6,7 @@ import ( "io" "sync" + "github.com/cloudwebrtc/nats-discovery/pkg/discovery" log "github.com/pion/ion-log" sfu "github.com/pion/ion-sfu/pkg/sfu" rtc "github.com/pion/ion/pkg/grpc/rtc" @@ -17,11 +18,29 @@ import ( type sfuServer struct { rtc.UnimplementedRTCServer sync.Mutex - SFU *sfu.SFU + SFU *sfu.SFU + nodeLock sync.RWMutex + nodes map[string]*discovery.Node } func newServer(sfu *sfu.SFU) *sfuServer { - return &sfuServer{SFU: sfu} + return &sfuServer{SFU: sfu, nodes: make(map[string]*discovery.Node)} +} + +// watchNodes watch islb nodes up/down +func (s *sfuServer) watchNodes(state discovery.NodeState, node *discovery.Node) { + s.nodeLock.Lock() + defer s.nodeLock.Unlock() + id := node.NID + if state == discovery.NodeUp { + log.Infof("islb node %v up", id) + if _, found := s.nodes[id]; !found { + s.nodes[id] = node + } + } else if state == discovery.NodeDown { + log.Infof("islb node %v down", id) + delete(s.nodes, id) + } } func (s *sfuServer) Signal(stream rtc.RTC_SignalServer) error { @@ -61,7 +80,6 @@ func (s *sfuServer) Signal(stream rtc.RTC_SignalServer) error { case sfu.ErrTransportExists: fallthrough case sfu.ErrOfferIgnored: - s.Lock() err = stream.Send(&rtc.Signalling{ Payload: &rtc.Signalling_Error{ Error: &rtc.Error{ @@ -70,7 +88,6 @@ func (s *sfuServer) Signal(stream rtc.RTC_SignalServer) error { }, }, }) - s.Unlock() if err != nil { log.Errorf("grpc send error %v ", err) return status.Errorf(codes.Internal, err.Error()) diff --git a/pkg/node/sfu/sfu.go b/pkg/node/sfu/sfu.go index c16540a33..6aba0dccb 100644 --- a/pkg/node/sfu/sfu.go +++ b/pkg/node/sfu/sfu.go @@ -77,6 +77,9 @@ func (s *SFU) Start(conf Config) error { s.s = newServer(isfu.NewSFU(conf.Config)) //grpc service pb.RegisterRTCServer(s.Node.ServiceRegistrar(), s.s) + + //Watch ISLB nodes. + go s.Node.Watch(proto.ServiceISLB, s.s.watchNodes) return nil }