Skip to content

Commit

Permalink
feat: bitrate control with Twcc and Remb (#265)
Browse files Browse the repository at this point in the history
* feat: bitrate control with Twcc and Remb

* feat: added simple bitrate allocator based on track priority

* chore: fix warns
  • Loading branch information
giangndm authored Apr 26, 2024
1 parent 829ea8b commit 1736970
Show file tree
Hide file tree
Showing 16 changed files with 532 additions and 62 deletions.
5 changes: 2 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion bin/src/server/media.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub async fn run_media_server(workers: usize, http_port: Option<u16>, node: Node
node: node.clone(),
media: MediaConfig { webrtc_addrs: webrtc_addrs.clone() },
};
controller.add_worker::<_, _, MediaRuntimeWorker, PollingBackend<_, 128, 512>>(Duration::from_millis(100), cfg, None);
controller.add_worker::<_, _, MediaRuntimeWorker, PollingBackend<_, 128, 512>>(Duration::from_millis(1), cfg, None);
}

let mut req_id_seed = 0;
Expand Down
19 changes: 15 additions & 4 deletions packages/media_core/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use std::{marker::PhantomData, time::Instant};

use media_server_protocol::{
endpoint::{PeerId, PeerMeta, RoomId, RoomInfoPublish, RoomInfoSubscribe, TrackMeta, TrackName},
endpoint::{BitrateControlMode, PeerId, PeerMeta, RoomId, RoomInfoPublish, RoomInfoSubscribe, TrackMeta, TrackName, TrackPriority},
media::MediaPacket,
transport::RpcResult,
};
Expand Down Expand Up @@ -31,7 +31,7 @@ pub enum EndpointRemoteTrackReq {}
pub enum EndpointRemoteTrackRes {}

pub enum EndpointLocalTrackReq {
Switch(Option<(PeerId, TrackName)>),
Switch(Option<(PeerId, TrackName, TrackPriority)>),
}

pub enum EndpointLocalTrackRes {
Expand Down Expand Up @@ -68,6 +68,7 @@ pub enum EndpointRes {
/// This is used for controlling the local track, which is sent from endpoint
pub enum EndpointLocalTrackEvent {
Media(MediaPacket),
DesiredBitrate(u64),
}

/// This is used for controlling the remote track, which is sent from endpoint
Expand All @@ -83,6 +84,11 @@ pub enum EndpointEvent {
PeerTrackStopped(PeerId, TrackName),
RemoteMediaTrack(RemoteTrackId, EndpointRemoteTrackEvent),
LocalMediaTrack(LocalTrackId, EndpointLocalTrackEvent),
/// Egress est params
BweConfig {
current: u64,
desired: u64,
},
/// This session will be disconnect after some seconds
GoAway(u8, Option<String>),
}
Expand All @@ -109,6 +115,11 @@ enum TaskType {
Internal = 1,
}

pub struct EndpointCfg {
pub max_egress_bitrate: u32,
pub bitrate_control: BitrateControlMode,
}

pub struct Endpoint<T: Transport<ExtIn, ExtOut>, ExtIn, ExtOut> {
transport: T,
internal: EndpointInternal,
Expand All @@ -117,10 +128,10 @@ pub struct Endpoint<T: Transport<ExtIn, ExtOut>, ExtIn, ExtOut> {
}

impl<T: Transport<ExtIn, ExtOut>, ExtIn, ExtOut> Endpoint<T, ExtIn, ExtOut> {
pub fn new(transport: T) -> Self {
pub fn new(cfg: EndpointCfg, transport: T) -> Self {
Self {
transport,
internal: EndpointInternal::new(),
internal: EndpointInternal::new(cfg),
switcher: TaskSwitcher::new(2),
_tmp: PhantomData::default(),
}
Expand Down
51 changes: 46 additions & 5 deletions packages/media_core/src/endpoint/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ use crate::{
transport::{LocalTrackEvent, LocalTrackId, RemoteTrackEvent, RemoteTrackId, TransportEvent, TransportState, TransportStats},
};

use self::{local_track::EndpointLocalTrack, remote_track::EndpointRemoteTrack};
use self::{bitrate_allocator::BitrateAllocator, local_track::EndpointLocalTrack, remote_track::EndpointRemoteTrack};

use super::{middleware::EndpointMiddleware, EndpointEvent, EndpointReq, EndpointReqId, EndpointRes};
use super::{middleware::EndpointMiddleware, EndpointCfg, EndpointEvent, EndpointReq, EndpointReqId, EndpointRes};

mod bitrate_allocator;
mod local_track;
mod remote_track;

Expand All @@ -37,6 +38,7 @@ pub enum InternalOutput {
}

pub struct EndpointInternal {
cfg: EndpointCfg,
state: TransportState,
wait_join: Option<(RoomId, PeerId, PeerMeta, RoomInfoPublish, RoomInfoSubscribe)>,
joined: Option<(ClusterRoomHash, RoomId, PeerId)>,
Expand All @@ -47,11 +49,13 @@ pub struct EndpointInternal {
_middlewares: Vec<Box<dyn EndpointMiddleware>>,
queue: VecDeque<InternalOutput>,
switcher: TaskSwitcher,
bitrate_allocator: BitrateAllocator,
}

impl EndpointInternal {
pub fn new() -> Self {
pub fn new(cfg: EndpointCfg) -> Self {
Self {
cfg,
state: TransportState::Connecting,
wait_join: None,
joined: None,
Expand All @@ -62,10 +66,25 @@ impl EndpointInternal {
_middlewares: Default::default(),
queue: Default::default(),
switcher: TaskSwitcher::new(2),
bitrate_allocator: BitrateAllocator::default(),
}
}

pub fn on_tick<'a>(&mut self, now: Instant) -> Option<InternalOutput> {
self.bitrate_allocator.on_tick();
if let Some(out) = self.bitrate_allocator.pop_output() {
match out {
bitrate_allocator::Output::SetTrackBitrate(track, bitrate) => {
if let Some(index) = self.local_tracks_id.get1(&track) {
let out = self.local_tracks.on_event(now, *index, local_track::Input::LimitBitrate(bitrate))?;
if let Some(out) = self.convert_local_track_output(now, track, out) {
return Some(out);
}
}
}
}
}

loop {
match self.switcher.looper_current(now)?.try_into().ok()? {
TaskType::LocalTracks => {
Expand Down Expand Up @@ -124,6 +143,12 @@ impl EndpointInternal {
TransportEvent::RemoteTrack(track, event) => self.on_transport_remote_track(now, track, event),
TransportEvent::LocalTrack(track, event) => self.on_transport_local_track(now, track, event),
TransportEvent::Stats(stats) => self.on_transport_stats(now, stats),
TransportEvent::EgressBitrateEstimate(bitrate) => {
let bitrate2 = bitrate.min(self.cfg.max_egress_bitrate as u64);
log::debug!("[EndpointInternal] limit egress bitrate {bitrate2}, rewrite from {bitrate}");
self.bitrate_allocator.set_egress_bitrate(bitrate2);
None
}
}
}

Expand Down Expand Up @@ -220,10 +245,10 @@ impl EndpointInternal {
}

fn on_transport_local_track<'a>(&mut self, now: Instant, track: LocalTrackId, event: LocalTrackEvent) -> Option<InternalOutput> {
if event.need_create() {
if let Some(kind) = event.need_create() {
log::info!("[EndpointInternal] create local track {:?}", track);
let room = self.joined.as_ref().map(|j| j.0.clone());
let index = self.local_tracks.add_task(EndpointLocalTrack::new(room));
let index = self.local_tracks.add_task(EndpointLocalTrack::new(kind, room));
self.local_tracks_id.insert(track, index);
}
let index = self.local_tracks_id.get1(&track)?;
Expand Down Expand Up @@ -333,6 +358,22 @@ impl EndpointInternal {
local_track::Output::Event(event) => Some(InternalOutput::Event(EndpointEvent::LocalMediaTrack(id, event))),
local_track::Output::Cluster(room, control) => Some(InternalOutput::Cluster(room, ClusterEndpointControl::LocalTrack(id, control))),
local_track::Output::RpcRes(req_id, res) => Some(InternalOutput::RpcRes(req_id, EndpointRes::LocalTrack(id, res))),
local_track::Output::DesiredBitrate(bitrate) => Some(InternalOutput::Event(EndpointEvent::BweConfig {
current: bitrate,
desired: bitrate + 100_000.max(bitrate * 1 / 5),
})),
local_track::Output::Started(kind, priority) => {
if kind.is_video() {
self.bitrate_allocator.set_video_track(id, priority);
}
None
}
local_track::Output::Stopped(kind) => {
if kind.is_video() {
self.bitrate_allocator.del_video_track(id);
}
None
}
}
}
}
Expand Down
90 changes: 90 additions & 0 deletions packages/media_core/src/endpoint/internal/bitrate_allocator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
use derivative::Derivative;
use std::collections::VecDeque;

use media_server_protocol::endpoint::TrackPriority;

use crate::transport::LocalTrackId;

const DEFAULT_BITRATE_BPS: u64 = 800_000;

#[derive(Debug, PartialEq, Eq)]
pub enum Output {
SetTrackBitrate(LocalTrackId, u64),
}

#[derive(Derivative)]
#[derivative(Default)]
pub struct BitrateAllocator {
changed: bool,
#[derivative(Default(value = "DEFAULT_BITRATE_BPS"))]
egress_bitrate: u64,
tracks: smallmap::Map<LocalTrackId, TrackPriority>,
queue: VecDeque<Output>,
}

impl BitrateAllocator {
pub fn on_tick(&mut self) {
self.process();
}

pub fn set_egress_bitrate(&mut self, bitrate: u64) {
self.egress_bitrate = bitrate;
self.changed = true;
}

pub fn set_video_track(&mut self, track: LocalTrackId, priority: TrackPriority) {
self.tracks.insert(track, priority);
self.changed = true;
}

pub fn del_video_track(&mut self, track: LocalTrackId) {
self.tracks.remove(&track);
self.changed = true;
}

pub fn pop_output(&mut self) -> Option<Output> {
self.queue.pop_front()
}

fn process(&mut self) {
if !self.changed {
return;
}
self.changed = false;
let mut sum = TrackPriority(0);
for (_track, priority) in self.tracks.iter() {
sum = sum + *priority;
}

if *(sum.as_ref()) != 0 {
for (track, priority) in self.tracks.iter() {
self.queue.push_back(Output::SetTrackBitrate(*track, (self.egress_bitrate * priority.0 as u64) / sum.0 as u64));
}
}
}
}

#[cfg(test)]
mod test {
use super::{BitrateAllocator, Output, DEFAULT_BITRATE_BPS};

#[test]
fn single_source() {
let mut allocator = BitrateAllocator::default();
allocator.set_video_track(0.into(), 1.into());

allocator.on_tick();
assert_eq!(allocator.pop_output(), Some(Output::SetTrackBitrate(0.into(), DEFAULT_BITRATE_BPS)));
}

#[test]
fn multi_source() {
let mut allocator = BitrateAllocator::default();
allocator.set_video_track(0.into(), 1.into());
allocator.set_video_track(1.into(), 3.into());

allocator.on_tick();
assert_eq!(allocator.pop_output(), Some(Output::SetTrackBitrate(0.into(), DEFAULT_BITRATE_BPS * 1 / 4)));
assert_eq!(allocator.pop_output(), Some(Output::SetTrackBitrate(1.into(), DEFAULT_BITRATE_BPS * 3 / 4)));
}
}
Loading

0 comments on commit 1736970

Please sign in to comment.