Skip to content

Commit

Permalink
refactor lobby request commands
Browse files Browse the repository at this point in the history
  • Loading branch information
EnricoSchw committed Mar 11, 2024
1 parent 2e32122 commit 513a76e
Show file tree
Hide file tree
Showing 7 changed files with 168 additions and 90 deletions.
26 changes: 10 additions & 16 deletions internal/lobby/lobby.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import (

"github.com/google/uuid"
"github.com/shigde/sfu/internal/lobby/sessions"
"golang.org/x/exp/slog"
)

type command interface {
GetSessionId() uuid.UUID
Execute(session *sessions.Session)
GetUserId() uuid.UUID
Execute(ctx context.Context, session *sessions.Session)
Fail(err error)
}

var (
Expand All @@ -31,14 +31,11 @@ type lobby struct {
entity *LobbyEntity
sessions *sessions.SessionRepository
hub *sessions.Hub

cmd chan command
}

func newLobby(id uuid.UUID, entity *LobbyEntity) *lobby {
ctx, stop := context.WithCancel(context.Background())
sessRep := sessions.NewSessionRepository()

hub := sessions.NewHub(ctx, sessRep, entity.LiveStreamId, nil)
lobby := &lobby{
Id: id,
Expand All @@ -49,19 +46,16 @@ func newLobby(id uuid.UUID, entity *LobbyEntity) *lobby {
hub: hub,
entity: entity,
}
go lobby.run()
return lobby
}

func (l *lobby) run() {
slog.Info("lobby.lobby: run", "lobbyId", l.Id)
for {
select {
// case cmd := <-l.cmd:
func (l *lobby) newSession(userId uuid.UUID, rtp sessions.RtpEngine) bool {
return l.sessions.New(sessions.NewSession(l.ctx, userId, l.hub, rtp))
}

case <-l.ctx.Done():
slog.Info("lobby.lobby: close lobby", "lobbyId", l.Id)
return
}
func (l *lobby) handle(cmd command) {
if session, found := l.sessions.FindByUserId(cmd.GetUserId()); found {
cmd.Execute(l.ctx, session)
}
cmd.Fail(errNoSession)
}
61 changes: 1 addition & 60 deletions internal/lobby/lobby_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,66 +43,7 @@ func (m *LobbyManager) NewIngressResource(ctx context.Context, lobbyId uuid.UUID
return nil, nil
}

// NewEgressResource(ctx context.Context, lobbyId uuid.UUID, user uuid.UUID, option WebrtcResourceOption) (*WebrtcResource, error)
// DeleteAllResources(ctx context.Context, lobbyId uuid.UUID, user uuid.UUID) (bool, error)
//}

//
// NewIngressEndpoint(ctx context.Context, lobbyId uuid.UUID, user uuid.UUID, offer *webrtc.SessionDescription) (struct {
// Answer *webrtc.SessionDescription
// Resource uuid.UUID
// RtpSessionId uuid.UUID
// }, error)
//
// InitLobbyEgressEndpoint(ctx context.Context, lobbyId uuid.UUID, user uuid.UUID) (struct {
// Offer *webrtc.SessionDescription
// Active bool
// RtpSessionId uuid.UUID
// }, error)
//
// FinalCreateLobbyEgressEndpoint(ctx context.Context, lobbyId uuid.UUID, user uuid.UUID, offer *webrtc.SessionDescription) (struct {
// Answer *webrtc.SessionDescription
// Active bool
// RtpSessionId uuid.UUID
// }, error)
//
// CreateMainStreamLobbyEgressEndpoint(ctx context.Context, lobbyId uuid.UUID, user uuid.UUID, offer *webrtc.SessionDescription) (struct {
// Answer *webrtc.SessionDescription
// RtpSessionId uuid.UUID
// }, error)
//
// LeaveLobby(ctx context.Context, lobbyId uuid.UUID, userId uuid.UUID) (bool, error)
//
// // Live Stream API
// StartLiveStream(
// ctx context.Context,
// lobbyId uuid.UUID,
// key string,
// rtmpUrl string,
// userId uuid.UUID,
// ) error
//
// StopLiveStream(
// ctx context.Context,
// lobbyId uuid.UUID,
// userId uuid.UUID,
// ) error
//
// // Server to Server API
// CreateLobbyHostPipe(ctx context.Context, u uuid.UUID, offer *webrtc.SessionDescription, instanceId uuid.UUID) (struct {
// Answer *webrtc.SessionDescription
// Resource uuid.UUID
// RtpSessionId uuid.UUID
// }, error)
//
// CreateLobbyHostIngress(ctx context.Context, u uuid.UUID, offer *webrtc.SessionDescription, instanceId uuid.UUID) (struct {
// Answer *webrtc.SessionDescription
// Resource uuid.UUID
// RtpSessionId uuid.UUID
// }, error)
//
// CloseLobbyHostPipe(ctx context.Context, u uuid.UUID, id uuid.UUID) (bool, error)
//}
// Old API -----------------------------------

func (m *LobbyManager) CreateLobbyIngressEndpoint(ctx context.Context, lobbyId uuid.UUID, user uuid.UUID, offer *webrtc.SessionDescription) (struct {
Answer *webrtc.SessionDescription
Expand Down
3 changes: 2 additions & 1 deletion internal/lobby/lobby_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"

"github.com/google/uuid"
"github.com/shigde/sfu/internal/lobby/sessions"
"github.com/shigde/sfu/internal/storage"
"github.com/stretchr/testify/assert"
)
Expand All @@ -15,7 +16,7 @@ func testRtpStreamLobbyRepositorySetup(t *testing.T) *lobbyRepository {
t.Helper()
store := storage.NewTestStore()
_ = store.GetDatabase().AutoMigrate(&LobbyEntity{})
var engine rtpEngine
var engine sessions.RtpEngine
host, _ := url.Parse("")
repository := newLobbyRepository(store, engine, host, "test-key")

Expand Down
148 changes: 136 additions & 12 deletions internal/lobby/lobby_test.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
package lobby

import (
"context"
"errors"
"testing"
"time"

"github.com/google/uuid"
"github.com/shigde/sfu/internal/lobby/resources"
"github.com/shigde/sfu/internal/lobby/sessions"
"github.com/shigde/sfu/internal/logging"
"github.com/stretchr/testify/assert"
)

func testStreamLobbySetup(t *testing.T) (*lobby, uuid.UUID) {
func testLobbySetup(t *testing.T) (*lobby, uuid.UUID) {
t.Helper()
logging.SetupDebugLogger()
// set one session in lobby
_ = mockRtpEngineForOffer(MockedAnswer)
entity := &LobbyEntity{
UUID: uuid.New(),
LiveStreamId: uuid.New(),
Expand All @@ -21,18 +25,138 @@ func testStreamLobbySetup(t *testing.T) (*lobby, uuid.UUID) {

lobby := newLobby(entity.UUID, entity)
user := uuid.New()
//session := sessions.NewSession(user, lobby.hub, engine, lobby.sessionQuit)
////session.signal.messenger = newMockedMessenger(t)
////session.ingress = mockConnection(MockedAnswer)
////
////session.egress = mockConnection(MockedAnswer)
////session.signal.egress = session.egress
//lobby.sessions.Add(session)
lobby.newSession(user, nil)
return lobby, user
}
func TestStreamLobby(t *testing.T) {
func TestLobby(t *testing.T) {
t.Run("handle command successfully", func(t *testing.T) {
lobby, user := testLobbySetup(t)
cmd := &mockCmd{
user: user,
f: func(ctx context.Context, s *sessions.Session) (*resources.WebRTC, error) {
return &resources.WebRTC{SDP: MockedAnswer}, nil
},
Response: make(chan *resources.WebRTC),
Err: make(chan error),
}
go lobby.handle(cmd)

t.Run("new ingress egress", func(t *testing.T) {
select {
case res := <-cmd.Response:
assert.Equal(t, MockedAnswer, res.SDP)
case <-cmd.Err:
t.Fatalf("test fails because no error expected")
case <-time.After(time.Second * 3):
t.Fatalf("test fails because run in timeout")
}
})

t.Run("handle, command error session not found", func(t *testing.T) {
lobby, _ := testLobbySetup(t)
cmd := &mockCmd{
user: uuid.New(),
Err: make(chan error),
}
go lobby.handle(cmd)

select {
case <-cmd.Response:
t.Fatalf("test fails because no webrtc resource expected")
case err := <-cmd.Err:
assert.ErrorIs(t, err, errNoSession)
case <-time.After(time.Second * 3):
t.Fatalf("test fails because run in timeout")
}
})

t.Run("handle command with error", func(t *testing.T) {
cmdErr := errors.New("cmd test error")
lobby, user := testLobbySetup(t)
cmd := &mockCmd{
user: user,
f: func(ctx context.Context, s *sessions.Session) (*resources.WebRTC, error) {
return nil, cmdErr
},
Err: make(chan error),
}
go lobby.handle(cmd)

select {
case <-cmd.Response:
t.Fatalf("test fails because no webrtc resource expected")
case err := <-cmd.Err:
assert.ErrorIs(t, err, cmdErr)
case <-time.After(time.Second * 3):
t.Fatalf("test fails because run in timeout")
}
})

t.Run("handle command fails, because lobby context was done", func(t *testing.T) {
cmdCtxErr := errors.New("cmd context done")
lobby, user := testLobbySetup(t)
ctx, cancel := context.WithCancel(context.Background())
lobby.ctx = ctx
cmd := &mockCmd{
user: user,
f: func(paramCtx context.Context, s *sessions.Session) (*resources.WebRTC, error) {
select {
case <-paramCtx.Done():
return nil, cmdCtxErr
default:
return nil, nil
}
},
Err: make(chan error),
Response: make(chan *resources.WebRTC),
}
cancel()
go lobby.handle(cmd)

select {
case <-cmd.Response:
t.Fatalf("test fails because no webrtc resource expected")
case err := <-cmd.Err:
assert.ErrorIs(t, err, cmdCtxErr)
case <-time.After(time.Second * 3):
t.Fatalf("test fails because run in timeout")
}
})

t.Run("new session added", func(t *testing.T) {
lobby, _ := testLobbySetup(t)
user := uuid.New()
ok := lobby.newSession(user, nil)
assert.True(t, ok)
})

t.Run("new session not added", func(t *testing.T) {
lobby, user := testLobbySetup(t)
ok := lobby.newSession(user, nil)
assert.False(t, ok)
})
}

type mockCmd struct {
ctx context.Context
user uuid.UUID
Response chan *resources.WebRTC
Err chan error
f func(ctx context.Context, s *sessions.Session) (*resources.WebRTC, error)
}

func (mc *mockCmd) GetUserId() uuid.UUID {
return mc.user
}

func (mc *mockCmd) Execute(ctx context.Context, session *sessions.Session) {
res, err := mc.f(ctx, session)
if err != nil {
mc.Err <- err
return
}
mc.Response <- res
}

func (mc *mockCmd) Fail(err error) {
mc.Err <- err
}
3 changes: 3 additions & 0 deletions internal/lobby/resources/webrtc.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
package resources

import "github.com/pion/webrtc/v3"

type WebRTC struct {
SDP *webrtc.SessionDescription
}
5 changes: 4 additions & 1 deletion internal/lobby/sessions/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Session struct {
signal *signal

stop context.CancelFunc
Done <-chan struct{}
}

func NewSession(ctx context.Context, user uuid.UUID, hub *Hub, engine RtpEngine) *Session {
Expand All @@ -38,12 +39,14 @@ func NewSession(ctx context.Context, user uuid.UUID, hub *Hub, engine RtpEngine)
signal := newSignal(ctx, sessionId, user)

session := &Session{
Id: uuid.New(),
Id: uuid.New(),

user: user,
rtpEngine: engine,
hub: hub,
signal: signal,
stop: cancel,
Done: ctx.Done(),
}

// signal.onMuteCbk = session.onMuteTrack
Expand Down
12 changes: 12 additions & 0 deletions internal/lobby/sessions/session_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,18 @@ func NewSessionRepository() *SessionRepository {
}
}

func (r *SessionRepository) New(s *Session) bool {
r.locker.Lock()
defer r.locker.Unlock()
for _, session := range r.sessions {
if session.user == s.user {
return false
}
}
r.sessions[s.Id] = s
return true
}

func (r *SessionRepository) Add(s *Session) {
r.locker.Lock()
defer r.locker.Unlock()
Expand Down

0 comments on commit 513a76e

Please sign in to comment.