diff --git a/src/input/sources/hls.rs b/src/input/sources/hls.rs index 0fe5ad3b3..69dc42691 100644 --- a/src/input/sources/hls.rs +++ b/src/input/sources/hls.rs @@ -1,22 +1,14 @@ -use std::{ - io::{ErrorKind as IoErrorKind, Result as IoResult, SeekFrom}, - pin::Pin, - task::{Context, Poll}, -}; - use async_trait::async_trait; use bytes::Bytes; use futures::StreamExt; -use pin_project::pin_project; use reqwest::{header::HeaderMap, Client}; use stream_lib::Event; use symphonia_core::io::MediaSource; -use tokio::io::{AsyncRead, AsyncSeek, ReadBuf}; use tokio_util::io::StreamReader; use crate::input::{ AsyncAdapterStream, - AsyncMediaSource, + AsyncReadOnlySource, AudioStream, AudioStreamError, Compose, @@ -51,7 +43,7 @@ impl HlsRequest { } } - fn create_stream(&mut self) -> Result { + fn create_stream(&mut self) -> Result { let request = self .client .get(&self.request) @@ -70,51 +62,7 @@ impl HlsRequest { )), }))); - Ok(HlsStream { stream }) - } -} - -#[pin_project] -struct HlsStream { - #[pin] - stream: Box, -} - -impl AsyncRead for HlsStream { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll> { - AsyncRead::poll_read(self.project().stream, cx, buf) - } -} - -impl AsyncSeek for HlsStream { - fn start_seek(self: Pin<&mut Self>, _position: SeekFrom) -> IoResult<()> { - Err(IoErrorKind::Unsupported.into()) - } - - fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - unreachable!() - } -} - -#[async_trait] -impl AsyncMediaSource for HlsStream { - fn is_seekable(&self) -> bool { - false - } - - async fn byte_len(&self) -> Option { - None - } - - async fn try_resume( - &mut self, - _offset: u64, - ) -> Result, AudioStreamError> { - Err(AudioStreamError::Unsupported) + Ok(AsyncReadOnlySource { stream }) } } diff --git a/src/input/sources/mod.rs b/src/input/sources/mod.rs index a74dd1968..27c9627bc 100644 --- a/src/input/sources/mod.rs +++ b/src/input/sources/mod.rs @@ -4,3 +4,84 @@ mod http; mod ytdl; pub use self::{file::*, hls::*, http::*, ytdl::*}; + +use std::{ + io::{ErrorKind as IoErrorKind, Result as IoResult, SeekFrom}, + pin::Pin, + task::{Context, Poll}, +}; + +use async_trait::async_trait; +use pin_project::pin_project; +use tokio::io::{AsyncRead, AsyncSeek, ReadBuf}; + +use crate::input::{AsyncMediaSource, AudioStreamError}; + +/// `AsyncReadOnlySource` wraps any source implementing [`tokio::io::AsyncRead`] in an unseekable +/// [`symphonia_core::io::MediaSource`], similar to [`symphonia_core::io::ReadOnlySource`] +#[pin_project] +pub struct AsyncReadOnlySource { + #[pin] + stream: Box, +} + +impl AsyncReadOnlySource { + /// Instantiates a new `AsyncReadOnlySource` by taking ownership and wrapping the provided + /// `Read`er. + pub fn new(inner: R) -> Self + where + R: AsyncRead + Send + Sync + Unpin + 'static, + { + AsyncReadOnlySource { + stream: Box::new(inner), + } + } + + /// Gets a reference to the underlying reader. + pub fn get_ref(&self) -> &Box { + &self.stream + } + + /// Unwraps this `AsyncReadOnlySource`, returning the underlying reader. + pub fn into_inner(self) -> Box { + self.stream.into() + } +} + +impl AsyncRead for AsyncReadOnlySource { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + AsyncRead::poll_read(self.project().stream, cx, buf) + } +} + +impl AsyncSeek for AsyncReadOnlySource { + fn start_seek(self: Pin<&mut Self>, _position: SeekFrom) -> IoResult<()> { + Err(IoErrorKind::Unsupported.into()) + } + + fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + unreachable!() + } +} + +#[async_trait] +impl AsyncMediaSource for AsyncReadOnlySource { + fn is_seekable(&self) -> bool { + false + } + + async fn byte_len(&self) -> Option { + None + } + + async fn try_resume( + &mut self, + _offset: u64, + ) -> Result, AudioStreamError> { + Err(AudioStreamError::Unsupported) + } +}