Skip to content

Commit

Permalink
feat: stream first chunk slice to align with frame for mp3 (#209)
Browse files Browse the repository at this point in the history
  • Loading branch information
ramiroaisen authored Nov 8, 2023
2 parents 5b82703 + 15c17e5 commit ce69580
Show file tree
Hide file tree
Showing 31 changed files with 2,381 additions and 453 deletions.
254 changes: 89 additions & 165 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 7 additions & 1 deletion defs/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export const EMAIL_VERIFICATION_VALIDITY_SECS = 3600;
export const EXTERNAL_RELAY_NO_DATA_SHUTDOWN_SECS = 30;

/** delay of which if external doesn't produce first data chunk, it will be cancelled */
export const EXTERNAL_RELAY_NO_DATA_START_SHUTDOWN_SECS = 30;
export const EXTERNAL_RELAY_NO_DATA_START_SHUTDOWN_SECS = 90;

export const EXTERNAL_RELAY_NO_LISTENERS_SHUTDOWN_DELAY_SECS = 60;

Expand All @@ -52,6 +52,9 @@ export const FORWARD_IP_HEADER = "x-openstream-forwarded-ip";

export const HEADER_RELAY_SOURCE_DEPLOYMENT = "x-source-deployment";

/** Internal relay rejection code header */
export const INTERNAL_RELAY_REJECTION_CODE_HEADER = "x-openstream-rejection-code";

/** timeout to wait to obtain a lock on a media session items
* if not released in this timeout, probably the item is poisoned
* and the process is aborted with a panic (and restarted by the process manager) */
Expand Down Expand Up @@ -104,6 +107,9 @@ export const STREAM_CHANNEL_CAPACITY = 16;
/** stream chunk size */
export const STREAM_CHUNK_SIZE = 16000;

/** open connections that last more than this time in seconds will be terminated */
export const STREAM_CONNECTION_MAX_DURATION_SECS = 21600;

/** limit of concurrent stream connections from the same ip */
export const STREAM_IP_CONNECTIONS_LIMIT = 8;

Expand Down
23 changes: 23 additions & 0 deletions rs/packages/__mp3/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[package]
name = "mp3"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-stream = "0.3.3"
bytes = "1.3.0"
constants = { version = "0.1.0", path = "../../config/constants" }
futures-util = "0.3.25"
hyper = "0.14.27"
log = "0.4.17"
minimp3 = { version = "0.5.1", features = ["tokio", "async_tokio"] }
parking_lot = "0.12.1"
pin-project = "1.0.12"
spsc = { version = "0.1.0", path = "../spsc" }
symphonia = { version = "0.5.2", default-features = false, features = ["mp3", "symphonia-bundle-mp3"] }
thiserror = "1.0.38"
tokio = { version = "1.29.0", features = ["full"] }
tokio-util = { version = "0.7.4", features = ["io", "io-util"] }
#tokio-puremp3 = { path = "../../../tokio-puremp3" }
268 changes: 268 additions & 0 deletions rs/packages/__mp3/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,268 @@
use bytes::{Bytes, BytesMut};
use futures_util::{ready, Stream};
use tokio::io::AsyncRead;
use tokio::io::ReadBuf;

use std::pin::Pin;
use std::task::Context;
use std::task::Poll;

// #[cfg(not(no_minimp3))]
use minimp3::Frame;

use log::*;

#[pin_project::pin_project]
pub struct TryStreamAsyncRead<S> {
#[pin]
stream: S,
buffer: BytesMut,
}

impl<E: std::error::Error + Send + Sync + 'static, S: Stream<Item = Result<Bytes, E>>>
TryStreamAsyncRead<S>
{
pub fn new(stream: S) -> Self {
Self {
stream,
buffer: BytesMut::new(),
}
}

pub fn into_inner(self) -> (S, BytesMut) {
(self.stream, self.buffer)
}
}

impl<E: std::error::Error + Send + Sync + 'static, S: Stream<Item = Result<Bytes, E>>> AsyncRead
for TryStreamAsyncRead<S>
{
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
let mut project = self.project();
loop {
if !buf.remaining() == 0 {
return Poll::Ready(Err(std::io::ErrorKind::WriteZero.into()));
}

if !project.buffer.is_empty() {
let len = usize::min(project.buffer.len(), buf.remaining());
let bytes = project.buffer.split_to(len).freeze();
buf.put_slice(bytes.as_ref());
return Poll::Ready(Ok(()));
} else {
match ready!(project.stream.as_mut().poll_next(cx)) {
None => return Poll::Ready(Ok(())),
Some(Err(e)) => {
return Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::Other, e)))
}
Some(Ok(bytes)) => {
project.buffer.extend_from_slice(bytes.as_ref());
continue;
}
}
}
}
}
}

#[derive(Debug, thiserror::Error)]
pub enum ReadRateError {
#[error("io: {0}")]
Io(#[from] std::io::Error),
#[error("no data timer elapsed")]
NoData,
}

// #[cfg(not(no_minimp3))]
const NO_DATA_ERROR_DELAY: tokio::time::Duration = tokio::time::Duration::from_secs(20);

/// An adapter that lets you inspect the data that's being read.
///
/// This is useful for things like hashing data as it's read in.
#[pin_project::pin_project]
pub struct InspectBufferReader<R> {
#[pin]
reader: R,
buffer: BytesMut,
}

impl<R> InspectBufferReader<R> {
/// Create a new InspectReader, wrapping `reader` and calling `f` for the
/// new data supplied by each read call.
///
/// The closure will only be called with an empty slice if the inner reader
/// returns without reading data into the buffer. This happens at EOF, or if
/// `poll_read` is called with a zero-size buffer.
pub fn new(reader: R) -> InspectBufferReader<R>
where
R: AsyncRead,
{
InspectBufferReader {
reader,
buffer: BytesMut::new(),
}
}

/// Consumes the `InspectReader`, returning the wrapped reader
pub fn into_inner(self) -> (R, BytesMut) {
(self.reader, self.buffer)
}

pub fn take_buffer(&mut self) -> BytesMut {
self.buffer.split_to(self.buffer.len())
}
}

impl<R: AsyncRead> AsyncRead for InspectBufferReader<R> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
let me = self.project();
let filled_length = buf.filled().len();
ready!(me.reader.poll_read(cx, buf))?;
me.buffer.extend_from_slice(&buf.filled()[filled_length..]);
Poll::Ready(Ok(()))
}
}

// #[cfg(not(no_minimp3))]
pub fn readrate<R: AsyncRead + Send + Sync + Unpin + 'static>(
reader: R,
) -> impl Stream<Item = Result<Bytes, ReadRateError>> {
let reader = InspectBufferReader::new(reader);

async_stream::stream! {
let start = tokio::time::Instant::now();
let duration = tokio::time::Duration::ZERO;
let mut error_instant = None;

let mut decoder = minimp3::Decoder::new(reader);

loop {

let Frame { bitrate, sample_rate, channels, layer: _, data } = match decoder.next_frame_future().await {

Ok(frame) => frame,

Err(e) => {

warn!("decoder frame error: {e} => {e:?}");

use minimp3::Error::*;

match error_instant {
None => {
let _ = error_instant.insert(tokio::time::Instant::now());
}

Some(instant) => {
if instant.elapsed() > NO_DATA_ERROR_DELAY {
yield Err(ReadRateError::NoData);
break;
}
}
};

match e {
InsufficientData => continue,
SkippedData => continue,
Eof => break,
Io(e) => {
yield Err(e.into());
break;
},
}
}
};

let _ = error_instant.take();

// Vec<i16>

let bytes = decoder.reader_mut().take_buffer().freeze();

let samples = data.len() / channels;
let duration_secs = samples as f64 / sample_rate as f64;

let decoded_len = data.len() * 2;
let transfer_len = bytes.len();

let ms = (tokio::time::Instant::now() - start + duration).as_millis();

info!("frame: decoded-len={decoded_len}, transfer_len={transfer_len}, samples={samples}, sample_rate={sample_rate}, kbitrate={bitrate} duration={duration_secs}s ms_until={ms}");
//duration += tokio::time::Duration::from_secs_f64(duration_secs);

let duration = tokio::time::Duration::from_secs_f64(duration_secs);

tokio::time::sleep(duration).await;

yield Ok(bytes);
}
}
}

// #[cfg(no_minimp3)]
// pub fn readrate<R: AsyncRead + Send + Sync + Unpin + 'static>(
// reader: R,
// ) -> impl Stream<Item = Result<Bytes, ReadRateError>> {
// let mut reader = InspectBufferReader::new(reader);

// async_stream::stream! {
// // let start = tokio::time::Instant::now();
// // let mut duration = tokio::time::Duration::ZERO;
// // let mut error_instant = None;

// // let mut decoder = minimp3::Decoder::new(reader);
// // let mut decoder = tokio_puremp3::ReadRate(reader);

// loop {

// let _header = match tokio_puremp3::next_frame(&mut reader).await {
// Ok(header) => header,
// Err(e) => {
// match e {
// tokio_puremp3::Error::IoError(e) => match e.kind() {
// std::io::ErrorKind::UnexpectedEof => break,
// _ => {
// yield Err(e.into());
// break;
// }
// }

// _ => {
// yield Err(ReadRateError::NoData);
// break;
// }
// }
// }
// };

// // Vec<i16>

// let bytes = reader.take_buffer().freeze();

// // let samples = data.len() / channels;
// // let duration_secs = samples as f64 / sample_rate as f64;

// // let decoded_len = data.len() * 2;
// // let transfer_len = bytes.len();

// // let sample_rate = header.sample_rate;
// // let bitrate = header.bitrate;
// // let duration = header.

// // trace!("frame: transfer_len={transfer_len}, samples={samples}, sample_rate={sample_rate}, kbitrate={bitrate} duration={duration_secs}s");
// // duration += tokio::time::Duration::from_secs_f64(duration_secs);

// yield Ok(bytes);

// // tokio::time::sleep_until(start + duration).await;
// }
// }
// }
File renamed without changes.
32 changes: 13 additions & 19 deletions rs/packages/mp3/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,23 +1,17 @@
[package]
name = "mp3"
version = "0.1.0"
edition = "2021"
version = "0.3.4"
# authors = ["Guillaume Gomez <[email protected]>"]

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
description = "Metadata parser for MP3 files"
repository = "https://github.com/GuillaumeGomez/mp3-metadata"
license = "MIT"

[dependencies]
async-stream = "0.3.3"
bytes = "1.3.0"
constants = { version = "0.1.0", path = "../../config/constants" }
futures-util = "0.3.25"
hyper = "0.14.27"
log = "0.4.17"
minimp3 = { version = "0.5.1", features = ["tokio", "async_tokio"] }
parking_lot = "0.12.1"
pin-project = "1.0.12"
spsc = { version = "0.1.0", path = "../spsc" }
symphonia = { version = "0.5.2", default-features = false, features = ["mp3", "symphonia-bundle-mp3"] }
thiserror = "1.0.38"
tokio = { version = "1.29.0", features = ["full"] }
tokio-util = { version = "0.7.4", features = ["io", "io-util"] }
#tokio-puremp3 = { path = "../../../tokio-puremp3" }
keywords = ["mp3", "metadata"]

[lib]
name = "mp3"

[dev-dependencies]
reqwest = { version = "0.11", features = ["blocking"] }
simplemad = "0.9"
21 changes: 21 additions & 0 deletions rs/packages/mp3/LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2016 Guillaume Gomez

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
10 changes: 10 additions & 0 deletions rs/packages/mp3/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# mp3-metadata [![Build Status](https://travis-ci.org/GuillaumeGomez/mp3-metadata.svg?branch=master)](https://travis-ci.org/GuillaumeGomez/mp3-metadata) [![Build status](https://ci.appveyor.com/api/projects/status/g21vliyvouvsg92n/branch/master?svg=true)](https://ci.appveyor.com/project/GuillaumeGomez/mp3-metadata/branch/master)


mp3 metadata parser in rust.

For an example, take a look into examples folder.

## Doc

You can have access to an online doc [here](https://docs.rs/mp3-metadata/).
Binary file added rs/packages/mp3/assets/double_id.mp3
Binary file not shown.
Binary file added rs/packages/mp3/assets/error.mp3
Binary file not shown.
Binary file added rs/packages/mp3/assets/id3v2.mp3
Binary file not shown.
Binary file added rs/packages/mp3/assets/invalid_time.mp3
Binary file not shown.
Binary file added rs/packages/mp3/assets/test.mp3
Binary file not shown.
Binary file added rs/packages/mp3/assets/trunc_test.mp3
Binary file not shown.
Loading

0 comments on commit ce69580

Please sign in to comment.