From 03ae0e7628efd68038ac76c9110e9e8aad99b7c0 Mon Sep 17 00:00:00 2001 From: Kyle Simpson Date: Mon, 28 Dec 2020 17:02:10 +0000 Subject: [PATCH] Input: Allow Restartable sources to be lazy This change is made with queue users in mind. Since sources of this kind *know* how to (re)create themselves, they can avoid being created at all until needed. This also adds machinery to preload tracks *before* they are needed, for gapless playback on queues and so on. Queues make use of the event system to do this. --- Cargo.toml | 2 +- .../serenity/voice_events_queue/src/main.rs | 12 +- examples/twilight/src/main.rs | 2 +- src/input/reader.rs | 9 + src/input/restartable.rs | 340 +++++++++++++----- src/input/ytdl_src.rs | 42 ++- src/tracks/command.rs | 3 + src/tracks/handle.rs | 10 + src/tracks/mod.rs | 11 + src/tracks/queue.rs | 36 +- 10 files changed, 367 insertions(+), 100 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 580151c0f..8817c9a71 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,7 +54,7 @@ version = "0.11" [dependencies.rand] optional = true -version = "0.7" +version = "0.8" [dependencies.serenity] optional = true diff --git a/examples/serenity/voice_events_queue/src/main.rs b/examples/serenity/voice_events_queue/src/main.rs index 0939fc7e0..28cc771b2 100644 --- a/examples/serenity/voice_events_queue/src/main.rs +++ b/examples/serenity/voice_events_queue/src/main.rs @@ -35,7 +35,10 @@ use serenity::{ }; use songbird::{ - input, + input::{ + self, + restartable::Restartable, + }, Event, EventContext, EventHandler as VoiceEventHandler, @@ -477,7 +480,10 @@ async fn queue(ctx: &Context, msg: &Message, mut args: Args) -> CommandResult { if let Some(handler_lock) = manager.get(guild_id) { let mut handler = handler_lock.lock().await; - let source = match input::ytdl(&url).await { + + // Here, we use lazy restartable sources to make sure that we don't pay + // for decoding, playback on tracks which aren't actually live yet. + let source = match Restartable::ytdl(url, true).await { Ok(source) => source, Err(why) => { println!("Err starting source: {:?}", why); @@ -488,7 +494,7 @@ async fn queue(ctx: &Context, msg: &Message, mut args: Args) -> CommandResult { }, }; - handler.enqueue_source(source); + handler.enqueue_source(source.into()); check_msg( msg.channel_id diff --git a/examples/twilight/src/main.rs b/examples/twilight/src/main.rs index 665868bcd..d7e196737 100644 --- a/examples/twilight/src/main.rs +++ b/examples/twilight/src/main.rs @@ -183,7 +183,7 @@ async fn play(msg: Message, state: State) -> Result<(), Box {}, } } + + #[allow(clippy::single_match)] + pub(crate) fn make_playable(&mut self) { + use Reader::*; + match self { + Restartable(r) => r.make_playable(), + _ => {}, + } + } } impl Read for Reader { diff --git a/src/input/restartable.rs b/src/input/restartable.rs index 26ce70673..5cb29108b 100644 --- a/src/input/restartable.rs +++ b/src/input/restartable.rs @@ -23,6 +23,36 @@ use std::{ type Recreator = Box; type RecreateChannel = Receiver, Recreator)>>; +// Use options here to make "take" more doable from a mut ref. +enum LazyProgress { + Dead(Box, Option, Codec, Container), + Live(Box, Option), + Working(Codec, Container, bool, RecreateChannel), +} + +impl Debug for LazyProgress { + fn fmt(&self, f: &mut Formatter<'_>) -> StdResult<(), FormatError> { + match self { + LazyProgress::Dead(meta, _, codec, container) => f + .debug_tuple("Dead") + .field(meta) + .field(&"") + .field(codec) + .field(container) + .finish(), + LazyProgress::Live(input, _) => + f.debug_tuple("Live").field(input).field(&"").finish(), + LazyProgress::Working(codec, container, stereo, chan) => f + .debug_tuple("Working") + .field(codec) + .field(container) + .field(stereo) + .field(chan) + .finish(), + } + } +} + /// A wrapper around a method to create a new [`Input`] which /// seeks backward by recreating the source. /// @@ -40,50 +70,84 @@ type RecreateChannel = Receiver, Recreator)>>; /// [`Input`]: Input /// [`Memory`]: cached::Memory /// [`Compressed`]: cached::Compressed +#[derive(Debug)] pub struct Restartable { async_handle: Option, - awaiting_source: Option, position: usize, - recreator: Option, - source: Box, + source: LazyProgress, } impl Restartable { /// Create a new source, which can be restarted using a `recreator` function. - pub async fn new(mut recreator: impl Restart + Send + 'static) -> Result { - recreator.call_restart(None).await.map(move |source| Self { - async_handle: None, - awaiting_source: None, - position: 0, - recreator: Some(Box::new(recreator)), - source: Box::new(source), - }) + /// + /// Lazy sources will not run their input recreator until the first byte + /// is needed, or are sent [`Track::make_playable`]/[`TrackHandle::make_playable`]. + /// + /// [`Track::make_playable`]: crate::tracks::Track::make_playable + /// [`TrackHandle::make_playable`]: crate::tracks::TrackHandle::make_playable + pub async fn new(mut recreator: impl Restart + Send + 'static, lazy: bool) -> Result { + if lazy { + recreator + .lazy_init() + .await + .map(move |(meta, kind, codec)| Self { + async_handle: None, + position: 0, + source: LazyProgress::Dead( + meta.unwrap_or_default().into(), + Some(Box::new(recreator)), + kind, + codec, + ), + }) + } else { + recreator.call_restart(None).await.map(move |source| Self { + async_handle: None, + position: 0, + source: LazyProgress::Live(source.into(), Some(Box::new(recreator))), + }) + } } /// Create a new restartable ffmpeg source for a local file. - pub async fn ffmpeg + Send + Clone + Sync + 'static>(path: P) -> Result { - Self::new(FfmpegRestarter { path }).await + pub async fn ffmpeg + Send + Clone + Sync + 'static>( + path: P, + lazy: bool, + ) -> Result { + Self::new(FfmpegRestarter { path }, lazy).await } /// Create a new restartable ytdl source. /// /// The cost of restarting and seeking will probably be *very* high: /// expect a pause if you seek backwards. - pub async fn ytdl + Send + Clone + Sync + 'static>(uri: P) -> Result { - Self::new(YtdlRestarter { uri }).await + pub async fn ytdl + Send + Clone + Sync + 'static>( + uri: P, + lazy: bool, + ) -> Result { + Self::new(YtdlRestarter { uri }, lazy).await } /// Create a new restartable ytdl source, using the first result of a youtube search. /// /// The cost of restarting and seeking will probably be *very* high: /// expect a pause if you seek backwards. - pub async fn ytdl_search(name: &str) -> Result { - Self::ytdl(format!("ytsearch1:{}", name)).await + pub async fn ytdl_search(name: &str, lazy: bool) -> Result { + Self::ytdl(format!("ytsearch1:{}", name), lazy).await } pub(crate) fn prep_with_handle(&mut self, handle: Handle) { self.async_handle = Some(handle); } + + pub(crate) fn make_playable(&mut self) { + if matches!(self.source, LazyProgress::Dead(_, _, _, _)) { + // This read triggers creation of a source, and is guaranteed not to modify any internals. + // It will harmlessly write out zeroes into the target buffer. + let mut bytes = [0u8; 0]; + let _ = Read::read(self, &mut bytes[..]); + } + } } /// Trait used to create an instance of a [`Reader`] at instantiation and when @@ -94,6 +158,13 @@ impl Restartable { pub trait Restart { /// Tries to create a replacement source. async fn call_restart(&mut self, time: Option) -> Result; + + /// Optionally retrieve metadata for a source which has been lazily initialised. + /// + /// This is particularly useful for sources intended to be queued, which + /// should occupy few resources when not live BUT have as much information as + /// possible made available at creation. + async fn lazy_init(&mut self) -> Result<(Option, Codec, Container)>; } struct FfmpegRestarter

@@ -137,6 +208,12 @@ where ffmpeg(self.path.as_ref()).await } } + + async fn lazy_init(&mut self) -> Result<(Option, Codec, Container)> { + is_stereo(self.path.as_ref()) + .await + .map(|(_stereo, metadata)| (Some(metadata), Codec::FloatPcm, Container::Raw)) + } } struct YtdlRestarter

@@ -160,26 +237,31 @@ where ytdl(self.uri.as_ref()).await } } -} -impl Debug for Restartable { - fn fmt(&self, f: &mut Formatter<'_>) -> StdResult<(), FormatError> { - f.debug_struct("Restartable") - .field("async_handle", &self.async_handle) - .field("awaiting_source", &self.awaiting_source) - .field("position", &self.position) - .field("recreator", &"") - .field("source", &self.source) - .finish() + async fn lazy_init(&mut self) -> Result<(Option, Codec, Container)> { + _ytdl_metadata(self.uri.as_ref()) + .await + .map(|m| (Some(m), Codec::FloatPcm, Container::Raw)) } } impl From for Input { fn from(mut src: Restartable) -> Self { - let kind = src.source.kind.clone(); - let meta = Some(src.source.metadata.take()); - let stereo = src.source.stereo; - let container = src.source.container; + let (meta, stereo, kind, container) = match &mut src.source { + LazyProgress::Dead(ref mut m, _rec, kind, container) => { + let stereo = m.channels == Some(2); + (Some(m.take()), stereo, kind.clone(), *container) + }, + LazyProgress::Live(ref mut input, _rec) => ( + Some(input.metadata.take()), + input.stereo, + input.kind.clone(), + input.container, + ), + // This branch should never be taken: this is an emergency measure. + LazyProgress::Working(kind, container, stereo, _) => + (None, *stereo, kind.clone(), *container), + }; Input::new(stereo, Reader::Restartable(src), kind, container, meta) } } @@ -190,43 +272,70 @@ impl From for Input { impl Read for Restartable { fn read(&mut self, buffer: &mut [u8]) -> IoResult { - let (out_val, march_pos, remove_async) = if let Some(chan) = &self.awaiting_source { - match chan.try_recv() { - Ok(Ok((new_source, recreator))) => { - self.source = new_source; - self.recreator = Some(recreator); - - (Read::read(&mut self.source, buffer), true, true) - }, - Ok(Err(source_error)) => { - let e = Err(IoError::new( - IoErrorKind::UnexpectedEof, - format!("Failed to create new reader: {:?}.", source_error), - )); - (e, false, true) - }, - Err(TryRecvError::Empty) => { - // Output all zeroes. - for el in buffer.iter_mut() { - *el = 0; - } - (Ok(buffer.len()), false, false) - }, - Err(_) => { - let e = Err(IoError::new( + use LazyProgress::*; + let (out_val, march_pos, next_source) = match &mut self.source { + Dead(meta, rec, kind, container) => { + let stereo = meta.channels == Some(2); + let handle = self.async_handle.clone(); + let new_chan = if let Some(rec) = rec.take() { + Some(regenerate_channel( + rec, + 0, + stereo, + kind.clone(), + *container, + handle, + )?) + } else { + return Err(IoError::new( IoErrorKind::UnexpectedEof, - "Failed to create new reader: dropped.", + "Illegal state: taken recreator was observed.".to_string(), )); - (e, false, true) - }, - } - } else { - // already have a good, valid source. - (Read::read(&mut self.source, buffer), true, false) + }; + + // Then, output all zeroes. + for el in buffer.iter_mut() { + *el = 0; + } + (Ok(buffer.len()), false, new_chan) + }, + Live(source, _) => (Read::read(source, buffer), true, None), + Working(_, _, _, chan) => { + match chan.try_recv() { + Ok(Ok((mut new_source, recreator))) => { + // Completed! + // Do read, then replace inner progress. + let bytes_read = Read::read(&mut new_source, buffer); + + (bytes_read, true, Some(Live(new_source, Some(recreator)))) + }, + Ok(Err(source_error)) => { + let e = Err(IoError::new( + IoErrorKind::UnexpectedEof, + format!("Failed to create new reader: {:?}.", source_error), + )); + (e, false, None) + }, + Err(TryRecvError::Empty) => { + // Output all zeroes. + for el in buffer.iter_mut() { + *el = 0; + } + (Ok(buffer.len()), false, None) + }, + Err(_) => { + let e = Err(IoError::new( + IoErrorKind::UnexpectedEof, + "Failed to create new reader: dropped.", + )); + (e, false, None) + }, + } + }, }; - if remove_async { - self.awaiting_source = None; + if let Some(src) = next_source { + self.source = src; } if march_pos { @@ -247,45 +356,62 @@ impl Seek for Restartable { use SeekFrom::*; match pos { Start(offset) => { - let stereo = self.source.stereo; - let _current_ts = utils::byte_count_to_timestamp(self.position, stereo); let offset = offset as usize; + let handle = self.async_handle.clone(); - if offset < self.position { - // We're going back in time. - if let Some(handle) = self.async_handle.as_ref() { - let (tx, rx) = flume::bounded(1); - - self.awaiting_source = Some(rx); - - let recreator = self.recreator.take(); - - if let Some(mut rec) = recreator { - handle.spawn(async move { - let ret_val = rec - .call_restart(Some(utils::byte_count_to_timestamp( - offset, stereo, - ))) - .await; - - let _ = tx.send(ret_val.map(Box::new).map(|v| (v, rec))); - }); + use LazyProgress::*; + match &mut self.source { + Dead(meta, rec, kind, container) => { + // regen at given start point + self.source = if let Some(rec) = rec.take() { + regenerate_channel( + rec, + offset, + meta.channels == Some(2), + kind.clone(), + *container, + handle, + )? } else { return Err(IoError::new( - IoErrorKind::Interrupted, - "Previous seek in progress.", + IoErrorKind::UnexpectedEof, + "Illegal state: taken recreator was observed.".to_string(), )); - } + }; self.position = offset; - } else { + }, + Live(input, rec) => + if offset < self.position { + // regen at given start point + // We're going back in time. + self.source = if let Some(rec) = rec.take() { + regenerate_channel( + rec, + offset, + input.stereo, + input.kind.clone(), + input.container, + handle, + )? + } else { + return Err(IoError::new( + IoErrorKind::UnexpectedEof, + "Illegal state: taken recreator was observed.".to_string(), + )); + }; + + self.position = offset; + } else { + // march on with live source. + self.position += input.consume(offset - self.position); + }, + Working(_, _, _, _) => { return Err(IoError::new( IoErrorKind::Interrupted, - "Cannot safely call seek until provided an async context handle.", + "Previous seek in progress.", )); - } - } else { - self.position += self.source.consume(offset - self.position); + }, } Ok(offset as u64) @@ -298,3 +424,31 @@ impl Seek for Restartable { } } } + +fn regenerate_channel( + mut rec: Recreator, + offset: usize, + stereo: bool, + kind: Codec, + container: Container, + handle: Option, +) -> IoResult { + if let Some(handle) = handle.as_ref() { + let (tx, rx) = flume::bounded(1); + + handle.spawn(async move { + let ret_val = rec + .call_restart(Some(utils::byte_count_to_timestamp(offset, stereo))) + .await; + + let _ = tx.send(ret_val.map(Box::new).map(|v| (v, rec))); + }); + + Ok(LazyProgress::Working(kind, container, stereo, rx)) + } else { + Err(IoError::new( + IoErrorKind::Interrupted, + "Cannot safely call seek until provided an async context handle.", + )) + } +} diff --git a/src/input/ytdl_src.rs b/src/input/ytdl_src.rs index 36619825c..90e836bb8 100644 --- a/src/input/ytdl_src.rs +++ b/src/input/ytdl_src.rs @@ -11,7 +11,7 @@ use std::{ io::{BufRead, BufReader, Read}, process::{Command, Stdio}, }; -use tokio::task; +use tokio::{process::Command as TokioCommand, task}; use tracing::trace; const YOUTUBE_DL_COMMAND: &str = if cfg!(feature = "youtube-dlc") { @@ -66,6 +66,7 @@ pub(crate) async fn _ytdl(uri: &str, pre_args: &[&str]) -> Result { .stdout(Stdio::piped()) .spawn()?; + // This rigmarole is required due to the inner synchronous reading context. let stderr = youtube_dl.stderr.take(); let (returned_stderr, value) = task::spawn_blocking(move || { let mut s = stderr.unwrap(); @@ -113,6 +114,45 @@ pub(crate) async fn _ytdl(uri: &str, pre_args: &[&str]) -> Result { )) } +pub(crate) async fn _ytdl_metadata(uri: &str) -> Result { + // Most of these flags are likely unused, but we want identical search + // and/or selection as the above functions. + let ytdl_args = [ + "-j", + "-f", + "webm[abr>0]/bestaudio/best", + "-R", + "infinite", + "--no-playlist", + "--ignore-config", + uri, + "-o", + "-", + ]; + + let youtube_dl_output = TokioCommand::new(YOUTUBE_DL_COMMAND) + .args(&ytdl_args) + .stdin(Stdio::null()) + .output() + .await?; + + let o_vec = youtube_dl_output.stderr; + + let end = (&o_vec) + .iter() + .position(|el| *el == 0xA) + .unwrap_or_else(|| o_vec.len()); + + let value = serde_json::from_slice(&o_vec[..end]).map_err(|err| Error::Json { + error: err, + parsed_text: std::str::from_utf8(&o_vec).unwrap_or_default().to_string(), + })?; + + let metadata = Metadata::from_ytdl_output(value); + + Ok(metadata) +} + /// Creates a streamed audio source from YouTube search results with `youtube-dl(c)`,`ffmpeg`, and `ytsearch`. /// Takes the first video listed from the YouTube search. /// diff --git a/src/tracks/command.rs b/src/tracks/command.rs index c9d00d26d..a80b28670 100644 --- a/src/tracks/command.rs +++ b/src/tracks/command.rs @@ -30,6 +30,8 @@ pub enum TrackCommand { Request(OneshotSender>), /// Change the loop count/strategy of this track. Loop(LoopState), + /// Prompts a track's input to become live and usable, if it is not already. + MakePlayable, } impl std::fmt::Debug for TrackCommand { @@ -48,6 +50,7 @@ impl std::fmt::Debug for TrackCommand { Do(_f) => "Do([function])".to_string(), Request(tx) => format!("Request({:?})", tx), Loop(loops) => format!("Loop({:?})", loops), + MakePlayable => "MakePlayable".to_string(), } ) } diff --git a/src/tracks/handle.rs b/src/tracks/handle.rs index aaf92c0b3..c7387146d 100644 --- a/src/tracks/handle.rs +++ b/src/tracks/handle.rs @@ -74,6 +74,16 @@ impl TrackHandle { self.send(TrackCommand::Volume(volume)) } + /// Ready a track for playing if it is lazily initialised. + /// + /// Currently, only [`Restartable`] sources support lazy setup. + /// This call is a no-op for all others. + /// + /// [`Restartable`]: crate::input::restartable::Restartable + pub fn make_playable(&self) -> TrackResult<()> { + self.send(TrackCommand::MakePlayable) + } + /// Denotes whether the underlying [`Input`] stream is compatible with arbitrary seeking. /// /// If this returns `false`, all calls to [`seek_time`] will fail, and the track is diff --git a/src/tracks/mod.rs b/src/tracks/mod.rs index e7eff3a62..f72127bca 100644 --- a/src/tracks/mod.rs +++ b/src/tracks/mod.rs @@ -307,6 +307,7 @@ impl Track { TrackStateChange::Loops(self.loops, true), )); }, + MakePlayable => self.make_playable(), } }, Err(TryRecvError::Closed) => { @@ -320,6 +321,16 @@ impl Track { } } + /// Ready a track for playing if it is lazily initialised. + /// + /// Currently, only [`Restartable`] sources support lazy setup. + /// This call is a no-op for all others. + /// + /// [`Restartable`]: crate::input::restartable::Restartable + pub fn make_playable(&mut self) { + self.source.reader.make_playable(); + } + /// Creates a read-only copy of the audio track's state. /// /// The primary use-case of this is sending information across diff --git a/src/tracks/queue.rs b/src/tracks/queue.rs index c4560b614..93d722398 100644 --- a/src/tracks/queue.rs +++ b/src/tracks/queue.rs @@ -6,7 +6,7 @@ use crate::{ }; use async_trait::async_trait; use parking_lot::Mutex; -use std::{collections::VecDeque, ops::Deref, sync::Arc}; +use std::{collections::VecDeque, ops::Deref, sync::Arc, time::Duration}; use tracing::{info, warn}; /// A simple queue for several audio sources, designed to @@ -145,6 +145,23 @@ impl EventHandler for QueueHandler { } } +struct SongPreloader { + remote_lock: Arc>, +} + +#[async_trait] +impl EventHandler for SongPreloader { + async fn act(&self, _ctx: &EventContext<'_>) -> Option { + let inner = self.remote_lock.lock(); + + if let Some(track) = inner.tracks.get(1) { + let _ = track.0.make_playable(); + } + + None + } +} + impl TrackQueue { /// Create a new, empty, track queue. pub fn new() -> Self { @@ -194,6 +211,23 @@ impl TrackQueue { track.position, ); + // Attempts to start loading the next track before this one ends. + // Idea is to provide as close to gapless playback as possible, + // while minimising memory use. + if let Some(time) = track.source.metadata.duration { + let preload_time = time.checked_sub(Duration::from_secs(5)).unwrap_or_default(); + let remote_lock = self.inner.clone(); + + track + .events + .as_mut() + .expect("Queue inspecting EventStore on new Track: did not exist.") + .add_event( + EventData::new(Event::Delayed(preload_time), SongPreloader { remote_lock }), + track.position, + ); + } + inner.tracks.push_back(Queued(track_handle)); }