Skip to content

Commit

Permalink
feat: channel pub-sub feature and tests. cluster integration test (#262)
Browse files Browse the repository at this point in the history
* feat: channel pub-sub feature and tests. cluster integration test

* chore: fix typos
  • Loading branch information
giangndm authored Apr 24, 2024
1 parent fd703ca commit 829ea8b
Show file tree
Hide file tree
Showing 7 changed files with 539 additions and 152 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ For a deep dive into the technical aspects of network architecture, please refer

## Project Status: Refactoring

We are actively refactoring entiry media server and network stack with [sans-io-runtime](https://github.com/8xff/sans-io-runtime) for better performance. If you are looking for older version, please check in [legacy branch](https://github.com/8xFF/atm0s-media-server/tree/legacy).
We are actively refactoring entire media server and network stack with [sans-io-runtime](https://github.com/8xff/sans-io-runtime) for better performance. If you are looking for an older version, please check out the [legacy branch](https://github.com/8xFF/atm0s-media-server/tree/legacy).

## Features

Expand Down
119 changes: 103 additions & 16 deletions packages/media_core/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
//!

use derive_more::{AsRef, Display, From};
use sans_io_runtime::{Task, TaskGroup};
use sans_io_runtime::TaskGroup;
use std::{
collections::HashMap,
fmt::Debug,
Expand All @@ -23,6 +23,7 @@ use crate::transport::{LocalTrackId, RemoteTrackId};

use self::room::ClusterRoom;

mod id_generator;
mod room;

#[derive(Clone, Copy, From, AsRef, PartialEq, Eq, Debug, Display, Hash)]
Expand All @@ -46,14 +47,14 @@ pub enum ClusterRemoteTrackControl {
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ClusterRemoteTrackEvent {
RequestKeyFrame,
LimitBitrate { min: u32, max: u32 },
LimitBitrate { min: u64, max: u64 },
}

#[derive(Debug, Clone)]
pub enum ClusterLocalTrackControl {
Subscribe(PeerId, TrackName),
RequestKeyFrame,
DesiredBitrate(u32),
DesiredBitrate(u64),
Unsubscribe,
}

Expand Down Expand Up @@ -90,6 +91,7 @@ pub enum Input<Owner> {
Endpoint(Owner, ClusterRoomHash, ClusterEndpointControl),
}

#[derive(Debug, PartialEq, Eq)]
pub enum Output<Owner> {
Sdn(ClusterRoomHash, FeaturesControl),
Endpoint(Vec<Owner>, ClusterEndpointEvent),
Expand All @@ -98,7 +100,7 @@ pub enum Output<Owner> {

pub struct MediaCluster<Owner: Debug + Copy + Clone + Hash + Eq> {
rooms_map: HashMap<ClusterRoomHash, usize>,
rooms: TaskGroup<room::Input<Owner>, Output<Owner>, ClusterRoom<Owner>, 128>,
rooms: TaskGroup<room::Input<Owner>, room::Output<Owner>, ClusterRoom<Owner>, 128>,
}

impl<Owner: Debug + Copy + Hash + Eq + Clone> Default for MediaCluster<Owner> {
Expand All @@ -112,41 +114,126 @@ impl<Owner: Debug + Copy + Hash + Eq + Clone> Default for MediaCluster<Owner> {

impl<Owner: Debug + Hash + Copy + Clone + Debug + Eq> MediaCluster<Owner> {
pub fn on_tick(&mut self, now: Instant) -> Option<Output<Owner>> {
let (_index, out) = self.rooms.on_tick(now)?;
Some(out)
let (index, out) = self.rooms.on_tick(now)?;
Some(self.process_room_output(index, out))
}

pub fn on_sdn_event(&mut self, now: Instant, room: ClusterRoomHash, event: FeaturesEvent) -> Option<Output<Owner>> {
let index = self.rooms_map.get(&room)?;
self.rooms.on_event(now, *index, room::Input::Sdn(event))
let out = self.rooms.on_event(now, *index, room::Input::Sdn(event))?;
Some(self.process_room_output(*index, out))
}

pub fn on_endpoint_control(&mut self, now: Instant, owner: Owner, room_hash: ClusterRoomHash, control: ClusterEndpointControl) -> Option<Output<Owner>> {
if let Some(index) = self.rooms_map.get(&room_hash) {
self.rooms.on_event(now, *index, room::Input::Endpoint(owner, control))
let out = self.rooms.on_event(now, *index, room::Input::Endpoint(owner, control))?;
Some(self.process_room_output(*index, out))
} else {
log::info!("[MediaCluster] create room {}", room_hash);
let mut room = ClusterRoom::new(room_hash);
let out = room.on_event(now, room::Input::Endpoint(owner, control));
let index = self.rooms.add_task(room);
let index = self.rooms.add_task(ClusterRoom::new(room_hash));
self.rooms_map.insert(room_hash, index);
out
let out = self.rooms.on_event(now, index, room::Input::Endpoint(owner, control))?;
Some(self.process_room_output(index, out))
}
}

pub fn pop_output(&mut self, now: Instant) -> Option<Output<Owner>> {
let (_index, out) = self.rooms.pop_output(now)?;
Some(out)
let (index, out) = self.rooms.pop_output(now)?;
Some(self.process_room_output(index, out))
}

pub fn shutdown<'a>(&mut self, now: Instant) -> Option<Output<Owner>> {
let (_index, out) = self.rooms.shutdown(now)?;
Some(out)
let (index, out) = self.rooms.shutdown(now)?;
Some(self.process_room_output(index, out))
}

fn process_room_output(&mut self, index: usize, out: room::Output<Owner>) -> Output<Owner> {
match out {
room::Output::Sdn(userdata, control) => Output::Sdn(userdata, control),
room::Output::Endpoint(owners, event) => Output::Endpoint(owners, event),
room::Output::Destroy => {
self.rooms.remove_task(index);
Output::Continue
}
}
}
}

#[cfg(test)]
mod tests {
use std::time::Instant;

use atm0s_sdn::features::{
dht_kv::{self, MapControl, MapEvent},
FeaturesControl, FeaturesEvent,
};
use media_server_protocol::endpoint::{PeerId, PeerInfo, PeerMeta, RoomInfoPublish, RoomInfoSubscribe};

use crate::cluster::{id_generator, ClusterEndpointEvent};

use super::{ClusterEndpointControl, ClusterRoomHash, MediaCluster, Output};

//TODO should create room when new room event arrived
//TODO should route to correct room
//TODO should remove room after all peers leaved
#[test]
fn room_manager_should_work() {
let mut cluster = MediaCluster::<u8>::default();

let owner = 1;
let room_hash = ClusterRoomHash(1);
let room_peers_map = id_generator::peers_map(room_hash);
let peer = PeerId("peer1".to_string());
let peer_key = id_generator::peers_key(&peer);
let peer_info = PeerInfo::new(peer.clone(), PeerMeta {});

// Not join room with scope (peer true, track false) should Set and Sub
let out = cluster.on_endpoint_control(
Instant::now(),
owner,
room_hash,
ClusterEndpointControl::Join(
peer.clone(),
peer_info.meta.clone(),
RoomInfoPublish { peer: true, tracks: false },
RoomInfoSubscribe { peers: true, tracks: false },
),
);
assert_eq!(
out,
Some(Output::Sdn(
room_hash,
FeaturesControl::DhtKv(dht_kv::Control::MapCmd(room_peers_map, MapControl::Set(peer_key, peer_info.serialize())))
))
);
assert_eq!(
cluster.pop_output(Instant::now()),
Some(Output::Sdn(room_hash, FeaturesControl::DhtKv(dht_kv::Control::MapCmd(room_peers_map, MapControl::Sub))))
);
assert_eq!(cluster.pop_output(Instant::now()), None);
assert_eq!(cluster.rooms.tasks(), 1);

// Correct forward to room
let out = cluster.on_sdn_event(
Instant::now(),
room_hash,
FeaturesEvent::DhtKv(dht_kv::Event::MapEvent(room_peers_map, MapEvent::OnSet(peer_key, 1, peer_info.serialize()))),
);
assert_eq!(out, Some(Output::Endpoint(vec![owner], ClusterEndpointEvent::PeerJoined(peer.clone(), peer_info.meta.clone()))));
assert_eq!(cluster.pop_output(Instant::now()), None);

// Now leave room should Del and Unsub
let out = cluster.on_endpoint_control(Instant::now(), owner, room_hash, ClusterEndpointControl::Leave);
assert_eq!(
out,
Some(Output::Sdn(room_hash, FeaturesControl::DhtKv(dht_kv::Control::MapCmd(room_peers_map, MapControl::Del(peer_key)))))
);
assert_eq!(
cluster.pop_output(Instant::now()),
Some(Output::Sdn(room_hash, FeaturesControl::DhtKv(dht_kv::Control::MapCmd(room_peers_map, MapControl::Unsub))))
);
assert_eq!(cluster.pop_output(Instant::now()), Some(Output::Continue)); //this is for destroy event
assert_eq!(cluster.pop_output(Instant::now()), None);
assert_eq!(cluster.rooms.tasks(), 0);
}
}
42 changes: 42 additions & 0 deletions packages/media_core/src/cluster/id_generator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use std::hash::{DefaultHasher, Hash, Hasher};

use atm0s_sdn::features::dht_kv::{Key, Map};
use media_server_protocol::endpoint::{PeerId, TrackName};

use super::ClusterRoomHash;

pub fn peer_map(room: ClusterRoomHash, peer: &PeerId) -> Map {
let mut h = DefaultHasher::new();
room.as_ref().hash(&mut h);
peer.as_ref().hash(&mut h);
h.finish().into()
}

pub fn peers_map(room: ClusterRoomHash) -> Map {
room.0.into()
}

pub fn peers_key(peer: &PeerId) -> Key {
let mut h = DefaultHasher::new();
peer.as_ref().hash(&mut h);
h.finish().into()
}

pub fn tracks_map(room: ClusterRoomHash) -> Map {
(room.0 + 1).into()
}

pub fn tracks_key(peer: &PeerId, track: &TrackName) -> Key {
let mut h = DefaultHasher::new();
peer.as_ref().hash(&mut h);
track.as_ref().hash(&mut h);
h.finish().into()
}

pub fn gen_channel_id<T: From<u64>>(room: ClusterRoomHash, peer: &PeerId, track: &TrackName) -> T {
let mut h = std::hash::DefaultHasher::new();
room.as_ref().hash(&mut h);
peer.as_ref().hash(&mut h);
track.as_ref().hash(&mut h);
h.finish().into()
}
55 changes: 25 additions & 30 deletions packages/media_core/src/cluster/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,39 +7,32 @@
//! - Send/Receive pubsub channel
//!

use std::{
collections::VecDeque,
fmt::Debug,
hash::{Hash, Hasher},
time::Instant,
};
use std::{collections::VecDeque, fmt::Debug, hash::Hash, time::Instant};

use atm0s_sdn::features::{dht_kv, pubsub, FeaturesControl, FeaturesEvent};
use media_server_protocol::endpoint::{PeerId, TrackName};
use sans_io_runtime::{Task, TaskSwitcher};

use crate::transport::{LocalTrackId, RemoteTrackId};

use self::{channel_pub::RoomChannelPublisher, channel_sub::RoomChannelSubscribe, metadata::RoomMetadata};

use super::{ClusterEndpointControl, ClusterLocalTrackControl, ClusterRemoteTrackControl, ClusterRoomHash, Output};
use super::{ClusterEndpointControl, ClusterEndpointEvent, ClusterLocalTrackControl, ClusterRemoteTrackControl, ClusterRoomHash};

mod channel_pub;
mod channel_sub;
mod metadata;

#[derive(num_enum::TryFromPrimitive)]
#[repr(u8)]
pub enum FeedbackKind {
Bitrate = 0,
KeyFrameRequest = 1,
}

pub enum Input<Owner> {
Sdn(FeaturesEvent),
Endpoint(Owner, ClusterEndpointControl),
}

pub enum Output<Owner> {
Sdn(ClusterRoomHash, FeaturesControl),
Endpoint(Vec<Owner>, ClusterEndpointEvent),
Destroy,
}

#[derive(num_enum::TryFromPrimitive)]
#[repr(usize)]
enum TaskType {
Expand All @@ -55,6 +48,7 @@ pub struct ClusterRoom<Owner> {
subscriber: RoomChannelSubscribe<Owner>,
switcher: TaskSwitcher,
queue: VecDeque<Output<Owner>>,
destroyed: bool, //this flag for avoiding multi-time output destroy output
}

impl<Owner: Debug + Copy + Clone + Hash + Eq> Task<Input<Owner>, Output<Owner>> for ClusterRoom<Owner> {
Expand All @@ -73,8 +67,8 @@ impl<Owner: Debug + Copy + Clone + Hash + Eq> Task<Input<Owner>, Output<Owner>>
if let Some(out) = self.queue.pop_front() {
return Some(out);
}
loop {
match self.switcher.queue_current()?.try_into().ok()? {
while let Some(c) = self.switcher.queue_current() {
match c.try_into().ok()? {
TaskType::Metadata => {
if let Some(out) = self.switcher.queue_process(self.metadata.pop_output(now)) {
return Some(self.process_meta_output(out));
Expand All @@ -92,6 +86,14 @@ impl<Owner: Debug + Copy + Clone + Hash + Eq> Task<Input<Owner>, Output<Owner>>
}
}
}

if self.metadata.peers() == 0 && !self.destroyed {
log::info!("[ClusterRoom {}] leave last peer => remove room", self.room);
self.destroyed = true;
Some(Output::Destroy)
} else {
None
}
}

fn shutdown(&mut self, _now: Instant) -> Option<Output<Owner>> {
Expand All @@ -108,6 +110,7 @@ impl<Owner: Debug + Copy + Clone + Hash + Eq> ClusterRoom<Owner> {
subscriber: RoomChannelSubscribe::new(room),
switcher: TaskSwitcher::new(3),
queue: VecDeque::new(),
destroyed: false,
}
}

Expand Down Expand Up @@ -145,8 +148,8 @@ impl<Owner: Debug + Copy + Clone + Hash + Eq> ClusterRoom<Owner> {
Some(self.process_meta_output(out))
}
ClusterEndpointControl::Leave => {
let out = self.metadata.on_leave(owner)?;
Some(self.process_meta_output(out))
let out = self.metadata.on_leave(owner);
Some(self.process_meta_output(out?))
}
ClusterEndpointControl::SubscribePeer(target) => {
let out = self.metadata.on_subscribe_peer(owner, target)?;
Expand Down Expand Up @@ -195,11 +198,11 @@ impl<Owner: Debug + Clone + Copy + Hash + Eq> ClusterRoom<Owner> {
}
}

fn control_local_track(&mut self, _now: Instant, owner: Owner, track_id: LocalTrackId, control: ClusterLocalTrackControl) -> Option<Output<Owner>> {
fn control_local_track(&mut self, now: Instant, owner: Owner, track_id: LocalTrackId, control: ClusterLocalTrackControl) -> Option<Output<Owner>> {
let out = match control {
ClusterLocalTrackControl::Subscribe(target_peer, target_track) => self.subscriber.on_track_subscribe(owner, track_id, target_peer, target_track),
ClusterLocalTrackControl::RequestKeyFrame => self.subscriber.on_track_request_key(owner, track_id),
ClusterLocalTrackControl::DesiredBitrate(bitrate) => self.subscriber.on_track_desired_bitrate(owner, track_id, bitrate),
ClusterLocalTrackControl::DesiredBitrate(bitrate) => self.subscriber.on_track_desired_bitrate(now, owner, track_id, bitrate),
ClusterLocalTrackControl::Unsubscribe => self.subscriber.on_track_unsubscribe(owner, track_id),
}?;
Some(self.process_subscriber_output(out))
Expand Down Expand Up @@ -230,14 +233,6 @@ impl<Owner: Debug + Clone + Copy + Hash + Eq> ClusterRoom<Owner> {
}
}

pub fn gen_channel_id<T: From<u64>>(room: ClusterRoomHash, peer: &PeerId, track: &TrackName) -> T {
let mut h = std::hash::DefaultHasher::new();
room.as_ref().hash(&mut h);
peer.as_ref().hash(&mut h);
track.as_ref().hash(&mut h);
h.finish().into()
}

#[cfg(test)]
mod tests {
//TODO join room should set key-value and SUB to maps
Expand All @@ -247,7 +242,7 @@ mod tests {
//TODO track feedback should fire event to endpoint
//TODO track stopped should DEL key-value and pubsub STOP
//TODO subscribe track should SUB channel
//TODO fedback track should FEEDBACK channel
//TODO feddback track should FEEDBACK channel
//TODO channel data should fire event to endpoint
//TODO unsubscribe track should UNSUB channel
}
Loading

0 comments on commit 829ea8b

Please sign in to comment.