Skip to content
This repository has been archived by the owner on Dec 2, 2023. It is now read-only.

Commit

Permalink
update.
Browse files Browse the repository at this point in the history
  • Loading branch information
cloudwebrtc committed Mar 7, 2021
1 parent 248e65f commit 8fbe0df
Show file tree
Hide file tree
Showing 11 changed files with 161 additions and 51 deletions.
2 changes: 1 addition & 1 deletion cmd/biz/grpc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func main() {

s := server.NewWrapperedGRPCWebServer(options)

node := biz.NewBIZ("biz1")
node := biz.NewBIZ("biz01")
if err := node.Start(conf); err != nil {
log.Errorf("biz init start: %v", err)
os.Exit(-1)
Expand Down
2 changes: 1 addition & 1 deletion cmd/islb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func main() {

log.Infof("--- starting islb node ---")

node := islb.NewISLB("islb")
node := islb.NewISLB("islb00")
if err := node.Start(conf); err != nil {
log.Errorf("islb start error: %v", err)
os.Exit(-1)
Expand Down
2 changes: 1 addition & 1 deletion cmd/sfu/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func main() {

log.Infof("--- starting sfu node ---")

node := sfu.NewSFU("sfu")
node := sfu.NewSFU("sfu01")
if err := node.Start(conf); err != nil {
log.Errorf("sfu init start: %v", err)
os.Exit(-1)
Expand Down
6 changes: 2 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,16 @@ module github.com/pion/ion
go 1.13

require (
github.com/cloudwebrtc/nats-discovery v0.0.0-20210206133643-dda3fdf282f6
github.com/cloudwebrtc/nats-discovery v0.0.0-20210307133055-af64b19e13c2
github.com/cloudwebrtc/nats-grpc v0.1.3-0.20210206060332-3cf62da1f1ff
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/go-redis/redis/v7 v7.4.0
github.com/golang/protobuf v1.4.3
github.com/google/go-cmp v0.5.4 // indirect
github.com/google/uuid v1.2.0
github.com/gorilla/websocket v1.4.2
github.com/improbable-eng/grpc-web v0.13.0
github.com/nats-io/nats-server/v2 v2.1.9
github.com/nats-io/nats.go v1.10.0
github.com/notedit/sdp v0.0.4
github.com/pion/ion-avp v1.8.1
github.com/pion/ion-log v1.0.0
github.com/pion/ion-sfu v1.9.3
Expand All @@ -27,5 +26,4 @@ require (
google.golang.org/grpc v1.35.0
google.golang.org/grpc/examples v0.0.0-20210205041354-b753f4903c1b // indirect
google.golang.org/protobuf v1.25.0
grpc.go4.org v0.0.0-20170609214715-11d0a25b4919
)
10 changes: 2 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/Jeffail/gabs v1.1.1 h1:V0uzR08Hj22EX8+8QMhyI9sX2hwRu+/RJhJUmnwda/E=
github.com/Jeffail/gabs v1.1.1/go.mod h1:6xMvQMK4k33lb7GUUpaAPh6nKMmemQeg5d4gn7/bOXc=
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
Expand Down Expand Up @@ -55,8 +53,8 @@ github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/clbanning/x2j v0.0.0-20191024224557-825249438eec/go.mod h1:jMjuTZXRI4dUb/I5gc9Hdhagfvm9+RyrPryS/auMzxE=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cloudwebrtc/nats-discovery v0.0.0-20210206133643-dda3fdf282f6 h1:bowKs/bZFeF/RJIZpGYf4eyylnl3K8fiXCrGLJi0Zo0=
github.com/cloudwebrtc/nats-discovery v0.0.0-20210206133643-dda3fdf282f6/go.mod h1:d9ie3jvSVKQU/JHOojIml74wM0uO4ZoZK/XNQryGz5s=
github.com/cloudwebrtc/nats-discovery v0.0.0-20210307133055-af64b19e13c2 h1:KD+9+9kFAL4wP0LkJILvtA6Kk/b5qUIX5UieAB6vq6A=
github.com/cloudwebrtc/nats-discovery v0.0.0-20210307133055-af64b19e13c2/go.mod h1:d9ie3jvSVKQU/JHOojIml74wM0uO4ZoZK/XNQryGz5s=
github.com/cloudwebrtc/nats-grpc v0.1.3-0.20210206060332-3cf62da1f1ff h1:ut0VfTIpLCX+9Fd1SfhL05ncrX4vryDjpMOCRQz2MAU=
github.com/cloudwebrtc/nats-grpc v0.1.3-0.20210206060332-3cf62da1f1ff/go.mod h1:bVnBCB2hk1b00GGR6xMZ7cI61dsCXzVpSFWkv5wLEys=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
Expand Down Expand Up @@ -272,8 +270,6 @@ github.com/nats-io/nkeys v0.1.4 h1:aEsHIssIk6ETN5m2/MD8Y4B2X7FfXrBAUdkyRvbVYzA=
github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/notedit/sdp v0.0.4 h1:P4L8HbZ8SfzrRDE2m3zPnkHhcSdr/0sZkapKo0lyDJs=
github.com/notedit/sdp v0.0.4/go.mod h1:v7SdJxYpW6sY8RhA2KX14mmIHXKC0Kl/XrEQwaQJ7lM=
github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/oklog/oklog v0.3.2/go.mod h1:FCV+B7mhrz4o+ueLpx+KqkyXRGMWOYEvfiXtdGtbWGs=
Expand Down Expand Up @@ -708,8 +704,6 @@ gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c h1:grhR+C34yXImVGp7EzNk+DTIk+323eIUWOmEevy6bDo=
gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
grpc.go4.org v0.0.0-20170609214715-11d0a25b4919 h1:tmXTu+dfa+d9Evp8NpJdgOy6+rt8/x4yG7qPBrtNfLY=
grpc.go4.org v0.0.0-20170609214715-11d0a25b4919/go.mod h1:77eQGdRu53HpSqPFJFmuJdjuHRquDANNeA4x7B8WQ9o=
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
Expand Down
6 changes: 4 additions & 2 deletions pkg/node/biz/biz.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,12 @@ func (b *BIZ) Start(conf Config) error {

go b.Node.KeepAlive(node)

b.s = newBizServer(conf.Global.Dc, b.NID, conf.Avp.Elements)
s := newBizServer(conf.Global.Dc, b.NID, conf.Avp.Elements, b.NatsConn())

//Watch ISLB nodes.
go b.Node.Watch(proto.ServiceISLB, b.s.watchIslbNodes)
go b.Node.Watch(proto.ServiceISLB, s.watchNodes)

b.s = s

return nil
}
Expand Down
11 changes: 3 additions & 8 deletions pkg/node/biz/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package biz
import (
"sync"

"github.com/pion/ion/pkg/grpc/biz"
"github.com/pion/ion/pkg/util"
)

Expand Down Expand Up @@ -49,12 +50,6 @@ func (p *Peer) SID() string {
return p.sid
}

/*
func (p *Peer) sfu() (string, error) {
return p.s.getNode(proto.ServiceSFU, p.uid, p.sid, p.mid)
func (p *Peer) handleBizSignal(req *biz.JoinRequest) (*biz.JoinReply, error) {
return nil, nil
}
func (p *Peer) avp() (string, error) {
return p.s.getNode(proto.ServiceAVP, p.uid, p.sid, p.mid)
}
*/
110 changes: 102 additions & 8 deletions pkg/node/biz/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ import (
"time"

"github.com/cloudwebrtc/nats-discovery/pkg/discovery"
nrpc "github.com/cloudwebrtc/nats-grpc/pkg/rpc"
"github.com/nats-io/nats.go"
log "github.com/pion/ion-log"
biz "github.com/pion/ion/pkg/grpc/biz"
islb "github.com/pion/ion/pkg/grpc/islb"
sfu "github.com/pion/ion/pkg/grpc/sfu"
"github.com/pion/ion/pkg/proto"
)

const (
Expand All @@ -21,6 +24,7 @@ const (
type BizServer struct {
biz.UnimplementedBizServer
sfu.UnimplementedSFUServer
nc *nats.Conn
elements []string
roomLock sync.RWMutex
rooms map[string]*room
Expand All @@ -31,8 +35,9 @@ type BizServer struct {
}

// newBizServer creates a new avp server instance
func newBizServer(dc string, nid string, elements []string) *BizServer {
func newBizServer(dc string, nid string, elements []string, nc *nats.Conn) *BizServer {
return &BizServer{
nc: nc,
elements: elements,
rooms: make(map[string]*room),
closed: make(chan bool),
Expand All @@ -59,42 +64,131 @@ func (s *BizServer) getRoom(id string) *room {

//Signal forward to sfu node.
func (s *BizServer) Signal(stream sfu.SFU_SignalServer) error {
var peer *Peer = nil
for {
payload, err := stream.Recv()
req, err := stream.Recv()
if err != nil {
log.Errorf("err: %v", err)
return err
}

log.Infof("req => %v", payload.String())
switch payload := req.Payload.(type) {
case *sfu.SignalRequest_Join:
sid := payload.Join.Sid
uid := payload.Join.Uid
room := s.getRoom(sid)
if room != nil {
peer = room.getPeer(uid)
if peer != nil {

}
}
}

log.Infof("req => %v", req.String())
}
}

//Join for biz request.
func (s *BizServer) Join(stream biz.Biz_JoinServer) error {
var r *room = nil
var peer *Peer = nil
defer func() {
if peer != nil && r != nil {
r.delPeer(peer.UID())
}
}()

for {
payload, err := stream.Recv()
req, err := stream.Recv()
if err != nil {
log.Errorf("err: %v", err)
return err
}

log.Infof("req => %v", payload.String())
switch payload := req.Payload.(type) {
case *biz.JoinRequest_Join:
sid := payload.Join.Sid
uid := payload.Join.Uid
info := payload.Join.Info
r = s.getRoom(sid)

if r == nil {
r = newRoom(sid)
s.roomLock.RLock()
s.rooms[sid] = r
s.roomLock.RUnlock()
}

if s.islbcli == nil {
ncli := nrpc.NewClient(s.nc, s.nodes["islb"].NID)
s.islbcli = islb.NewISLBClient(ncli)
}

resp, err := s.islbcli.FindNode(context.TODO(), &islb.FindNodeRequest{
Service: proto.ServiceSFU,
Sid: sid,
})
log.Infof("resp => %v", resp)

if err != nil {
stream.Send(&biz.JoinReply{
Payload: &biz.JoinReply_Result{
Result: &biz.JoinResult{
Success: false,
Reason: "islb " + sid + " not found.",
},
},
})
break
}

if r != nil {
peer = NewPeer(sid, uid, info)
r.addPeer(peer)
stream.Send(&biz.JoinReply{
Payload: &biz.JoinReply_Result{
Result: &biz.JoinResult{
Success: true,
Reason: "join success.",
},
},
})
} else {
stream.Send(&biz.JoinReply{
Payload: &biz.JoinReply_Result{
Result: &biz.JoinResult{
Success: false,
Reason: "sid " + sid + " not found.",
},
},
})
}
case *biz.JoinRequest_Leave:
//uid := payload.Leave.Uid
break

case *biz.JoinRequest_Msg:
//msg := payload.Msg
//broadcast massge to room.
}
log.Infof("req => %v", req.String())
}
}

// watchNodes watch islb nodes up/down
func (s *BizServer) watchIslbNodes(state discovery.NodeState, node *discovery.Node) {
func (s *BizServer) watchNodes(state discovery.NodeState, node *discovery.Node) {
s.nodeLock.Lock()
defer s.nodeLock.Unlock()
id := node.NID
service := node.Service
if state == discovery.NodeUp {
log.Infof("islb node %v up", id)
log.Infof("Service up: "+service+" node id => [%v], rpc => %v", id, node.RPC.Protocol)
if _, found := s.nodes[id]; !found {
s.nodes[id] = node
}
} else if state == discovery.NodeDown {
log.Infof("islb node %v down", id)
log.Infof("Service down: "+service+" node id => [%v]", id)
delete(s.nodes, id)
}
}
Expand Down
35 changes: 21 additions & 14 deletions pkg/node/islb/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@ type islbServer struct {
// handle Node from service discovery.
func (s *islbServer) handleNode(action string, node discovery.Node) {
log.Infof("handleNode:service %v, action %v => id %v, RPC %v", node.Service, action, node.ID(), node.RPC)

s.nodeLock.Lock()
defer s.nodeLock.Unlock()

switch action {
case discovery.Save:
fallthrough
Expand All @@ -38,24 +36,33 @@ func (s *islbServer) handleNode(action string, node discovery.Node) {
}

func (s *islbServer) FindNode(ctx context.Context, req *proto.FindNodeRequest) (*proto.FindNodeReply, error) {
log.Infof("nid => %v", req.GetNid())
nodes := []*ion.Node{
{
Nid: "avp-01",
Service: "avp",
},
{
Nid: "sfu-01",
Service: "sfu",
},
}
nid := req.GetNid()
sid := req.GetSid()
service := req.GetService()

log.Infof("nid => %v, sid => %v, service => %v", nid, sid, service)

nodes := []*ion.Node{}

mkey := "*" + "." + req.GetNid() + "." + req.GetSid() + ".*"
// find node by sid
mkey := "*" + ".*." + sid + ".*"
for _, key := range s.Redis.Keys(mkey) {
fields := s.Redis.HGetAll(key)
log.Debugf("key: %v, fields: %v", key, fields)
}

if len(nodes) == 0 {
// find node by nid or service
for _, node := range s.nodes {
if nid == node.NID || service == node.Service {
nodes = append(nodes, &ion.Node{
Nid: node.NID,
Service: node.Service,
})
}
}
}

return &proto.FindNodeReply{
Nodes: nodes,
}, nil
Expand Down
Loading

0 comments on commit 8fbe0df

Please sign in to comment.