Skip to content

Commit

Permalink
Avoid spawning a disposal thread per driver (#151)
Browse files Browse the repository at this point in the history
Adds a new field to Config, disposer, an Option<Sender<DisposalMessage>> responsible for dropping the DisposalMessage on a separate thread.

If this is not set, and the Config is passed into manager::Songbird, a thread is spawned for this purpose (which previously was spawned per driver).
If this is not set, and the Config is passed directly into Driver or Call, a thread is spawned locally, which is the current behavior as there is no where to store the Sender.

This disposer is then used in Driver as previously, to run possibly blocking destructors (which should only block the disposal thread). I cannot see this disposal thread getting overloaded, but if it is the DisposalMessages will simply be queued in the flume channel until it can be dropped.

Co-authored-by: Kyle Simpson <[email protected]>
  • Loading branch information
GnomedDev and FelixMcFelix committed Nov 20, 2023
1 parent 03b0803 commit be3a4e9
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 20 deletions.
38 changes: 37 additions & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use crate::driver::DecodeMode;
#[cfg(feature = "driver")]
use crate::{
driver::{retry::Retry, CryptoMode, MixMode},
driver::{retry::Retry, tasks::disposal::DisposalThread, CryptoMode, MixMode},
input::codecs::*,
};

Expand Down Expand Up @@ -143,6 +143,16 @@ pub struct Config {
///
/// [`PROBE`]: static@PROBE
pub format_registry: &'static Probe,
#[cfg(feature = "driver")]
/// The Sender for a channel that will run the destructor of possibly blocking values.
///
/// If not set, a thread will be spawned to perform this, but it is recommended to create
/// a long running thread instead of relying on a per-driver thread.
///
/// Note: When using [`Songbird`] this is overwritten automatically by its disposal thread.
///
/// [`Songbird`]: crate::Songbird
pub disposer: Option<DisposalThread>,

// Test only attributes
#[cfg(feature = "driver")]
Expand Down Expand Up @@ -181,6 +191,8 @@ impl Default for Config {
#[cfg(feature = "driver")]
format_registry: &PROBE,
#[cfg(feature = "driver")]
disposer: None,
#[cfg(feature = "driver")]
#[cfg(test)]
tick_style: TickStyle::Timed,
#[cfg(feature = "driver")]
Expand Down Expand Up @@ -264,6 +276,23 @@ impl Config {
self
}

/// Sets this `Config`'s channel for sending disposal messages.
#[must_use]
pub fn disposer(mut self, disposer: DisposalThread) -> Self {
self.disposer = Some(disposer);
self
}

/// Ensures a global disposer has been set, initializing one if not.
#[must_use]
pub(crate) fn initialise_disposer(self) -> Self {
if self.disposer.is_some() {
self
} else {
self.disposer(DisposalThread::run())
}
}

/// This is used to prevent changes which would invalidate the current session.
pub(crate) fn make_safe(&mut self, previous: &Config, connected: bool) {
if connected {
Expand All @@ -272,6 +301,13 @@ impl Config {
}
}

#[cfg(not(feature = "driver"))]
impl Config {
pub(crate) fn initialise_disposer(self) -> Self {
self
}
}

// Test only attributes
#[cfg(all(test, feature = "driver"))]
impl Config {
Expand Down
32 changes: 29 additions & 3 deletions src/driver/tasks/disposal.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,39 @@
use super::message::*;
use flume::Receiver;
use tracing::instrument;
use flume::{Receiver, Sender};
use tracing::{instrument, trace};

#[derive(Debug, Clone)]
pub struct DisposalThread(Sender<DisposalMessage>);

impl Default for DisposalThread {
fn default() -> Self {
Self::run()
}
}

impl DisposalThread {
pub fn run() -> Self {
let (mix_tx, mix_rx) = flume::unbounded();
std::thread::spawn(move || {
trace!("Disposal thread started.");
runner(mix_rx);
trace!("Disposal thread finished.");
});

Self(mix_tx)
}

pub(super) fn dispose(&self, message: DisposalMessage) {
drop(self.0.send(message))
}
}

/// The mixer's disposal thread is also synchronous, due to tracks,
/// inputs, etc. being based on synchronous I/O.
///
/// The mixer uses this to offload heavy and expensive drop operations
/// to prevent deadline misses.
#[instrument(skip(mix_rx))]
pub(crate) fn runner(mix_rx: Receiver<DisposalMessage>) {
fn runner(mix_rx: Receiver<DisposalMessage>) {
while mix_rx.recv().is_ok() {}
}
18 changes: 7 additions & 11 deletions src/driver/tasks/mixer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use state::*;
pub use track::*;

use super::{
disposal,
disposal::DisposalThread,
error::{Error, Result},
message::*,
};
Expand Down Expand Up @@ -65,7 +65,7 @@ pub struct Mixer {
pub conn_active: Option<MixerConnection>,
pub content_prep_sequence: u64,
pub deadline: Instant,
pub disposer: Sender<DisposalMessage>,
pub disposer: DisposalThread,
pub encoder: OpusEncoder,
pub interconnect: Interconnect,
pub mix_rx: Receiver<MixerMessage>,
Expand Down Expand Up @@ -126,14 +126,11 @@ impl Mixer {
let tracks = Vec::with_capacity(1.max(config.preallocated_tracks));
let track_handles = Vec::with_capacity(1.max(config.preallocated_tracks));

// Create an object disposal thread here.
let (disposer, disposal_rx) = flume::unbounded();
std::thread::spawn(move || disposal::runner(disposal_rx));

let thread_pool = BlockyTaskPool::new(async_handle);

let symph_layout = config.mix_mode.symph_layout();

let disposer = config.disposer.clone().unwrap_or_default();
let config = config.into();

let sample_buffer = SampleBuffer::<f32>::new(
Expand Down Expand Up @@ -538,12 +535,11 @@ impl Mixer {
if track.playing.is_done() {
let p_state = track.playing.clone();
let to_drop = self.tracks.swap_remove(i);
drop(
self.disposer
.send(DisposalMessage::Track(Box::new(to_drop))),
);
self.disposer
.dispose(DisposalMessage::Track(Box::new(to_drop)));

let to_drop = self.track_handles.swap_remove(i);
drop(self.disposer.send(DisposalMessage::Handle(to_drop)));
self.disposer.dispose(DisposalMessage::Handle(to_drop));

self.fire_event(EventMessage::ChangeState(
i,
Expand Down
10 changes: 5 additions & 5 deletions src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub struct Songbird {
client_data: OnceCell<ClientData>,
calls: DashMap<GuildId, Arc<Mutex<Call>>>,
sharder: Sharder,
config: PRwLock<Option<Config>>,
config: PRwLock<Config>,
}

impl Songbird {
Expand All @@ -76,7 +76,7 @@ impl Songbird {
client_data: OnceCell::new(),
calls: DashMap::new(),
sharder: Sharder::Serenity(SerenitySharder::default()),
config: Some(config).into(),
config: config.initialise_disposer().into(),
})
}

Expand Down Expand Up @@ -114,7 +114,7 @@ impl Songbird {
}),
calls: DashMap::new(),
sharder: Sharder::TwilightCluster(cluster),
config: Some(config).into(),
config: config.initialise_disposer().into(),
}
}

Expand Down Expand Up @@ -176,7 +176,7 @@ impl Songbird {
guild_id,
shard_handle,
info.user_id,
self.config.read().clone().unwrap_or_default(),
self.config.read().clone(),
);

Arc::new(Mutex::new(call))
Expand All @@ -193,7 +193,7 @@ impl Songbird {
/// Requires the `"driver"` feature.
pub fn set_config(&self, new_config: Config) {
let mut config = self.config.write();
*config = Some(new_config);
*config = new_config;
}

#[cfg(feature = "driver")]
Expand Down

0 comments on commit be3a4e9

Please sign in to comment.