How to decode an async stream? #279

Open
ababo opened this issue Apr 26, 2024 · 7 comments

Comments

@ababo

ababo commented Apr 26, 2024

Since Symphonia lacks async support, I would like to find a proper way to feed it incoming chunks proactively rather than have it pull them via Read::read().

Indeed, when I use a ring buffer that acts as a reader, it is eventually drained, which makes FormatReader return std::io::Error. After the ring buffer is topped up the format reader can be recovered, but at the cost of skipping some data (an Ogg page in my case), which is unacceptable.

I need a way to feed the format reader proactively with data or at least make its state fully recoverable.
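
For illustration, a push-style interface of the kind asked for here could look like the sketch below. It is not Symphonia's API: `PageSplitter`, `push`, and `sample_page` are hypothetical names, and the sketch assumes the stream starts on a page boundary and skips the capture-pattern check, CRC validation, and resynchronisation. It only relies on the Ogg page layout (a 27-byte fixed header whose last byte is the segment count, followed by the lacing values and the body). The caller hands over whatever bytes have arrived and gets back every page that is now complete, so a chunk boundary in the middle of a page loses nothing.

```rust
/// Hypothetical push-style splitter: accepts arbitrary chunks, yields whole Ogg pages.
#[derive(Default)]
struct PageSplitter {
    pending: Vec<u8>, // bytes received but not yet emitted as part of a page
}

impl PageSplitter {
    /// Appends a chunk and returns every page that is now complete.
    fn push(&mut self, chunk: &[u8]) -> Vec<Vec<u8>> {
        self.pending.extend_from_slice(chunk);
        let mut pages = Vec::new();
        while let Some(len) = Self::page_len(&self.pending) {
            pages.push(self.pending.drain(..len).collect());
        }
        pages
    }

    /// Length of the page at the start of `buf`, or `None` while it is incomplete.
    fn page_len(buf: &[u8]) -> Option<usize> {
        const HEADER: usize = 27; // fixed part of an Ogg page header
        if buf.len() < HEADER {
            return None;
        }
        let segments = buf[26] as usize; // number of lacing values that follow
        let table = buf.get(HEADER..HEADER + segments)?;
        let body: usize = table.iter().map(|&v| v as usize).sum();
        let total = HEADER + segments + body;
        (buf.len() >= total).then_some(total)
    }
}

/// Builds one synthetic page: zeroed header fields, 2 lacing values, 5-byte body.
fn sample_page() -> Vec<u8> {
    let mut page = b"OggS".to_vec();
    page.resize(26, 0);
    page.push(2); // segment count
    page.extend_from_slice(&[3, 2]); // lacing values: 3 + 2 body bytes
    page.extend_from_slice(b"hello");
    page
}

fn main() {
    let page = sample_page();
    let mut splitter = PageSplitter::default();
    // The first chunk ends mid-header, so no page can be emitted yet.
    assert!(splitter.push(&page[..10]).is_empty());
    // The rest of the bytes completes the page; nothing is skipped.
    assert_eq!(splitter.push(&page[10..]), vec![page]);
}
```

Because the splitter never blocks and never reports an error for "not enough data yet", it can be driven directly from an async task without a dedicated thread.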

@jBernavaPrah

Hi @ababo

Did you find a way?

@ababo
Author

ababo commented May 7, 2024

@jBernavaPrah No, I used an async packet decoder from the ogg crate.

@jBernavaPrah

Hi,

I have found a way to do it.

I'm not sure it's the best way to do it, but it's working. :)

You need to implement MediaSource to read the data and then use tokio::task::spawn_blocking so that reading the data doesn't block the main thread.

Happy to help if needed.
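
A rough, std-only sketch of that pattern follows. Several things in it are assumptions rather than the commenter's code: `ChannelReader` and `drain_on_worker` are hypothetical names, `std::thread::spawn` stands in for `tokio::task::spawn_blocking`, and `read_to_end` stands in for the decoder. The key point is that `read` waits for the next chunk instead of returning an error when the buffer runs dry, and reports end of stream only once the sending side is dropped.

```rust
use std::io::{self, Read};
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Hypothetical adapter: a blocking `Read` over chunks delivered through a channel.
struct ChannelReader {
    rx: Receiver<Vec<u8>>,
    current: Vec<u8>, // chunk currently being consumed
    pos: usize,       // read offset into `current`
}

impl ChannelReader {
    fn new(rx: Receiver<Vec<u8>>) -> Self {
        Self { rx, current: Vec::new(), pos: 0 }
    }
}

impl Read for ChannelReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Wait for the next non-empty chunk instead of failing when drained.
        while self.pos == self.current.len() {
            match self.rx.recv() {
                Ok(chunk) => {
                    self.current = chunk;
                    self.pos = 0;
                }
                // All senders dropped: report a clean end of stream.
                Err(_) => return Ok(0),
            }
        }
        let n = buf.len().min(self.current.len() - self.pos);
        buf[..n].copy_from_slice(&self.current[self.pos..self.pos + n]);
        self.pos += n;
        Ok(n)
    }
}

/// Feeds `chunks` to a worker thread that reads them back through `ChannelReader`.
fn drain_on_worker(chunks: Vec<Vec<u8>>) -> Vec<u8> {
    // Bounded channel, so a fast producer is slowed down by a slow reader.
    let (tx, rx) = sync_channel::<Vec<u8>>(4);
    let worker = thread::spawn(move || {
        let mut out = Vec::new();
        ChannelReader::new(rx).read_to_end(&mut out).map(|_| out)
    });
    for chunk in chunks {
        tx.send(chunk).expect("worker hung up");
    }
    drop(tx); // signals end of stream to the reader
    worker.join().expect("worker panicked").expect("read failed")
}

fn main() {
    let data = drain_on_worker(vec![b"Ogg".to_vec(), Vec::new(), b"S page".to_vec()]);
    assert_eq!(data, b"OggS page");
}
```

Wiring such a reader into Symphonia is left out here. The trait bounds on MediaSource may require wrapping the receiver (for example in a Mutex) or using a different channel type, and the worker still occupies one thread for as long as the stream is open.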

@ababo
Author

ababo commented May 29, 2024

Hi,
The problem with this approach is that it requires an entire OS thread to be allocated for each connection. For a high-load server this doesn't look like a good solution.

@aschey
Contributor

aschey commented May 29, 2024

I'm not sure if this is exactly the same use case you're asking about, but I made a crate to handle streaming data asynchronously: https://github.com/aschey/stream-download-rs. It's not specific to audio data, but it will take care of treating an asynchronous stream like a normal Read + Seek source.

@Enitoni

Enitoni commented Jun 6, 2024

Here's how I did it in my project.

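use std::io::{Read, Seek, SeekFrom};

use symphonia::core::io::MediaSource;
use tokio::runtime::Handle;

// `BoxedLoadable`, `LoaderLength` and `ReadResult` are types from my project.
// Note: `Handle::block_on` panics if called from within an async context,
// so these methods must run on a non-runtime thread.

/// Bridges an async [Loadable] with a synchronous [MediaSource].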
struct LoadableMediaSource {
    rt: Handle,
    loadable: BoxedLoadable,
}

impl MediaSource for LoadableMediaSource {
    fn is_seekable(&self) -> bool {
        self.rt.block_on(self.loadable.seekable())
    }

    fn byte_len(&self) -> Option<u64> {
        let result = self.rt.block_on(self.loadable.length());

        result.and_then(|l| match l {
            LoaderLength::Bytes(bytes) => Some(bytes as u64),
            LoaderLength::Time(_) => None,
        })
    }
}

impl Seek for LoadableMediaSource {
    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
        let result = self.rt.block_on(self.loadable.seek(pos));

        result
            .map_err(|e| std::io::Error::other(format!("Seek failed: {:?}", e)))
            .map(|seek| seek as u64)
    }
}

impl Read for LoadableMediaSource {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let result = self.rt.block_on(self.loadable.read(buf));

        result
            .map_err(|e| std::io::Error::other(format!("Read failed: {:?}", e)))
            .map(|read| match read {
                ReadResult::More(bytes) => bytes,
                ReadResult::End(bytes) => bytes,
            })
    }
}

@nashley

nashley commented Jan 10, 2025

Using block_on is not a good idea here. block_in_place would be slightly better in that it at least doesn't block the executor and any other tasks assigned to that thread, but it does still block the thread, so this isn't a good solution either.
