Skip to content

Commit

Permalink
Fix RPC invocation and room sid update (#391)
Browse files Browse the repository at this point in the history
* Fix RPC invocation and room sid update

* Create curvy-lobsters-draw.md
  • Loading branch information
lukasIO authored Jan 24, 2025
1 parent 88e07b8 commit 7801a49
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 68 deletions.
5 changes: 5 additions & 0 deletions .changeset/curvy-lobsters-draw.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@livekit/rtc-node": patch
---

Fix RPC invocation and room sid update
178 changes: 110 additions & 68 deletions packages/livekit-rtc/src/room.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,26 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
return this.ffiHandle != undefined && this.connectionState != ConnectionState.CONN_DISCONNECTED;
}

async getSid(): Promise<string | undefined> {
return this.info?.sid; // TODO update this to handle async room updates once rust protocol has been updated
async getSid(): Promise<string> {
if (!this.isConnected) {
return '';
}
if (this.info?.sid && this.info.sid !== '') {
return this.info.sid;
}
return new Promise((resolve, reject) => {
const handleRoomUpdate = (sid: string) => {
if (sid !== '') {
this.off(RoomEvent.RoomSidChanged, handleRoomUpdate);
resolve(sid);
}
};
this.on(RoomEvent.RoomSidChanged, handleRoomUpdate);
this.once(RoomEvent.Disconnected, () => {
this.off(RoomEvent.RoomSidChanged, handleRoomUpdate);
reject('Room disconnected before room server id was available');
});
});
}

async connect(url: string, token: string, opts?: RoomOptions) {
Expand Down Expand Up @@ -158,9 +176,27 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
}

private onFfiEvent = (ffiEvent: FfiEvent) => {
if (
if (!this.localParticipant || !this.ffiHandle || !this.info) {
throw TypeError('cannot handle ffi events before connectCallback');
}

if (ffiEvent.message.case == 'rpcMethodInvocation') {
if (
ffiEvent.message.value.localParticipantHandle == this.localParticipant.ffi_handle.handle
) {
this.localParticipant.handleRpcMethodInvocation(
ffiEvent.message.value.invocationId!,
ffiEvent.message.value.method!,
ffiEvent.message.value.requestId!,
ffiEvent.message.value.callerIdentity!,
ffiEvent.message.value.payload!,
ffiEvent.message.value.responseTimeoutMs!,
);
}
return;
} else if (
ffiEvent.message.case != 'roomEvent' ||
ffiEvent.message.value.roomHandle != this.ffiHandle?.handle
ffiEvent.message.value.roomHandle != this.ffiHandle.handle
) {
return;
}
Expand Down Expand Up @@ -191,96 +227,97 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
participant!.trackPublications.set(publication.sid!, publication);
this.emit(RoomEvent.TrackPublished, publication, participant!);
} else if (ev.case == 'trackUnpublished') {
const participant = this.remoteParticipants.get(ev.value.participantIdentity!);
const publication = participant!.trackPublications.get(ev.value.publicationSid!);
participant!.trackPublications.delete(ev.value.publicationSid!);
this.emit(RoomEvent.TrackUnpublished, publication!, participant!);
const participant = this.requireRemoteParticipant(ev.value.participantIdentity!);
const publication = participant.trackPublications.get(ev.value.publicationSid!);
participant.trackPublications.delete(ev.value.publicationSid!);
if (publication) {
this.emit(RoomEvent.TrackUnpublished, publication, participant);
}
} else if (ev.case == 'trackSubscribed') {
const ownedTrack = ev.value.track;
const participant = this.remoteParticipants.get(ev.value.participantIdentity!);
const publication = participant!.trackPublications.get(ownedTrack!.info!.sid!);
publication!.subscribed = true;
if (ownedTrack!.info!.kind == TrackKind.KIND_VIDEO) {
publication!.track = new RemoteVideoTrack(ownedTrack!);
} else if (ownedTrack!.info!.kind == TrackKind.KIND_AUDIO) {
publication!.track = new RemoteAudioTrack(ownedTrack!);
const ownedTrack = ev.value.track!;
const trackInfo = ownedTrack.info!;
const { participant, publication } = this.requirePublicationOfRemoteParticipant(
ev.value.participantIdentity!,
trackInfo.sid!,
);
publication.subscribed = true;
if (trackInfo.kind == TrackKind.KIND_VIDEO) {
publication.track = new RemoteVideoTrack(ownedTrack);
} else if (trackInfo.kind == TrackKind.KIND_AUDIO) {
publication.track = new RemoteAudioTrack(ownedTrack);
}

this.emit(RoomEvent.TrackSubscribed, publication!.track!, publication!, participant!);
this.emit(RoomEvent.TrackSubscribed, publication.track!, publication, participant);
} else if (ev.case == 'trackUnsubscribed') {
const participant = this.remoteParticipants.get(ev.value.participantIdentity!);
const publication = participant!.trackPublications.get(ev.value.trackSid!);
publication!.track = undefined;
publication!.subscribed = false;
this.emit(RoomEvent.TrackUnsubscribed, publication!.track!, publication!, participant!);
} else if (ev.case == 'trackSubscriptionFailed') {
const participant = this.remoteParticipants.get(ev.value.participantIdentity!);
this.emit(
RoomEvent.TrackSubscriptionFailed,
const { participant, publication } = this.requirePublicationOfRemoteParticipant(
ev.value.participantIdentity!,
ev.value.trackSid!,
participant!,
ev.value.error,
);
const track = publication.track!;
publication.track = undefined;
publication.subscribed = false;
this.emit(RoomEvent.TrackUnsubscribed, track, publication, participant);
} else if (ev.case == 'trackSubscriptionFailed') {
const participant = this.requireRemoteParticipant(ev.value.participantIdentity!);
this.emit(RoomEvent.TrackSubscriptionFailed, ev.value.trackSid!, participant, ev.value.error);
} else if (ev.case == 'trackMuted') {
const participant = this.remoteParticipants.get(ev.value.participantIdentity!);
const publication = participant!.trackPublications.get(ev.value.trackSid!);
publication!.info!.muted = true;
if (publication!.track) {
publication!.track.info!.muted = true;
const { participant, publication } = this.requirePublicationOfParticipant(
ev.value.participantIdentity!,
ev.value.trackSid!,
);
publication.info!.muted = true;
if (publication.track) {
publication.track.info!.muted = true;
}
this.emit(RoomEvent.TrackMuted, publication!, participant!);
this.emit(RoomEvent.TrackMuted, publication, participant);
} else if (ev.case == 'trackUnmuted') {
const participant = this.retrieveParticipantByIdentity(ev.value.participantIdentity!);
const publication = participant!.trackPublications.get(ev.value.trackSid!);
publication!.info!.muted = false;
if (publication!.track) {
publication!.track.info!.muted = false;
const { participant, publication } = this.requirePublicationOfParticipant(
ev.value.participantIdentity!,
ev.value.trackSid!,
);
publication.info!.muted = false;
if (publication.track) {
publication.track.info!.muted = false;
}
this.emit(RoomEvent.TrackUnmuted, publication!, participant!);
this.emit(RoomEvent.TrackUnmuted, publication, participant);
} else if (ev.case == 'activeSpeakersChanged') {
const activeSpeakers = ev.value.participantIdentities.map((identity) =>
this.retrieveParticipantByIdentity(identity),
);
this.emit(
RoomEvent.ActiveSpeakersChanged,
activeSpeakers.map((s) => s!),
this.requireParticipantByIdentity(identity),
);
this.emit(RoomEvent.ActiveSpeakersChanged, activeSpeakers);
} else if (ev.case == 'roomMetadataChanged') {
this.info!.metadata = ev.value.metadata;
this.emit(RoomEvent.RoomMetadataChanged, this.info!.metadata!);
} else if (ev.case == 'participantMetadataChanged') {
const participant = this.retrieveParticipantByIdentity(ev.value.participantIdentity!);
participant!.info!.metadata = ev.value.metadata;
this.emit(RoomEvent.ParticipantMetadataChanged, participant!.metadata, participant!);
const participant = this.requireParticipantByIdentity(ev.value.participantIdentity!);
participant.info.metadata = ev.value.metadata;
this.emit(RoomEvent.ParticipantMetadataChanged, participant.metadata, participant);
} else if (ev.case == 'participantNameChanged') {
const participant = this.retrieveParticipantByIdentity(ev.value.participantIdentity!);
participant!.info!.name = ev.value.name;
this.emit(RoomEvent.ParticipantNameChanged, participant!.name!, participant!);
const participant = this.requireParticipantByIdentity(ev.value.participantIdentity!);
participant.info.name = ev.value.name;
this.emit(RoomEvent.ParticipantNameChanged, participant.name!, participant);
} else if (ev.case == 'participantAttributesChanged') {
const participant = this.retrieveParticipantByIdentity(ev.value.participantIdentity!);
participant!.info!.attributes = ev.value.attributes.reduce(
(obj, item) => {
obj[item.key!] = item.value!;
return obj;
const participant = this.requireParticipantByIdentity(ev.value.participantIdentity!);
participant.info.attributes = ev.value.attributes.reduce(
(acc, value) => {
acc[value.key!] = value.value!;
return acc;
},
{} as Record<string, string>,
);
if (Object.keys(ev.value.changedAttributes).length > 0) {
this.emit(
RoomEvent.ParticipantAttributesChanged,
ev.value.changedAttributes.reduce(
(obj, item) => {
obj[item.key!] = item.value!;
return obj;
},
{} as Record<string, string>,
),
participant!,
const changedAttributes = ev.value.changedAttributes.reduce(
(acc, value) => {
acc[value.key!] = value.value!;
return acc;
},
{} as Record<string, string>,
);
this.emit(RoomEvent.ParticipantAttributesChanged, changedAttributes, participant);
}
} else if (ev.case == 'connectionQualityChanged') {
const participant = this.retrieveParticipantByIdentity(ev.value.participantIdentity!);
this.emit(RoomEvent.ConnectionQualityChanged, ev.value.quality!, participant!);
const participant = this.requireParticipantByIdentity(ev.value.participantIdentity!);
this.emit(RoomEvent.ConnectionQualityChanged, ev.value.quality!, participant);
} else if (ev.case == 'chatMessage') {
const participant = this.retrieveParticipantByIdentity(ev.value.participantIdentity!);
const { id, message: messageText, timestamp, editTimestamp, generated } = ev.value.message!;
Expand Down Expand Up @@ -334,6 +371,8 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
this.emit(RoomEvent.Reconnecting);
} else if (ev.case == 'reconnected') {
this.emit(RoomEvent.Reconnected);
} else if (ev.case == 'roomSidChanged') {
this.emit(RoomEvent.RoomSidChanged, ev.value.sid!);
}
};

Expand Down Expand Up @@ -428,6 +467,7 @@ export type RoomCallbacks = {
trackUnmuted: (publication: TrackPublication, participant: Participant) => void;
activeSpeakersChanged: (speakers: Participant[]) => void;
roomMetadataChanged: (metadata: string) => void;
roomInfoUpdated: (info: RoomInfo) => void;
participantMetadataChanged: (metadata: string | undefined, participant: Participant) => void;
participantNameChanged: (name: string, participant: Participant) => void;
participantAttributesChanged: (
Expand All @@ -449,6 +489,7 @@ export type RoomCallbacks = {
disconnected: (reason: DisconnectReason) => void;
reconnecting: () => void;
reconnected: () => void;
roomSidChanged: (sid: string) => void;
};

export enum RoomEvent {
Expand All @@ -466,6 +507,7 @@ export enum RoomEvent {
TrackUnmuted = 'trackUnmuted',
ActiveSpeakersChanged = 'activeSpeakersChanged',
RoomMetadataChanged = 'roomMetadataChanged',
RoomSidChanged = 'roomSidChanged',
ParticipantMetadataChanged = 'participantMetadataChanged',
ParticipantNameChanged = 'participantNameChanged',
ParticipantAttributesChanged = 'participantAttributesChanged',
Expand Down

0 comments on commit 7801a49

Please sign in to comment.