Skip to content

Commit

Permalink
refactor: remove redis declarations test
Browse files Browse the repository at this point in the history
  • Loading branch information
gabrielmatau79 committed Jan 14, 2025
1 parent d4c4295 commit 75a1c6e
Show file tree
Hide file tree
Showing 6 changed files with 6 additions and 192 deletions.
4 changes: 1 addition & 3 deletions src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import { NotificationService } from './lib/notification.service'
import { HttpModule } from '@nestjs/axios'
import { RoomsModule } from './rooms/rooms.module'
import { HandledRedisModule } from './modules/redis.module'

Check failure on line 7 in src/app.module.ts

View workflow job for this annotation

GitHub Actions / build

'HandledRedisModule' is defined but never used
import { RoomFactory } from './lib/RoomFactory'

@Module({
imports: [
Expand All @@ -15,10 +14,9 @@ import { RoomFactory } from './lib/RoomFactory'
isGlobal: true,
}),
RoomsModule,
HandledRedisModule,
],
controllers: [],
providers: [NotificationService, HandledRedisModule, RoomFactory],
providers: [NotificationService],
exports: [NotificationService],
})
export class AppModule {}
119 changes: 0 additions & 119 deletions src/lib/Room.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import { config } from '../config/config.server'
import { Device } from './room.interfaces'
import * as throttle from '@sitespeed.io/throttle'
import { Producer } from 'mediasoup/node/lib/types'
import Redis from 'ioredis'

@Injectable()
export class Room extends EventEmitter {
Expand All @@ -26,10 +25,6 @@ export class Room extends EventEmitter {
private readonly webRtcServer: any
private networkThrottled: boolean
private closed = false
private readonly redis: Redis
private readonly redisSubscriber: Redis
private readonly redisPublisher: Redis
private pipeTransport: mediasoup.types.PipeTransport | null = null

static async create({
mediasoupWorker,
Expand Down Expand Up @@ -78,7 +73,6 @@ export class Room extends EventEmitter {
activeSpeakerObserver,
consumerReplicas,
maxPeerCount,
redis,
}: {
roomId: string
protooRoom: protoo.Room
Expand All @@ -88,7 +82,6 @@ export class Room extends EventEmitter {
activeSpeakerObserver: mediasoup.types.ActiveSpeakerObserver
consumerReplicas: number
maxPeerCount?: number
redis?: Redis
}) {
super()

Expand All @@ -104,9 +97,6 @@ export class Room extends EventEmitter {
this.webRtcServer = webRtcServer
this.networkThrottled = false
this.notificationService = new NotificationService()
this.redis = redis
this.redisSubscriber = this.redis.duplicate()
this.redisPublisher = this.redis.duplicate()
}

close(): void {
Expand Down Expand Up @@ -506,15 +496,6 @@ export class Room extends EventEmitter {
rtpParameters,
appData,
})

// publish producer
const event = {
action: 'producerCreated',
roomId: this.roomId,
producerId: producer.id,
instance: config.mediasoup.pipeTransportOptions.listenIp.announcedIp,
}
await this.redisPublisher.publish('rooms', JSON.stringify(event))
} catch (error) {
this.logger.error(`Failed to produce on transport with id "${transportId}": ${error}`)
reject(`Failed to produce on transport with id ${transportId}`)
Expand Down Expand Up @@ -1304,16 +1285,6 @@ export class Room extends EventEmitter {
await consumer.resume()

consumerPeer.notify('consumerScore', { consumerId: consumer.id, score: consumer.score }).catch(() => {})

// publish consumer tu create piperouter
const event = {
action: 'consumerCreated',
roomId: this.roomId,
consumerId: consumer.id,
producerId: producer.id,
peerId: consumerPeer.id,
}
await this.redisPublisher.publish('rooms', JSON.stringify(event))
} catch (error) {
this.logger.warn(`createConsumer() | failed: ${error}`)
}
Expand Down Expand Up @@ -1998,96 +1969,6 @@ export class Room extends EventEmitter {
})
}

//Handles Pipe transport

async createPipeTransport(router: mediasoup.types.Router) {
const pipeTransportOptions: mediasoup.types.PipeTransportOptions = config.mediasoup.pipeTransportOptions
const pipeTransport = await router.createPipeTransport(pipeTransportOptions)

return pipeTransport
}

async registerPipeTransportInfo(pipeTransport: mediasoup.types.PipeTransport, roomId: string) {
const info = {
ip: pipeTransport.tuple.localIp,
port: pipeTransport.tuple.localPort,
srtpParameters: pipeTransport.srtpParameters,
}

await this.redis.set(`pipeTransport:${roomId}-${pipeTransport.tuple.localIp}`, JSON.stringify(info))
this.logger.log(`PipeTransport info registered for room ${roomId}`)
}

async connectToRemotePipeTransport(
router: mediasoup.types.Router,
roomId: string,
instance: string,
): Promise<mediasoup.types.PipeTransport> {
const localIp = config.mediasoup.pipeTransportOptions.listenIp.announcedIp
const info = await this.redis.get(`pipeTransport:${roomId}-${instance}`)

if (!info) {
throw new Error(`No PipeTransport info found for room ${roomId}`)
}

const remoteInfo = JSON.parse(info)

this.logger.debug(`*** remoteIp: ${remoteInfo.ip} --- localIp: ${localIp}`)

if (remoteInfo.ip === localIp) {
this.logger.warn(`Skipping connection to PipeTransport of room ${roomId} on local IP ${localIp}`)
return null
}

const pipeTransportOptions: mediasoup.types.PipeTransportOptions = config.mediasoup.pipeTransportOptions
const pipeTransport = await router.createPipeTransport(pipeTransportOptions)

await pipeTransport
.connect({
ip: remoteInfo.ip,
port: remoteInfo.port,
srtpParameters: remoteInfo.srtpParameters,
})
.catch((error) => {
this.logger.error(`Failed to Connected remote PipeTransport: ${error.message}`)
})

this.logger.log(`Connected to remote PipeTransport for room ${roomId}`)
return pipeTransport
}

async pipeProducerToRemoteRouter(
producer: mediasoup.types.Producer,
pipeTransport: mediasoup.types.PipeTransport,
): Promise<mediasoup.types.Producer> {
const id: string = producer.id
const pipeProducer = await pipeTransport.produce(producer)

this.logger.log(`PipeProducer created for Producer ID: ${id}`)
return pipeProducer
}

async pipeConsumerToRemoteRouter(
consumer: mediasoup.types.Consumer,
pipeTransport: mediasoup.types.PipeTransport,
): Promise<mediasoup.types.Consumer> {
const pipeConsumer = await pipeTransport.consume(consumer)

this.logger.log(`PipeConsumer created for Consumer ID: ${consumer.id}`)
return pipeConsumer
}

private async initializePipeTransport(roomId: string): Promise<void> {
try {
this.pipeTransport = await this.createPipeTransport(this.mediasoupRouter)
await this.registerPipeTransportInfo(this.pipeTransport, roomId)
this.logger.log(`PipeTransport initialized for room ${roomId}`)
} catch (error) {
this.logger.error(`Error initializing PipeTransport for room ${roomId}: ${error.message}`)
throw error
}
}

/**
* Retrieves a producer by its ID.
* @param producerId - The ID of the producer.
Expand Down
1 change: 0 additions & 1 deletion src/lib/RoomFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ export class RoomFactory {
activeSpeakerObserver,
consumerReplicas,
maxPeerCount,
redis,
})
}
}
4 changes: 2 additions & 2 deletions src/rooms/rooms.module.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { Module } from '@nestjs/common'
import { RoomsService } from './rooms.service'
import { RoomsController } from './rooms.controller'
import { RoomFactory } from 'src/lib/RoomFactory'


@Module({
controllers: [RoomsController],
providers: [RoomsService, RoomFactory],
providers: [RoomsService],
})
export class RoomsModule {}
69 changes: 3 additions & 66 deletions src/rooms/rooms.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,11 @@ import {
CreateBroadcasterDto,
CreateBroadcasterProducerDto,
CreateBroadcasterTransportDto,
RoomEventDto,
} from './dto/rooms.dto'
import * as protoo from 'protoo-server'
import * as url from 'url'
import { Server } from 'https'
import { NotificationService } from '../lib/notification.service'
import { RoomFactory } from 'src/lib/RoomFactory'
import { InjectRedis } from '@nestjs-modules/ioredis'
import Redis from 'ioredis'
import { plainToClass } from 'class-transformer'

@Injectable()
export class RoomsService implements OnModuleInit, OnModuleDestroy {
Expand All @@ -41,16 +36,9 @@ export class RoomsService implements OnModuleInit, OnModuleDestroy {

private protooServer: protoo.WebSocketServer
private httpServer: Server
private readonly redisSubscriber: Redis
private readonly redisPublisher: Redis

constructor(
private roomFactory: RoomFactory,
@InjectRedis() private readonly redis: Redis,
) {
constructor() {
this.notificationService = new NotificationService()
this.redisSubscriber = this.redis.duplicate()
this.redisPublisher = this.redis.duplicate()
}

async onModuleInit(): Promise<void> {
Expand Down Expand Up @@ -85,13 +73,6 @@ export class RoomsService implements OnModuleInit, OnModuleDestroy {
reject(500, error.message)
})
})

this.redisSubscriber.subscribe('rooms', (err, count) => {
if (err) this.logger.error(err.message)
this.logger.log(`Subscribed ${count} to channel rooms.`)
})

this.initializeRoomSubscriber()
}

async onModuleDestroy() {
Expand Down Expand Up @@ -278,22 +259,15 @@ export class RoomsService implements OnModuleInit, OnModuleDestroy {
const mediasoupWorker = this.getNextMediasoupWorker()
// Create the new room instance

const room = await this.roomFactory.createRoom({
const room = await Room.create({
mediasoupWorker,
roomId: roomId,
consumerReplicas: 0,
maxPeerCount,
mediaCodecs: config.mediasoup.routerOptions.mediaCodecs as mediasoup.types.RtpCodecCapability[],
})

await this.redisPublisher.publish(
'rooms',
JSON.stringify({
action: 'roomCreated',
roomId,
instance: config.mediasoup.pipeTransportOptions.listenIp.announcedIp,
}),
)
this.logger.debug(`*** room has been created ***`)
// Store the room in the rooms map
this.rooms.set(roomId, room)
// Handle room closure
Expand Down Expand Up @@ -632,41 +606,4 @@ export class RoomsService implements OnModuleInit, OnModuleDestroy {
private getRoomById(roomId: string): Room | undefined {
return this.rooms.get(roomId)
}

/**
* Subscribes to the Redis channel "rooms".
* Handles incoming messages about new rooms and producer/consumer events in other instances.
*/
private async initializeRoomSubscriber(): Promise<void> {
this.redisSubscriber.on('message', async (channel, message: string) => {
const event: RoomEventDto = plainToClass(RoomEventDto, JSON.parse(message))

this.logger.debug(`*** Received event: ${JSON.stringify(event)} ***`)

const room = this.rooms.get(event.roomId)

// Process events based on their action type
switch (event.action) {
case 'roomCreated':
if (!room) {
this.logger.log(`Detected new room ${event.roomId} from another instance`)
try {
const room = await this.getOrCreateRoom({ roomId: event.roomId })
const { mediasoupRouter } = room
room.connectToRemotePipeTransport(mediasoupRouter, event.roomId, event.instance)
} catch (error) {
this.logger.error(`Failed to connect to remote PipeTransport: ${error.message}`)
}
}

break

default:
this.logger.warn(`Unknown action type: ${event.action}`)
}
})

// Log a message indicating successful subscription to the channel
this.logger.log('Subscribed to Redis channel for room synchronization.')
}
}
1 change: 0 additions & 1 deletion test/app.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,5 +80,4 @@ describe('Protoo WebSocket E2E Test', () => {
expect(response.body).toHaveProperty('protocol', '2060-mediasoup-v1')
})

//TODO: connect to the Protoo server and handle requests
})

0 comments on commit 75a1c6e

Please sign in to comment.