From 829ea8b10fa7f1884728794f423d9e4759ab4c33 Mon Sep 17 00:00:00 2001 From: giangndm <45644921+giangndm@users.noreply.github.com> Date: Wed, 24 Apr 2024 23:09:50 +0700 Subject: [PATCH] feat: channel pub-sub feature and tests. cluster integration test (#262) * feat: channel pub-sub feature and tests. cluster integration test * chore: fix typos --- README.md | 2 +- packages/media_core/src/cluster.rs | 119 +++++++-- .../media_core/src/cluster/id_generator.rs | 42 +++ packages/media_core/src/cluster/room.rs | 55 ++-- .../src/cluster/room/channel_pub.rs | 113 +++++++- .../src/cluster/room/channel_sub.rs | 248 ++++++++++++++++-- .../media_core/src/cluster/room/metadata.rs | 112 ++++---- 7 files changed, 539 insertions(+), 152 deletions(-) create mode 100644 packages/media_core/src/cluster/id_generator.rs diff --git a/README.md b/README.md index b8f1b3e7..39bc6ff9 100644 --- a/README.md +++ b/README.md @@ -37,7 +37,7 @@ For a deep dive into the technical aspects of network architecture, please refer ## Project Status: Refactoring -We are actively refactoring entiry media server and network stack with [sans-io-runtime](https://github.com/8xff/sans-io-runtime) for better performance. If you are looking for older version, please check in [legacy branch](https://github.com/8xFF/atm0s-media-server/tree/legacy). +We are actively refactoring entire media server and network stack with [sans-io-runtime](https://github.com/8xff/sans-io-runtime) for better performance. If you are looking for an older version, please check out the [legacy branch](https://github.com/8xFF/atm0s-media-server/tree/legacy). ## Features diff --git a/packages/media_core/src/cluster.rs b/packages/media_core/src/cluster.rs index d83fd7e0..cd8bf7f3 100644 --- a/packages/media_core/src/cluster.rs +++ b/packages/media_core/src/cluster.rs @@ -5,7 +5,7 @@ //! use derive_more::{AsRef, Display, From}; -use sans_io_runtime::{Task, TaskGroup}; +use sans_io_runtime::TaskGroup; use std::{ collections::HashMap, fmt::Debug, @@ -23,6 +23,7 @@ use crate::transport::{LocalTrackId, RemoteTrackId}; use self::room::ClusterRoom; +mod id_generator; mod room; #[derive(Clone, Copy, From, AsRef, PartialEq, Eq, Debug, Display, Hash)] @@ -46,14 +47,14 @@ pub enum ClusterRemoteTrackControl { #[derive(Clone, Debug, PartialEq, Eq)] pub enum ClusterRemoteTrackEvent { RequestKeyFrame, - LimitBitrate { min: u32, max: u32 }, + LimitBitrate { min: u64, max: u64 }, } #[derive(Debug, Clone)] pub enum ClusterLocalTrackControl { Subscribe(PeerId, TrackName), RequestKeyFrame, - DesiredBitrate(u32), + DesiredBitrate(u64), Unsubscribe, } @@ -90,6 +91,7 @@ pub enum Input { Endpoint(Owner, ClusterRoomHash, ClusterEndpointControl), } +#[derive(Debug, PartialEq, Eq)] pub enum Output { Sdn(ClusterRoomHash, FeaturesControl), Endpoint(Vec, ClusterEndpointEvent), @@ -98,7 +100,7 @@ pub enum Output { pub struct MediaCluster { rooms_map: HashMap, - rooms: TaskGroup, Output, ClusterRoom, 128>, + rooms: TaskGroup, room::Output, ClusterRoom, 128>, } impl Default for MediaCluster { @@ -112,41 +114,126 @@ impl Default for MediaCluster { impl MediaCluster { pub fn on_tick(&mut self, now: Instant) -> Option> { - let (_index, out) = self.rooms.on_tick(now)?; - Some(out) + let (index, out) = self.rooms.on_tick(now)?; + Some(self.process_room_output(index, out)) } pub fn on_sdn_event(&mut self, now: Instant, room: ClusterRoomHash, event: FeaturesEvent) -> Option> { let index = self.rooms_map.get(&room)?; - self.rooms.on_event(now, *index, room::Input::Sdn(event)) + let out = self.rooms.on_event(now, *index, room::Input::Sdn(event))?; + Some(self.process_room_output(*index, out)) } pub fn on_endpoint_control(&mut self, now: Instant, owner: Owner, room_hash: ClusterRoomHash, control: ClusterEndpointControl) -> Option> { if let Some(index) = self.rooms_map.get(&room_hash) { - self.rooms.on_event(now, *index, room::Input::Endpoint(owner, control)) + let out = self.rooms.on_event(now, *index, room::Input::Endpoint(owner, control))?; + Some(self.process_room_output(*index, out)) } else { log::info!("[MediaCluster] create room {}", room_hash); - let mut room = ClusterRoom::new(room_hash); - let out = room.on_event(now, room::Input::Endpoint(owner, control)); - let index = self.rooms.add_task(room); + let index = self.rooms.add_task(ClusterRoom::new(room_hash)); self.rooms_map.insert(room_hash, index); - out + let out = self.rooms.on_event(now, index, room::Input::Endpoint(owner, control))?; + Some(self.process_room_output(index, out)) } } pub fn pop_output(&mut self, now: Instant) -> Option> { - let (_index, out) = self.rooms.pop_output(now)?; - Some(out) + let (index, out) = self.rooms.pop_output(now)?; + Some(self.process_room_output(index, out)) } pub fn shutdown<'a>(&mut self, now: Instant) -> Option> { - let (_index, out) = self.rooms.shutdown(now)?; - Some(out) + let (index, out) = self.rooms.shutdown(now)?; + Some(self.process_room_output(index, out)) + } + + fn process_room_output(&mut self, index: usize, out: room::Output) -> Output { + match out { + room::Output::Sdn(userdata, control) => Output::Sdn(userdata, control), + room::Output::Endpoint(owners, event) => Output::Endpoint(owners, event), + room::Output::Destroy => { + self.rooms.remove_task(index); + Output::Continue + } + } } } #[cfg(test)] mod tests { + use std::time::Instant; + + use atm0s_sdn::features::{ + dht_kv::{self, MapControl, MapEvent}, + FeaturesControl, FeaturesEvent, + }; + use media_server_protocol::endpoint::{PeerId, PeerInfo, PeerMeta, RoomInfoPublish, RoomInfoSubscribe}; + + use crate::cluster::{id_generator, ClusterEndpointEvent}; + + use super::{ClusterEndpointControl, ClusterRoomHash, MediaCluster, Output}; + //TODO should create room when new room event arrived //TODO should route to correct room + //TODO should remove room after all peers leaved + #[test] + fn room_manager_should_work() { + let mut cluster = MediaCluster::::default(); + + let owner = 1; + let room_hash = ClusterRoomHash(1); + let room_peers_map = id_generator::peers_map(room_hash); + let peer = PeerId("peer1".to_string()); + let peer_key = id_generator::peers_key(&peer); + let peer_info = PeerInfo::new(peer.clone(), PeerMeta {}); + + // Not join room with scope (peer true, track false) should Set and Sub + let out = cluster.on_endpoint_control( + Instant::now(), + owner, + room_hash, + ClusterEndpointControl::Join( + peer.clone(), + peer_info.meta.clone(), + RoomInfoPublish { peer: true, tracks: false }, + RoomInfoSubscribe { peers: true, tracks: false }, + ), + ); + assert_eq!( + out, + Some(Output::Sdn( + room_hash, + FeaturesControl::DhtKv(dht_kv::Control::MapCmd(room_peers_map, MapControl::Set(peer_key, peer_info.serialize()))) + )) + ); + assert_eq!( + cluster.pop_output(Instant::now()), + Some(Output::Sdn(room_hash, FeaturesControl::DhtKv(dht_kv::Control::MapCmd(room_peers_map, MapControl::Sub)))) + ); + assert_eq!(cluster.pop_output(Instant::now()), None); + assert_eq!(cluster.rooms.tasks(), 1); + + // Correct forward to room + let out = cluster.on_sdn_event( + Instant::now(), + room_hash, + FeaturesEvent::DhtKv(dht_kv::Event::MapEvent(room_peers_map, MapEvent::OnSet(peer_key, 1, peer_info.serialize()))), + ); + assert_eq!(out, Some(Output::Endpoint(vec![owner], ClusterEndpointEvent::PeerJoined(peer.clone(), peer_info.meta.clone())))); + assert_eq!(cluster.pop_output(Instant::now()), None); + + // Now leave room should Del and Unsub + let out = cluster.on_endpoint_control(Instant::now(), owner, room_hash, ClusterEndpointControl::Leave); + assert_eq!( + out, + Some(Output::Sdn(room_hash, FeaturesControl::DhtKv(dht_kv::Control::MapCmd(room_peers_map, MapControl::Del(peer_key))))) + ); + assert_eq!( + cluster.pop_output(Instant::now()), + Some(Output::Sdn(room_hash, FeaturesControl::DhtKv(dht_kv::Control::MapCmd(room_peers_map, MapControl::Unsub)))) + ); + assert_eq!(cluster.pop_output(Instant::now()), Some(Output::Continue)); //this is for destroy event + assert_eq!(cluster.pop_output(Instant::now()), None); + assert_eq!(cluster.rooms.tasks(), 0); + } } diff --git a/packages/media_core/src/cluster/id_generator.rs b/packages/media_core/src/cluster/id_generator.rs new file mode 100644 index 00000000..0d662f4f --- /dev/null +++ b/packages/media_core/src/cluster/id_generator.rs @@ -0,0 +1,42 @@ +use std::hash::{DefaultHasher, Hash, Hasher}; + +use atm0s_sdn::features::dht_kv::{Key, Map}; +use media_server_protocol::endpoint::{PeerId, TrackName}; + +use super::ClusterRoomHash; + +pub fn peer_map(room: ClusterRoomHash, peer: &PeerId) -> Map { + let mut h = DefaultHasher::new(); + room.as_ref().hash(&mut h); + peer.as_ref().hash(&mut h); + h.finish().into() +} + +pub fn peers_map(room: ClusterRoomHash) -> Map { + room.0.into() +} + +pub fn peers_key(peer: &PeerId) -> Key { + let mut h = DefaultHasher::new(); + peer.as_ref().hash(&mut h); + h.finish().into() +} + +pub fn tracks_map(room: ClusterRoomHash) -> Map { + (room.0 + 1).into() +} + +pub fn tracks_key(peer: &PeerId, track: &TrackName) -> Key { + let mut h = DefaultHasher::new(); + peer.as_ref().hash(&mut h); + track.as_ref().hash(&mut h); + h.finish().into() +} + +pub fn gen_channel_id>(room: ClusterRoomHash, peer: &PeerId, track: &TrackName) -> T { + let mut h = std::hash::DefaultHasher::new(); + room.as_ref().hash(&mut h); + peer.as_ref().hash(&mut h); + track.as_ref().hash(&mut h); + h.finish().into() +} diff --git a/packages/media_core/src/cluster/room.rs b/packages/media_core/src/cluster/room.rs index 52679ead..ee096334 100644 --- a/packages/media_core/src/cluster/room.rs +++ b/packages/media_core/src/cluster/room.rs @@ -7,39 +7,32 @@ //! - Send/Receive pubsub channel //! -use std::{ - collections::VecDeque, - fmt::Debug, - hash::{Hash, Hasher}, - time::Instant, -}; +use std::{collections::VecDeque, fmt::Debug, hash::Hash, time::Instant}; use atm0s_sdn::features::{dht_kv, pubsub, FeaturesControl, FeaturesEvent}; -use media_server_protocol::endpoint::{PeerId, TrackName}; use sans_io_runtime::{Task, TaskSwitcher}; use crate::transport::{LocalTrackId, RemoteTrackId}; use self::{channel_pub::RoomChannelPublisher, channel_sub::RoomChannelSubscribe, metadata::RoomMetadata}; -use super::{ClusterEndpointControl, ClusterLocalTrackControl, ClusterRemoteTrackControl, ClusterRoomHash, Output}; +use super::{ClusterEndpointControl, ClusterEndpointEvent, ClusterLocalTrackControl, ClusterRemoteTrackControl, ClusterRoomHash}; mod channel_pub; mod channel_sub; mod metadata; -#[derive(num_enum::TryFromPrimitive)] -#[repr(u8)] -pub enum FeedbackKind { - Bitrate = 0, - KeyFrameRequest = 1, -} - pub enum Input { Sdn(FeaturesEvent), Endpoint(Owner, ClusterEndpointControl), } +pub enum Output { + Sdn(ClusterRoomHash, FeaturesControl), + Endpoint(Vec, ClusterEndpointEvent), + Destroy, +} + #[derive(num_enum::TryFromPrimitive)] #[repr(usize)] enum TaskType { @@ -55,6 +48,7 @@ pub struct ClusterRoom { subscriber: RoomChannelSubscribe, switcher: TaskSwitcher, queue: VecDeque>, + destroyed: bool, //this flag for avoiding multi-time output destroy output } impl Task, Output> for ClusterRoom { @@ -73,8 +67,8 @@ impl Task, Output> if let Some(out) = self.queue.pop_front() { return Some(out); } - loop { - match self.switcher.queue_current()?.try_into().ok()? { + while let Some(c) = self.switcher.queue_current() { + match c.try_into().ok()? { TaskType::Metadata => { if let Some(out) = self.switcher.queue_process(self.metadata.pop_output(now)) { return Some(self.process_meta_output(out)); @@ -92,6 +86,14 @@ impl Task, Output> } } } + + if self.metadata.peers() == 0 && !self.destroyed { + log::info!("[ClusterRoom {}] leave last peer => remove room", self.room); + self.destroyed = true; + Some(Output::Destroy) + } else { + None + } } fn shutdown(&mut self, _now: Instant) -> Option> { @@ -108,6 +110,7 @@ impl ClusterRoom { subscriber: RoomChannelSubscribe::new(room), switcher: TaskSwitcher::new(3), queue: VecDeque::new(), + destroyed: false, } } @@ -145,8 +148,8 @@ impl ClusterRoom { Some(self.process_meta_output(out)) } ClusterEndpointControl::Leave => { - let out = self.metadata.on_leave(owner)?; - Some(self.process_meta_output(out)) + let out = self.metadata.on_leave(owner); + Some(self.process_meta_output(out?)) } ClusterEndpointControl::SubscribePeer(target) => { let out = self.metadata.on_subscribe_peer(owner, target)?; @@ -195,11 +198,11 @@ impl ClusterRoom { } } - fn control_local_track(&mut self, _now: Instant, owner: Owner, track_id: LocalTrackId, control: ClusterLocalTrackControl) -> Option> { + fn control_local_track(&mut self, now: Instant, owner: Owner, track_id: LocalTrackId, control: ClusterLocalTrackControl) -> Option> { let out = match control { ClusterLocalTrackControl::Subscribe(target_peer, target_track) => self.subscriber.on_track_subscribe(owner, track_id, target_peer, target_track), ClusterLocalTrackControl::RequestKeyFrame => self.subscriber.on_track_request_key(owner, track_id), - ClusterLocalTrackControl::DesiredBitrate(bitrate) => self.subscriber.on_track_desired_bitrate(owner, track_id, bitrate), + ClusterLocalTrackControl::DesiredBitrate(bitrate) => self.subscriber.on_track_desired_bitrate(now, owner, track_id, bitrate), ClusterLocalTrackControl::Unsubscribe => self.subscriber.on_track_unsubscribe(owner, track_id), }?; Some(self.process_subscriber_output(out)) @@ -230,14 +233,6 @@ impl ClusterRoom { } } -pub fn gen_channel_id>(room: ClusterRoomHash, peer: &PeerId, track: &TrackName) -> T { - let mut h = std::hash::DefaultHasher::new(); - room.as_ref().hash(&mut h); - peer.as_ref().hash(&mut h); - track.as_ref().hash(&mut h); - h.finish().into() -} - #[cfg(test)] mod tests { //TODO join room should set key-value and SUB to maps @@ -247,7 +242,7 @@ mod tests { //TODO track feedback should fire event to endpoint //TODO track stopped should DEL key-value and pubsub STOP //TODO subscribe track should SUB channel - //TODO fedback track should FEEDBACK channel + //TODO feddback track should FEEDBACK channel //TODO channel data should fire event to endpoint //TODO unsubscribe track should UNSUB channel } diff --git a/packages/media_core/src/cluster/room/channel_pub.rs b/packages/media_core/src/cluster/room/channel_pub.rs index 622aae45..6ef68d4a 100644 --- a/packages/media_core/src/cluster/room/channel_pub.rs +++ b/packages/media_core/src/cluster/room/channel_pub.rs @@ -8,17 +8,32 @@ use std::{ use atm0s_sdn::features::pubsub::{self, ChannelControl, ChannelId, Feedback}; use media_server_protocol::{ - endpoint::{PeerId, TrackMeta, TrackName}, + endpoint::{PeerId, TrackName}, media::MediaPacket, }; use crate::{ - cluster::{ClusterEndpointEvent, ClusterRemoteTrackEvent, ClusterRoomHash}, + cluster::{id_generator, ClusterEndpointEvent, ClusterRemoteTrackEvent, ClusterRoomHash}, transport::RemoteTrackId, }; -use super::FeedbackKind; +pub enum FeedbackKind { + Bitrate { min: u64, max: u64 }, + KeyFrameRequest, +} + +impl TryFrom for FeedbackKind { + type Error = (); + fn try_from(value: Feedback) -> Result { + match value.kind { + 0 => Ok(FeedbackKind::Bitrate { min: value.min, max: value.max }), + 1 => Ok(FeedbackKind::KeyFrameRequest), + _ => Err(()), + } + } +} +#[derive(Debug, PartialEq, Eq)] pub enum Output { Endpoint(Vec, ClusterEndpointEvent), Pubsub(pubsub::Control), @@ -42,17 +57,20 @@ impl RoomChannelPublisher { } pub fn on_channel_feedback(&mut self, channel: ChannelId, fb: Feedback) -> Option> { - let fb_kind = FeedbackKind::try_from(fb.kind).ok()?; + let fb = FeedbackKind::try_from(fb).ok()?; let (owner, track_id) = self.tracks_source.get(&channel)?; - match fb_kind { - FeedbackKind::Bitrate => todo!(), + match fb { + FeedbackKind::Bitrate { min, max } => Some(Output::Endpoint( + vec![*owner], + ClusterEndpointEvent::RemoteTrack(*track_id, ClusterRemoteTrackEvent::LimitBitrate { min, max }), + )), FeedbackKind::KeyFrameRequest => Some(Output::Endpoint(vec![*owner], ClusterEndpointEvent::RemoteTrack(*track_id, ClusterRemoteTrackEvent::RequestKeyFrame))), } } pub fn on_track_publish(&mut self, owner: Owner, track: RemoteTrackId, peer: PeerId, name: TrackName) -> Option> { log::info!("[ClusterRoom {}] peer ({peer} started track {name})", self.room); - let channel_id = super::gen_channel_id(self.room, &peer, &name); + let channel_id = id_generator::gen_channel_id(self.room, &peer, &name); self.tracks.insert((owner, track), (peer.clone(), name.clone(), channel_id)); self.tracks_source.insert(channel_id, (owner, track)); @@ -80,9 +98,84 @@ impl RoomChannelPublisher { #[cfg(test)] mod tests { - //TODO Track start => should register with SDN - //TODO Track stop => should unregister with SDN - //TODO Track media => should send data over SDN + use atm0s_sdn::features::pubsub::{ChannelControl, Control, Feedback}; + use media_server_protocol::media::MediaPacket; + + use crate::{ + cluster::{ClusterEndpointEvent, ClusterRemoteTrackEvent}, + transport::RemoteTrackId, + }; + + use super::id_generator::gen_channel_id; + use super::{Output, RoomChannelPublisher}; + + pub fn fake_audio() -> MediaPacket { + MediaPacket { + pt: 111, + ts: 0, + seq: 0, + marker: true, + nackable: false, + data: vec![1, 2, 3, 4], + } + } + + //Track start => should register with SDN + //Track stop => should unregister with SDN + //Track media => should send data over SDN + #[test] + fn channel_publish_data() { + let room = 1.into(); + let mut publisher = RoomChannelPublisher::::new(room); + + let owner = 2; + let track = RemoteTrackId(3); + let peer = "peer1".to_string().into(); + let name = "audio_main".to_string().into(); + let channel_id = gen_channel_id(room, &peer, &name); + let out = publisher.on_track_publish(owner, track, peer, name); + assert_eq!(out, Some(Output::Pubsub(Control(channel_id, ChannelControl::PubStart)))); + assert_eq!(publisher.pop_output(), None); + + let media = fake_audio(); + let out = publisher.on_track_data(owner, track, media.clone()); + assert_eq!(out, Some(Output::Pubsub(Control(channel_id, ChannelControl::PubData(media.serialize()))))); + assert_eq!(publisher.pop_output(), None); + + let out = publisher.on_track_unpublish(owner, track); + assert_eq!(out, Some(Output::Pubsub(Control(channel_id, ChannelControl::PubStop)))); + assert_eq!(publisher.pop_output(), None); + } + //TODO Handle feedback: should handle KeyFrame feedback //TODO Handle feedback: should handle Bitrate feedback + #[test] + fn channel_feedback() { + let room = 1.into(); + let mut publisher = RoomChannelPublisher::::new(room); + + let owner = 2; + let track = RemoteTrackId(3); + let peer = "peer1".to_string().into(); + let name = "audio_main".to_string().into(); + let channel_id = gen_channel_id(room, &peer, &name); + let out = publisher.on_track_publish(owner, track, peer, name); + assert_eq!(out, Some(Output::Pubsub(Control(channel_id, ChannelControl::PubStart)))); + assert_eq!(publisher.pop_output(), None); + + let out = publisher.on_channel_feedback(channel_id, Feedback::simple(0, 1000, 100, 200)); + assert_eq!( + out, + Some(Output::Endpoint( + vec![owner], + ClusterEndpointEvent::RemoteTrack(track, ClusterRemoteTrackEvent::LimitBitrate { min: 1000, max: 1000 }) + )) + ); + + let out = publisher.on_channel_feedback(channel_id, Feedback::simple(1, 1, 100, 200)); + assert_eq!( + out, + Some(Output::Endpoint(vec![owner], ClusterEndpointEvent::RemoteTrack(track, ClusterRemoteTrackEvent::RequestKeyFrame))) + ); + } } diff --git a/packages/media_core/src/cluster/room/channel_sub.rs b/packages/media_core/src/cluster/room/channel_sub.rs index 6716a172..16dc45f7 100644 --- a/packages/media_core/src/cluster/room/channel_sub.rs +++ b/packages/media_core/src/cluster/room/channel_sub.rs @@ -12,25 +12,43 @@ use atm0s_sdn::{ features::pubsub::{self, ChannelControl, ChannelId, Feedback}, NodeId, }; +use derivative::Derivative; use media_server_protocol::{ endpoint::{PeerId, TrackName}, media::MediaPacket, }; use crate::{ - cluster::{room::FeedbackKind, ClusterEndpointEvent, ClusterLocalTrackEvent, ClusterRoomHash}, + cluster::{id_generator, ClusterEndpointEvent, ClusterLocalTrackEvent, ClusterRoomHash}, transport::LocalTrackId, }; +const BITRATE_FEEDBACK_INTERVAL: u16 = 100; //100 ms +const BITRATE_FEEDBACK_TIMEOUT: u16 = 2000; //2 seconds + +const KEYFRAME_FEEDBACK_INTERVAL: u16 = 1000; //100 ms +const KEYFRAME_FEEDBACK_TIMEOUT: u16 = 2000; //2 seconds + +const BITRATE_FEEDBACK_KIND: u8 = 0; +const KEYFRAME_FEEDBACK_KIND: u8 = 1; + +#[derive(Debug, PartialEq, Eq)] pub enum Output { Endpoint(Vec, ClusterEndpointEvent), Pubsub(pubsub::Control), } +#[derive(Derivative)] +#[derivative(Default(bound = ""))] +struct ChannelContainer { + owners: Vec<(Owner, LocalTrackId)>, + bitrate_fbs: HashMap, +} + pub struct RoomChannelSubscribe { room: ClusterRoomHash, - subscribers: HashMap>, - subscribers_source: HashMap<(Owner, LocalTrackId), (ChannelId, PeerId, TrackName)>, + channels: HashMap>, + subscribers: HashMap<(Owner, LocalTrackId), (ChannelId, PeerId, TrackName)>, queue: VecDeque>, } @@ -38,16 +56,16 @@ impl RoomChannelSubscribe { pub fn new(room: ClusterRoomHash) -> Self { Self { room, + channels: HashMap::new(), subscribers: HashMap::new(), - subscribers_source: HashMap::new(), queue: VecDeque::new(), } } pub fn on_channel_relay_changed(&mut self, channel: ChannelId, _relay: NodeId) -> Option> { - let subscribers = self.subscribers.get(&channel)?; - log::info!("[ClusterRoom {}] cluster: channel {channel} source changed => fire event to {:?}", self.room, subscribers); - for (owner, track) in subscribers { + let channel_container = self.channels.get(&channel)?; + log::info!("[ClusterRoom {}] cluster: channel {channel} source changed => fire event to {:?}", self.room, channel_container.owners); + for (owner, track) in &channel_container.owners { self.queue .push_back(Output::Endpoint(vec![*owner], ClusterEndpointEvent::LocalTrack(*track, ClusterLocalTrackEvent::SourceChanged))) } @@ -56,9 +74,15 @@ impl RoomChannelSubscribe { pub fn on_channel_data(&mut self, channel: ChannelId, data: Vec) -> Option> { let pkt = MediaPacket::deserialize(&data)?; - let subscribers = self.subscribers.get(&channel)?; - log::trace!("[ClusterRoom {}] on channel media payload {} seq {} to {} subscribers", self.room, pkt.pt, pkt.seq, subscribers.len()); - for (owner, track) in subscribers { + let channel_container = self.channels.get(&channel)?; + log::trace!( + "[ClusterRoom {}] on channel media payload {} seq {} to {} subscribers", + self.room, + pkt.pt, + pkt.seq, + channel_container.owners.len() + ); + for (owner, track) in &channel_container.owners { self.queue .push_back(Output::Endpoint(vec![*owner], ClusterEndpointEvent::LocalTrack(*track, ClusterLocalTrackEvent::Media(pkt.clone())))) } @@ -66,16 +90,16 @@ impl RoomChannelSubscribe { } pub fn on_track_subscribe(&mut self, owner: Owner, track: LocalTrackId, target_peer: PeerId, target_track: TrackName) -> Option> { - let channel_id: ChannelId = super::gen_channel_id(self.room, &target_peer, &target_track); + let channel_id: ChannelId = id_generator::gen_channel_id(self.room, &target_peer, &target_track); log::info!( "[ClusterRoom {}] owner {:?} track {track} subscribe peer {target_peer} track {target_track}), channel: {channel_id}", self.room, owner ); - self.subscribers_source.insert((owner, track), (channel_id, target_peer, target_track)); - let subscribers = self.subscribers.entry(channel_id).or_insert(Default::default()); - subscribers.push((owner, track)); - if subscribers.len() == 1 { + self.subscribers.insert((owner, track), (channel_id, target_peer, target_track)); + let channel_container = self.channels.entry(channel_id).or_insert(Default::default()); + channel_container.owners.push((owner, track)); + if channel_container.owners.len() == 1 { log::info!("[ClusterRoom {}] first subscriber => Sub channel {channel_id}", self.room); Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::SubAuto))) } else { @@ -84,47 +108,217 @@ impl RoomChannelSubscribe { } pub fn on_track_request_key(&mut self, owner: Owner, track: LocalTrackId) -> Option> { - let (channel_id, peer, track) = self.subscribers_source.get(&(owner, track))?; + let (channel_id, peer, track) = self.subscribers.get(&(owner, track))?; log::info!("[ClusterRoom {}] request key-frame {channel_id} {peer} {track}", self.room); Some(Output::Pubsub(pubsub::Control( *channel_id, - ChannelControl::FeedbackAuto(Feedback::simple(FeedbackKind::KeyFrameRequest as u8, 1, 100, 200)), + ChannelControl::FeedbackAuto(Feedback::simple(KEYFRAME_FEEDBACK_KIND, 1, KEYFRAME_FEEDBACK_INTERVAL, KEYFRAME_FEEDBACK_TIMEOUT)), ))) } - pub fn on_track_desired_bitrate(&mut self, owner: Owner, track: LocalTrackId, bitrate: u32) -> Option> { - todo!() + pub fn on_track_desired_bitrate(&mut self, now: Instant, owner: Owner, track: LocalTrackId, bitrate: u64) -> Option> { + let (channel_id, _peer, _track) = self.subscribers.get(&(owner, track))?; + let channel_container = self.channels.get_mut(channel_id)?; + let fb = Feedback::simple(BITRATE_FEEDBACK_KIND, bitrate, BITRATE_FEEDBACK_INTERVAL, BITRATE_FEEDBACK_TIMEOUT); + channel_container.bitrate_fbs.insert(owner, (now, fb)); + + //clean if if timeout + channel_container + .bitrate_fbs + .retain(|_, (ts, _)| now.duration_since(*ts).as_millis() < BITRATE_FEEDBACK_TIMEOUT as u128); + + //sum all fbs + let mut sum_fb = None; + for (_, fb) in channel_container.bitrate_fbs.values() { + if let Some(sum_fb) = &mut sum_fb { + *sum_fb = *sum_fb + *fb; + } else { + sum_fb = Some(fb.clone()); + } + } + Some(Output::Pubsub(pubsub::Control(*channel_id, ChannelControl::FeedbackAuto(sum_fb?)))) } pub fn on_track_unsubscribe(&mut self, owner: Owner, track: LocalTrackId) -> Option> { - let (channel_id, target_peer, target_track) = self.subscribers_source.get(&(owner, track))?; + let (channel_id, target_peer, target_track) = self.subscribers.remove(&(owner, track))?; log::info!( "[ClusterRoom {}] owner {:?} track {track} unsubscribe from source {target_peer} {target_track}, channel {channel_id}", self.room, owner ); - let subscribers = self.subscribers.get_mut(channel_id)?; - let (index, _) = subscribers.iter().enumerate().find(|e| e.1.eq(&(owner, track)))?; - subscribers.swap_remove(index); + let channel_container = self.channels.get_mut(&channel_id)?; + let (index, _) = channel_container.owners.iter().enumerate().find(|e| e.1.eq(&(owner, track)))?; + channel_container.owners.swap_remove(index); - if subscribers.is_empty() { + if channel_container.owners.is_empty() { + self.channels.remove(&channel_id); log::info!("[ClusterRoom {}] last unsubscriber => Unsub channel {channel_id}", self.room); - Some(Output::Pubsub(pubsub::Control(*channel_id, ChannelControl::UnsubAuto))) + Some(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::UnsubAuto))) } else { None } } - pub fn pop_output(&mut self, now: Instant) -> Option> { + pub fn pop_output(&mut self, _now: Instant) -> Option> { self.queue.pop_front() } } #[cfg(test)] mod tests { + use std::time::{Duration, Instant}; + + use atm0s_sdn::features::pubsub::{ChannelControl, Control, Feedback}; + use media_server_protocol::{ + endpoint::{PeerId, TrackName}, + media::MediaPacket, + }; + + use crate::{ + cluster::{ + room::channel_sub::{BITRATE_FEEDBACK_INTERVAL, BITRATE_FEEDBACK_KIND, BITRATE_FEEDBACK_TIMEOUT, KEYFRAME_FEEDBACK_INTERVAL, KEYFRAME_FEEDBACK_KIND, KEYFRAME_FEEDBACK_TIMEOUT}, + ClusterEndpointEvent, ClusterLocalTrackEvent, + }, + transport::LocalTrackId, + }; + + use super::id_generator::gen_channel_id; + use super::{Output, RoomChannelSubscribe}; + + pub fn fake_audio() -> MediaPacket { + MediaPacket { + pt: 111, + ts: 0, + seq: 0, + marker: true, + nackable: false, + data: vec![1, 2, 3, 4], + } + } + //TODO First Subscribe channel should sending Sub //TODO Last Unsubscribe channel should sending Unsub + #[test] + fn normal_sub_ubsub() { + let room = 1.into(); + let mut subscriber = RoomChannelSubscribe::::new(room); + + let owner = 2; + let track = LocalTrackId(3); + let target_peer: PeerId = "peer2".to_string().into(); + let target_track: TrackName = "audio_main".to_string().into(); + let channel_id = gen_channel_id(room, &target_peer, &target_track); + let out = subscriber.on_track_subscribe(owner, track, target_peer.clone(), target_track.clone()); + assert_eq!(out, Some(Output::Pubsub(Control(channel_id, ChannelControl::SubAuto)))); + assert_eq!(subscriber.pop_output(Instant::now()), None); + + let pkt = fake_audio(); + let out = subscriber.on_channel_data(channel_id, pkt.serialize()); + assert_eq!(out, Some(Output::Endpoint(vec![owner], ClusterEndpointEvent::LocalTrack(track, ClusterLocalTrackEvent::Media(pkt))))); + assert_eq!(subscriber.pop_output(Instant::now()), None); + + let out = subscriber.on_track_unsubscribe(owner, track); + assert_eq!(out, Some(Output::Pubsub(Control(channel_id, ChannelControl::UnsubAuto)))); + assert_eq!(subscriber.pop_output(Instant::now()), None); + } + //TODO Sending key-frame request + #[test] + fn send_key_frame() { + let room = 1.into(); + let mut subscriber = RoomChannelSubscribe::::new(room); + + let owner = 2; + let track = LocalTrackId(3); + let target_peer: PeerId = "peer2".to_string().into(); + let target_track: TrackName = "audio_main".to_string().into(); + let channel_id = gen_channel_id(room, &target_peer, &target_track); + let out = subscriber.on_track_subscribe(owner, track, target_peer.clone(), target_track.clone()); + assert_eq!(out, Some(Output::Pubsub(Control(channel_id, ChannelControl::SubAuto)))); + assert_eq!(subscriber.pop_output(Instant::now()), None); + + let out = subscriber.on_track_request_key(owner, track); + assert_eq!( + out, + Some(Output::Pubsub(Control( + channel_id, + ChannelControl::FeedbackAuto(Feedback::simple(KEYFRAME_FEEDBACK_KIND, 1, KEYFRAME_FEEDBACK_INTERVAL, KEYFRAME_FEEDBACK_TIMEOUT)) + ))) + ); + assert_eq!(subscriber.pop_output(Instant::now()), None); + } + //TODO Sending bitrate request single sub - //TODO Sending bitrate request multi subs + #[test] + fn send_bitrate_limit_speed() { + let room = 1.into(); + let mut subscriber = RoomChannelSubscribe::::new(room); + + let owner1 = 2; + let track1 = LocalTrackId(3); + let target_peer: PeerId = "peer2".to_string().into(); + let target_track: TrackName = "audio_main".to_string().into(); + let channel_id = gen_channel_id(room, &target_peer, &target_track); + let out = subscriber.on_track_subscribe(owner1, track1, target_peer.clone(), target_track.clone()); + assert_eq!(out, Some(Output::Pubsub(Control(channel_id, ChannelControl::SubAuto)))); + assert_eq!(subscriber.pop_output(Instant::now()), None); + + let mut now = Instant::now(); + + let out = subscriber.on_track_desired_bitrate(now, owner1, track1, 1000); + assert_eq!( + out, + Some(Output::Pubsub(Control( + channel_id, + ChannelControl::FeedbackAuto(Feedback::simple(BITRATE_FEEDBACK_KIND, 1000, BITRATE_FEEDBACK_INTERVAL, BITRATE_FEEDBACK_TIMEOUT)) + ))) + ); + assert_eq!(subscriber.pop_output(now), None); + + // more local track sub that channel + let owner2 = 3; + let track2 = LocalTrackId(4); + let out = subscriber.on_track_subscribe(owner2, track2, target_peer.clone(), target_track.clone()); + assert_eq!(out, None); + + // more feedback from local track2 + now += Duration::from_millis(100); + let out = subscriber.on_track_desired_bitrate(now, owner2, track2, 2000); + assert_eq!( + out, + Some(Output::Pubsub(Control( + channel_id, + ChannelControl::FeedbackAuto(Feedback { + kind: BITRATE_FEEDBACK_KIND, + count: 2, + max: 2000, + min: 1000, + sum: 3000, + interval_ms: BITRATE_FEEDBACK_INTERVAL, + timeout_ms: BITRATE_FEEDBACK_TIMEOUT + }) + ))) + ); + assert_eq!(subscriber.pop_output(now), None); + + //now last update from track2 after long time cause track1 feedback will be timeout + now += Duration::from_millis(BITRATE_FEEDBACK_TIMEOUT as u64 - 100); + let out = subscriber.on_track_desired_bitrate(now, owner2, track2, 3000); + assert_eq!( + out, + Some(Output::Pubsub(Control( + channel_id, + ChannelControl::FeedbackAuto(Feedback { + kind: BITRATE_FEEDBACK_KIND, + count: 1, + max: 3000, + min: 3000, + sum: 3000, + interval_ms: BITRATE_FEEDBACK_INTERVAL, + timeout_ms: BITRATE_FEEDBACK_TIMEOUT + }) + ))) + ); + assert_eq!(subscriber.pop_output(now), None); + } } diff --git a/packages/media_core/src/cluster/room/metadata.rs b/packages/media_core/src/cluster/room/metadata.rs index 74a0dcdc..dad35634 100644 --- a/packages/media_core/src/cluster/room/metadata.rs +++ b/packages/media_core/src/cluster/room/metadata.rs @@ -19,7 +19,7 @@ use media_server_protocol::endpoint::{PeerId, PeerInfo, PeerMeta, RoomInfoPublis use smallmap::{Map as SmallMap, Set as SmallSet}; use crate::{ - cluster::{ClusterEndpointEvent, ClusterRoomHash}, + cluster::{id_generator, ClusterEndpointEvent, ClusterRoomHash}, transport::RemoteTrackId, }; @@ -54,8 +54,8 @@ impl RoomMetadata { pub fn new(room: ClusterRoomHash) -> Self { Self { room, - peers_map: Self::peers_map(room), - tracks_map: Self::tracks_map(room), + peers_map: id_generator::peers_map(room), + tracks_map: id_generator::tracks_map(room), peers: SmallMap::new(), peers_map_subscribers: SmallMap::new(), tracks_map_subscribers: SmallMap::new(), @@ -66,32 +66,8 @@ impl RoomMetadata { } } - fn peer_map(room: ClusterRoomHash, peer: &PeerId) -> Map { - let mut h = DefaultHasher::new(); - room.as_ref().hash(&mut h); - peer.as_ref().hash(&mut h); - h.finish().into() - } - - fn peers_map(room: ClusterRoomHash) -> Map { - room.0.into() - } - - fn peers_key(peer: &PeerId) -> Key { - let mut h = DefaultHasher::new(); - peer.as_ref().hash(&mut h); - h.finish().into() - } - - fn tracks_map(room: ClusterRoomHash) -> Map { - (room.0 + 1).into() - } - - fn tracks_key(peer: &PeerId, track: &TrackName) -> Key { - let mut h = DefaultHasher::new(); - peer.as_ref().hash(&mut h); - track.as_ref().hash(&mut h); - h.finish().into() + pub fn peers(&self) -> usize { + self.peers.len() } pub fn get_peer_from_owner(&self, owner: Owner) -> Option { @@ -111,7 +87,7 @@ impl RoomMetadata { pub_tracks: SmallMap::new(), }, ); - let peer_key = Self::peers_key(&peer); + let peer_key = id_generator::peers_key(&peer); // Let Set to peers_map if need need publisj.peer if publish.peer { @@ -160,18 +136,18 @@ impl RoomMetadata { } pub fn on_leave(&mut self, owner: Owner) -> Option> { - let peer = self.peers.remove(&owner).expect("Should have owner"); + let peer = self.peers.remove(&owner)?; log::info!("[ClusterRoom {}] leave peer {}", self.room, peer.peer); - let peer_key = Self::peers_key(&peer.peer); + let peer_key = id_generator::peers_key(&peer.peer); // If remain remote tracks, must to delete from list. if peer.publish.peer { self.queue.push_back(Output::Kv(dht_kv::Control::MapCmd(self.peers_map, MapControl::Del(peer_key)))) } // If remain remote tracks, must to delete from list. - let peer_map = Self::peer_map(self.room, &peer.peer); + let peer_map = id_generator::peer_map(self.room, &peer.peer); for (_, track) in peer.pub_tracks.into_iter() { - let track_key = Self::tracks_key(&peer.peer, &track); + let track_key = id_generator::tracks_key(&peer.peer, &track); self.queue.push_back(Output::Kv(dht_kv::Control::MapCmd(self.tracks_map, MapControl::Del(track_key)))); self.queue.push_back(Output::Kv(dht_kv::Control::MapCmd(peer_map, MapControl::Del(track_key)))); } @@ -192,7 +168,7 @@ impl RoomMetadata { // check if this peer manual subscribe to some private peer map => need send Unsub for (target, _) in peer.sub_peers.into_iter() { - let target_peer_map = Self::peer_map(self.room, &target); + let target_peer_map = id_generator::peer_map(self.room, &target); let subs = self.peers_tracks_subs.get_mut(&target_peer_map).expect("Should have private peer_map"); subs.remove(&owner); if subs.is_empty() { @@ -206,7 +182,7 @@ impl RoomMetadata { pub fn on_subscribe_peer(&mut self, owner: Owner, target: PeerId) -> Option> { let peer = self.peers.get_mut(&owner).expect("Should have peer"); - let target_peer_map = Self::peer_map(self.room, &target); + let target_peer_map = id_generator::peer_map(self.room, &target); let subs = self.peers_tracks_subs.entry(target_peer_map).or_default(); let need_sub = subs.is_empty(); subs.insert(owner, ()); @@ -221,7 +197,7 @@ impl RoomMetadata { pub fn on_unsubscribe_peer(&mut self, owner: Owner, target: PeerId) -> Option> { let peer = self.peers.get_mut(&owner).expect("Should have peer"); - let target_peer_map = Self::peer_map(self.room, &target); + let target_peer_map = id_generator::peer_map(self.room, &target); let subs = self.peers_tracks_subs.entry(target_peer_map).or_default(); subs.remove(&owner); peer.sub_peers.remove(&target); @@ -241,10 +217,10 @@ impl RoomMetadata { track: track.clone(), meta, }; - let track_key = Self::tracks_key(&peer.peer, &track); + let track_key = id_generator::tracks_key(&peer.peer, &track); peer.pub_tracks.insert(track_id, track); - let peer_map = Self::peer_map(self.room, &peer.peer); + let peer_map = id_generator::peer_map(self.room, &peer.peer); self.queue.push_back(Output::Kv(dht_kv::Control::MapCmd(peer_map, MapControl::Set(track_key, info.serialize())))); Some(Output::Kv(dht_kv::Control::MapCmd(self.tracks_map, MapControl::Set(track_key, info.serialize())))) @@ -256,9 +232,9 @@ impl RoomMetadata { pub fn on_track_unpublish(&mut self, owner: Owner, track_id: RemoteTrackId) -> Option> { let peer = self.peers.get_mut(&owner)?; let track = peer.pub_tracks.remove(&track_id)?; - let track_key = Self::tracks_key(&peer.peer, &track); + let track_key = id_generator::tracks_key(&peer.peer, &track); - let peer_map = Self::peer_map(self.room, &peer.peer); + let peer_map = id_generator::peer_map(self.room, &peer.peer); self.queue.push_back(Output::Kv(dht_kv::Control::MapCmd(peer_map, MapControl::Del(track_key)))); Some(Output::Kv(dht_kv::Control::MapCmd(self.tracks_map, MapControl::Del(track_key)))) @@ -398,7 +374,7 @@ mod tests { use media_server_protocol::endpoint::{PeerId, PeerInfo, PeerMeta, RoomInfoPublish, RoomInfoSubscribe, TrackInfo, TrackName}; use crate::{ - cluster::{ClusterEndpointEvent, ClusterRoomHash}, + cluster::{id_generator, ClusterEndpointEvent, ClusterRoomHash}, transport::RemoteTrackId, }; @@ -429,13 +405,13 @@ mod tests { #[test] fn join_peer_only() { let room: ClusterRoomHash = 1.into(); - let peers_map = RoomMetadata::::peers_map(room); - let tracks_map = RoomMetadata::::tracks_map(room); + let peers_map = id_generator::peers_map(room); + let tracks_map = id_generator::tracks_map(room); let mut room_meta: RoomMetadata = RoomMetadata::::new(room); let peer_id: PeerId = "peer1".to_string().into(); let peer_meta = PeerMeta {}; let peer_info = PeerInfo::new(peer_id.clone(), peer_meta.clone()); - let peer_key = RoomMetadata::::peers_key(&peer_id); + let peer_key = id_generator::peers_key(&peer_id); let owner = 1; let out = room_meta.on_join( owner, @@ -455,8 +431,8 @@ mod tests { let track_name: TrackName = "audio_main".to_string().into(); let track_info = TrackInfo::simple_audio(peer_id.clone()); - let track_key = RoomMetadata::::tracks_key(&peer_id, &track_name); - let out = room_meta.on_kv_event(tracks_map, MapEvent::OnSet(track_key, 0, track_info.serialize())); + let track_key = id_generator::tracks_key(&peer_id, &track_name); + let out: Option> = room_meta.on_kv_event(tracks_map, MapEvent::OnSet(track_key, 0, track_info.serialize())); assert_eq!(out, None); // should only handle remove peer event, reject track @@ -477,11 +453,11 @@ mod tests { #[test] fn join_sub_peer_only_should_restore_old_peers() { let room: ClusterRoomHash = 1.into(); - let peers_map = RoomMetadata::::peers_map(room); + let peers_map = id_generator::peers_map(room); let mut room_meta: RoomMetadata = RoomMetadata::::new(room); let peer2: PeerId = "peer2".to_string().into(); - let peer2_key = RoomMetadata::::peers_key(&peer2); + let peer2_key = id_generator::peers_key(&peer2); let peer2_info = PeerInfo::new(peer2, PeerMeta {}); let out = room_meta.on_kv_event(peers_map, MapEvent::OnSet(peer2_key, 0, peer2_info.serialize())); @@ -509,13 +485,13 @@ mod tests { #[test] fn join_track_only() { let room: ClusterRoomHash = 1.into(); - let peers_map = RoomMetadata::::peers_map(room); - let tracks_map = RoomMetadata::::tracks_map(room); + let peers_map = id_generator::peers_map(room); + let tracks_map = id_generator::tracks_map(room); let mut room_meta: RoomMetadata = RoomMetadata::::new(room); let peer_id: PeerId = "peer1".to_string().into(); let peer_meta = PeerMeta {}; let peer_info = PeerInfo::new(peer_id.clone(), peer_meta.clone()); - let peer_key = RoomMetadata::::peers_key(&peer_id); + let peer_key = id_generator::peers_key(&peer_id); let owner = 1; let out = room_meta.on_join( owner, @@ -533,7 +509,7 @@ mod tests { let track_name: TrackName = "audio_main".to_string().into(); let track_info = TrackInfo::simple_audio(peer_id.clone()); - let track_key = RoomMetadata::::tracks_key(&peer_id, &track_name); + let track_key = id_generator::tracks_key(&peer_id, &track_name); let out = room_meta.on_kv_event(tracks_map, MapEvent::OnSet(track_key, 0, track_info.serialize())); assert_eq!( out, @@ -562,12 +538,12 @@ mod tests { #[test] fn join_sub_track_only_should_restore_old_tracks() { let room: ClusterRoomHash = 1.into(); - let tracks_map = RoomMetadata::::tracks_map(room); + let tracks_map = id_generator::tracks_map(room); let mut room_meta: RoomMetadata = RoomMetadata::::new(room); let peer2: PeerId = "peer2".to_string().into(); let track_name: TrackName = "audio_main".to_string().into(); - let track_key = RoomMetadata::::tracks_key(&peer2, &track_name); + let track_key = id_generator::tracks_key(&peer2, &track_name); let track_info = TrackInfo::simple_audio(peer2); let out = room_meta.on_kv_event(tracks_map, MapEvent::OnSet(track_key, 0, track_info.serialize())); @@ -598,13 +574,13 @@ mod tests { #[test] fn join_manual_no_subscribe_peer() { let room: ClusterRoomHash = 1.into(); - let peers_map = RoomMetadata::::peers_map(room); - let tracks_map = RoomMetadata::::tracks_map(room); + let peers_map = id_generator::peers_map(room); + let tracks_map = id_generator::tracks_map(room); let mut room_meta: RoomMetadata = RoomMetadata::::new(room); let peer_id: PeerId = "peer1".to_string().into(); let peer_meta = PeerMeta {}; let peer_info = PeerInfo::new(peer_id.clone(), peer_meta.clone()); - let peer_key = RoomMetadata::::peers_key(&peer_id); + let peer_key = id_generator::peers_key(&peer_id); let owner = 1; let out = room_meta.on_join( owner, @@ -621,7 +597,7 @@ mod tests { let track_name: TrackName = "audio_main".to_string().into(); let track_info = TrackInfo::simple_audio(peer_id.clone()); - let track_key = RoomMetadata::::tracks_key(&peer_id, &track_name); + let track_key = id_generator::tracks_key(&peer_id, &track_name); let out = room_meta.on_kv_event(tracks_map, MapEvent::OnSet(track_key, 0, track_info.serialize())); assert_eq!(out, None); @@ -655,7 +631,7 @@ mod tests { assert_eq!(out, None); let peer2: PeerId = "peer1".to_string().into(); - let peer2_map = RoomMetadata::::peer_map(room, &peer2); + let peer2_map = id_generator::peer_map(room, &peer2); let out = room_meta.on_subscribe_peer(owner, peer2.clone()); assert_eq!(out, Some(Output::Kv(Control::MapCmd(peer2_map, MapControl::Sub)))); assert_eq!(room_meta.pop_output(Instant::now()), None); @@ -663,7 +639,7 @@ mod tests { // should handle incoming event with only track and reject peer let track_name: TrackName = "audio_main".to_string().into(); let track_info = TrackInfo::simple_audio(peer_id.clone()); - let track_key = RoomMetadata::::tracks_key(&peer2, &track_name); + let track_key = id_generator::tracks_key(&peer2, &track_name); let out = room_meta.on_kv_event(peer2_map, MapEvent::OnSet(track_key, 0, track_info.serialize())); assert_eq!( out, @@ -693,7 +669,7 @@ mod tests { #[test] fn track_publish_enable() { let room: ClusterRoomHash = 1.into(); - let tracks_map = RoomMetadata::::tracks_map(room); + let tracks_map = id_generator::tracks_map(room); let mut room_meta: RoomMetadata = RoomMetadata::::new(room); let owner = 1; @@ -711,8 +687,8 @@ mod tests { let track_id: RemoteTrackId = RemoteTrackId(1); let track_name: TrackName = "audio_main".to_string().into(); let track_info = TrackInfo::simple_audio(peer_id.clone()); - let peer_map = RoomMetadata::::peer_map(room, &peer_id); - let track_key = RoomMetadata::::tracks_key(&peer_id, &track_name); + let peer_map = id_generator::peer_map(room, &peer_id); + let track_key = id_generator::tracks_key(&peer_id, &track_name); let out = room_meta.on_track_publish(owner, track_id, track_name, track_info.meta.clone()); assert_eq!(out, Some(Output::Kv(Control::MapCmd(tracks_map, MapControl::Set(track_key, track_info.serialize()))))); assert_eq!( @@ -769,7 +745,7 @@ mod tests { #[test] fn leave_room_auto_del_remote_tracks() { let room: ClusterRoomHash = 1.into(); - let tracks_map = RoomMetadata::::tracks_map(room); + let tracks_map = id_generator::tracks_map(room); let mut room_meta: RoomMetadata = RoomMetadata::::new(room); let owner = 1; @@ -787,8 +763,8 @@ mod tests { let track_id: RemoteTrackId = RemoteTrackId(1); let track_name: TrackName = "audio_main".to_string().into(); let track_info = TrackInfo::simple_audio(peer_id.clone()); - let peer_map = RoomMetadata::::peer_map(room, &peer_id); - let track_key = RoomMetadata::::tracks_key(&peer_id, &track_name); + let peer_map = id_generator::peer_map(room, &peer_id); + let track_key = id_generator::tracks_key(&peer_id, &track_name); let out = room_meta.on_track_publish(owner, track_id, track_name, track_info.meta.clone()); assert_eq!(out, Some(Output::Kv(Control::MapCmd(tracks_map, MapControl::Set(track_key, track_info.serialize()))))); assert_eq!( @@ -822,7 +798,7 @@ mod tests { assert_eq!(out, None); let peer2: PeerId = "peer1".to_string().into(); - let peer2_map = RoomMetadata::::peer_map(room, &peer2); + let peer2_map = id_generator::peer_map(room, &peer2); let out = room_meta.on_subscribe_peer(owner, peer2.clone()); assert_eq!(out, Some(Output::Kv(Control::MapCmd(peer2_map, MapControl::Sub)))); assert_eq!(room_meta.pop_output(Instant::now()), None);