Skip to content

Commit

Permalink
Use symphonia::io::MediaSource for Reader extensions (#61)
Browse files Browse the repository at this point in the history
This PR does the following:

 * Changes both `Reader::Extension` and `Reader::ExtensionSeek`  to use `symphonia::io::MediaSource`.
 * Removes the `File` and `Vec` variants of readers, instead opting to provide a `from_file` and `from_memory` associated function to create readers from the `File` and `Cursor<Vec<u8>>` implementations of `MediaSource`. 
 * Removes the ReadSeek trait.
 * Added a dependency on `symphonia_core`. This crate has no additional dependencies.
  • Loading branch information
james7132 authored and FelixMcFelix committed May 10, 2021
1 parent 63d53b2 commit a86898c
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 70 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ serde = { version = "1", features = ["derive"] }
serde_json = "1"
tracing = { version = "0.1", features = ["log"] }
tracing-futures = "0.2"
symphonia-core = "0.2"

[dependencies.async-trait]
optional = true
Expand Down
6 changes: 3 additions & 3 deletions src/input/dca.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::{codec::OpusDecoderState, error::DcaError, Codec, Container, Input, Metadata, Reader};
use serde::Deserialize;
use std::{ffi::OsStr, io::BufReader, mem};
use std::{ffi::OsStr, mem};
#[cfg(not(feature = "tokio-02-marker"))]
use tokio::{fs::File as TokioFile, io::AsyncReadExt};
#[cfg(feature = "tokio-02-marker")]
Expand Down Expand Up @@ -46,7 +46,7 @@ async fn _dca(path: &OsStr) -> Result<Input, DcaError> {
.await
.map_err(DcaError::IoError)?;

let reader = BufReader::new(json_reader.into_inner().into_std().await);
let reader = json_reader.into_inner().into_std().await;

let metadata: Metadata = serde_json::from_slice::<DcaMetadata>(raw_json.as_slice())
.map_err(DcaError::InvalidMetadata)?
Expand All @@ -56,7 +56,7 @@ async fn _dca(path: &OsStr) -> Result<Input, DcaError> {

Ok(Input::new(
stereo,
Reader::File(reader),
Reader::from_file(reader),
Codec::Opus(OpusDecoderState::new().map_err(DcaError::Opus)?),
Container::Dca {
first_frame: (size as usize) + mem::size_of::<i32>() + header.len(),
Expand Down
97 changes: 30 additions & 67 deletions src/input/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@ use std::{
result::Result as StdResult,
};
use streamcatcher::{Catcher, TxCatcher};
pub use symphonia_core::io::MediaSource;

/// Usable data/byte sources for an audio stream.
///
/// Users may define their own data sources using [`Extension`]
/// and [`ExtensionSeek`].
/// Users may define their own data sources using [`Extension`].
///
/// [`Extension`]: Reader::Extension
/// [`ExtensionSeek`]: Reader::ExtensionSeek
pub enum Reader {
/// Piped output of another program (i.e., [`ffmpeg`]).
///
Expand All @@ -44,40 +43,38 @@ pub enum Reader {
///
/// Supports seeking.
Restartable(Restartable),
/// A source contained in a local file.
///
/// Supports seeking.
File(BufReader<File>),
/// A source contained as an array in memory.
///
/// Supports seeking.
Vec(Cursor<Vec<u8>>),
/// A basic user-provided source.
///
/// Does not support seeking.
Extension(Box<dyn Read + Send>),
/// A user-provided source which also implements [`Seek`].
///
/// Supports seeking.
///
/// [`Seek`]: https://doc.rust-lang.org/std/io/trait.Seek.html
ExtensionSeek(Box<dyn ReadSeek + Send>),
/// Seeking support depends on underlying `MediaSource` implementation.
Extension(Box<dyn MediaSource + Send>),
}

impl Reader {
/// Returns whether the given source implements [`Seek`].
///
/// This might be an expensive operation and might involve blocking IO. In such cases, it is
/// advised to cache the return value when possible.
///
/// [`Seek`]: https://doc.rust-lang.org/std/io/trait.Seek.html
pub fn is_seekable(&self) -> bool {
use Reader::*;
match self {
Restartable(_) | Compressed(_) | Memory(_) => true,
Extension(_) => false,
ExtensionSeek(_) => true,
Extension(source) => source.is_seekable(),
_ => false,
}
}

/// A source contained in a local file.
pub fn from_file(file: File) -> Self {
Self::Extension(Box::new(file))
}

/// A source contained as an array in memory.
pub fn from_memory(buf: Vec<u8>) -> Self {
Self::Extension(Box::new(Cursor::new(buf)))
}

#[allow(clippy::single_match)]
pub(crate) fn prep_with_handle(&mut self, handle: Handle) {
use Reader::*;
Expand Down Expand Up @@ -105,10 +102,7 @@ impl Read for Reader {
Memory(a) => Read::read(a, buffer),
Compressed(a) => Read::read(a, buffer),
Restartable(a) => Read::read(a, buffer),
File(a) => Read::read(a, buffer),
Vec(a) => Read::read(a, buffer),
Extension(a) => a.read(buffer),
ExtensionSeek(a) => a.read(buffer),
}
}
}
Expand All @@ -117,16 +111,22 @@ impl Seek for Reader {
fn seek(&mut self, pos: SeekFrom) -> IoResult<u64> {
use Reader::*;
match self {
Pipe(_) | Extension(_) => Err(IoError::new(
Pipe(_) => Err(IoError::new(
IoErrorKind::InvalidInput,
"Seeking not supported on Reader of this type.",
)),
Memory(a) => Seek::seek(a, pos),
Compressed(a) => Seek::seek(a, pos),
File(a) => Seek::seek(a, pos),
Restartable(a) => Seek::seek(a, pos),
Vec(a) => Seek::seek(a, pos),
ExtensionSeek(a) => a.seek(pos),
Extension(a) =>
if a.is_seekable() {
a.seek(pos)
} else {
Err(IoError::new(
IoErrorKind::InvalidInput,
"Seeking not supported on Reader of this type.",
))
},
}
}
}
Expand All @@ -139,51 +139,14 @@ impl Debug for Reader {
Memory(a) => format!("{:?}", a),
Compressed(a) => format!("{:?}", a),
Restartable(a) => format!("{:?}", a),
File(a) => format!("{:?}", a),
Vec(a) => format!("{:?}", a),
Extension(_) => "Extension".to_string(),
ExtensionSeek(_) => "ExtensionSeek".to_string(),
};
f.debug_tuple("Reader").field(&field).finish()
}
}

impl From<Vec<u8>> for Reader {
fn from(val: Vec<u8>) -> Reader {
Reader::Vec(Cursor::new(val))
}
}

/// Fusion trait for custom input sources which allow seeking.
pub trait ReadSeek {
/// See [`Read::read`].
///
/// [`Read::read`]: https://doc.rust-lang.org/nightly/std/io/trait.Read.html#tymethod.read
fn read(&mut self, buf: &mut [u8]) -> IoResult<usize>;
/// See [`Seek::seek`].
///
/// [`Seek::seek`]: https://doc.rust-lang.org/nightly/std/io/trait.Seek.html#tymethod.seek
fn seek(&mut self, pos: SeekFrom) -> IoResult<u64>;
}

impl Read for dyn ReadSeek {
fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
ReadSeek::read(self, buf)
}
}

impl Seek for dyn ReadSeek {
fn seek(&mut self, pos: SeekFrom) -> IoResult<u64> {
ReadSeek::seek(self, pos)
}
}

impl<R: Read + Seek> ReadSeek for R {
fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
Read::read(self, buf)
}

fn seek(&mut self, pos: SeekFrom) -> IoResult<u64> {
Seek::seek(self, pos)
fn from(val: Vec<u8>) -> Self {
Self::from_memory(val)
}
}

0 comments on commit a86898c

Please sign in to comment.