Skip to content

Commit

Permalink
Input & Driver: Fix zombie processes on Unix (#39)
Browse files Browse the repository at this point in the history
Linux/Unix requires that processes be waited, which is unfortunate as Windows lets us abandon them to the murderous whims of the OS. This PR adds Unix-specific behaviour to send a SIGINT before waiting on the process, and adds an additional thread per call for asset disposal on all platforms.

Closes #38.

---

* Close processes by SIGINT and wait on Unix

This seems to remedy the Linux-specific zombie processes. Addition of
nix as a dependency *should* be fine on Windows, since I believe it
compiles to an empty crate.

* Dispose of Tracks on auxiliary thread

This adds a mechanism for the mixer threads to perform potentially expensive deallocation/cleanup outside of the main loop, preventing deadline misses etc. This should make misbehaving `wait`s a bit more friendly.
  • Loading branch information
FelixMcFelix authored Jan 26, 2021
1 parent a0e905a commit fe2282c
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 4 deletions.
5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ version = "0.10"
[dependencies.futures]
version = "0.3"

[dependencies.nix]
version = "0.19"
optional = true

[dependencies.parking_lot]
optional = true
version = "0.11"
Expand Down Expand Up @@ -133,6 +137,7 @@ driver = [
"byteorder",
"discortp",
"flume",
"nix",
"parking_lot",
"rand",
"serenity-voice-model",
Expand Down
18 changes: 18 additions & 0 deletions src/driver/tasks/disposal.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use super::message::*;
use flume::Receiver;
use tracing::instrument;

/// The mixer's disposal thread is also synchronous, due to tracks,
/// inputs, etc. being based on synchronous I/O.
///
/// The mixer uses this to offload heavy and expensive drop operations
/// to prevent deadline misses.
#[instrument(skip(mix_rx))]
pub(crate) fn runner(mix_rx: Receiver<DisposalMessage>) {
loop {
match mix_rx.recv() {
Err(_) | Ok(DisposalMessage::Poison) => break,
_ => {},
}
}
}
9 changes: 9 additions & 0 deletions src/driver/tasks/message/disposal.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#![allow(missing_docs)]

use crate::tracks::Track;

pub enum DisposalMessage {
Track(Track),

Poison,
}
3 changes: 2 additions & 1 deletion src/driver/tasks/message/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
#![allow(missing_docs)]

mod core;
mod disposal;
mod events;
mod mixer;
mod udp_rx;
mod udp_tx;
mod ws;

pub use self::{core::*, events::*, mixer::*, udp_rx::*, udp_tx::*, ws::*};
pub use self::{core::*, disposal::*, events::*, mixer::*, udp_rx::*, udp_tx::*, ws::*};

use flume::Sender;
use tracing::info;
Expand Down
13 changes: 11 additions & 2 deletions src/driver/tasks/mixer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{error::Result, message::*, Config};
use super::{disposal, error::Result, message::*, Config};
use crate::{
constants::*,
tracks::{PlayMode, Track},
Expand Down Expand Up @@ -28,6 +28,7 @@ pub struct Mixer {
pub config: Config,
pub conn_active: Option<MixerConnection>,
pub deadline: Instant,
pub disposer: Sender<DisposalMessage>,
pub encoder: OpusEncoder,
pub interconnect: Interconnect,
pub mix_rx: Receiver<MixerMessage>,
Expand Down Expand Up @@ -74,12 +75,17 @@ impl Mixer {

let tracks = Vec::with_capacity(1.max(config.preallocated_tracks));

// Create an object disposal thread here.
let (disposer, disposal_rx) = flume::unbounded();
std::thread::spawn(move || disposal::runner(disposal_rx));

Self {
async_handle,
bitrate,
config,
conn_active: None,
deadline: Instant::now(),
disposer,
encoder,
interconnect,
mix_rx,
Expand Down Expand Up @@ -322,12 +328,13 @@ impl Mixer {

if track.playing.is_done() {
let p_state = track.playing();
self.tracks.swap_remove(i);
let to_drop = self.tracks.swap_remove(i);
to_remove.push(i);
self.fire_event(EventMessage::ChangeState(
i,
TrackStateChange::Mode(p_state),
))?;
let _ = self.disposer.send(DisposalMessage::Track(to_drop));
} else {
i += 1;
}
Expand Down Expand Up @@ -580,4 +587,6 @@ pub(crate) fn runner(
let mut mixer = Mixer::new(mix_rx, async_handle, interconnect, config);

mixer.run();

let _ = mixer.disposer.send(DisposalMessage::Poison);
}
1 change: 1 addition & 0 deletions src/driver/tasks/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![allow(missing_docs)]

pub(crate) mod disposal;
pub mod error;
mod events;
pub mod message;
Expand Down
19 changes: 18 additions & 1 deletion src/input/child.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ use std::{
};
use tracing::debug;

#[cfg(unix)]
use nix::{
sys::signal::{self, Signal},
unistd::Pid,
};

/// Handle for a child process which ensures that any subprocesses are properly closed
/// on drop.
#[derive(Debug)]
Expand All @@ -31,7 +37,18 @@ impl Read for ChildContainer {

impl Drop for ChildContainer {
fn drop(&mut self) {
if let Err(e) = self.0.kill() {
#[cfg(not(unix))]
let attempt = self.0.kill();

#[cfg(unix)]
let attempt = {
let pid = Pid::from_raw(self.0.id() as i32);
let _ = signal::kill(pid, Signal::SIGINT);

self.0.wait()
};

if let Err(e) = attempt {
debug!("Error awaiting child process: {:?}", e);
}
}
Expand Down

0 comments on commit fe2282c

Please sign in to comment.