Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Input: Fix high CPU use when initialising long files over HTTP #163

Merged
merged 2 commits into from
Mar 11, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 63 additions & 18 deletions src/input/adapters/async_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ struct AsyncAdapterSink {

impl AsyncAdapterSink {
async fn launch(mut self) {
let mut inner_buf = [0u8; 10 * 1024];
let mut inner_buf = [0u8; 32 * 1024];
let mut read_region = 0..0;
let mut hit_end = false;
let mut blocked = false;
Expand Down Expand Up @@ -73,6 +73,7 @@ impl AsyncAdapterSink {
.write(&inner_buf[read_region.start..read_region.end])
{
read_region.start += n_moved;
drop(self.resp_tx.send_async(AdapterResponse::ReadOccurred).await);
} else {
blocked = true;
}
Expand Down Expand Up @@ -137,9 +138,10 @@ impl AsyncAdapterSink {
pub struct AsyncAdapterStream {
bytes_out: HeapConsumer<u8>,
can_seek: bool,
// Note: this is Atomic just to work around the need for
// Note: these are Atomic just to work around the need for
// check_messages to take &self rather than &mut.
finalised: AtomicBool,
bytes_known_present: AtomicBool,
req_tx: Sender<AdapterRequest>,
resp_rx: Receiver<AdapterResponse>,
notify_tx: Arc<Notify>,
Expand Down Expand Up @@ -168,6 +170,7 @@ impl AsyncAdapterStream {
bytes_out,
can_seek,
finalised: false.into(),
bytes_known_present: false.into(),
req_tx,
resp_rx,
notify_tx,
Expand All @@ -180,16 +183,29 @@ impl AsyncAdapterStream {
stream
}

fn handle_messages(&self, block: bool) -> Option<AdapterResponse> {
fn handle_messages(&self, op: Operation) -> Option<AdapterResponse> {
loop {
match self.resp_rx.try_recv() {
Ok(AdapterResponse::ReadZero) => {
let msg = if op.will_block() {
self.resp_rx.recv().ok()
} else {
self.resp_rx.try_recv().ok()
};

let msg = if let Some(msg) = msg { msg } else { break None };

// state changes
match &msg {
AdapterResponse::ReadZero => {
self.finalised.store(true, Ordering::Relaxed);
},
Ok(a) => break Some(a),
Err(TryRecvError::Empty) if !block => break None,
Err(TryRecvError::Disconnected) => break None,
Err(TryRecvError::Empty) => {},
AdapterResponse::ReadOccurred => {
self.bytes_known_present.store(true, Ordering::Relaxed);
},
_ => {},
}

if op.expected_msg(&msg) {
break Some(msg);
}
}
}
Expand All @@ -212,11 +228,10 @@ impl AsyncAdapterStream {

impl Read for AsyncAdapterStream {
fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
// TODO: make this run via condvar instead?
// This needs to remain blocking or spin loopy
// Mainly because this is at odds with "keep CPU low."
loop {
drop(self.handle_messages(false));
let block = !(self.bytes_known_present.load(Ordering::Relaxed)
|| self.finalised.load(Ordering::Relaxed));
drop(self.handle_messages(Operation::Read { block }));

match self.bytes_out.read(buf) {
Ok(n) => {
Expand All @@ -229,9 +244,8 @@ impl Read for AsyncAdapterStream {
if self.finalised.load(Ordering::Relaxed) {
return Ok(0);
}

self.bytes_known_present.store(false, Ordering::Relaxed);
self.check_dropped()?;
std::thread::yield_now();
},
a => {
println!("Misc err {a:?}");
Expand All @@ -258,7 +272,7 @@ impl Seek for AsyncAdapterStream {
// wait for async to tell us that it has stopped writing,
// then clear buf and allow async to write again.
self.finalised.store(false, Ordering::Relaxed);
match self.handle_messages(true) {
match self.handle_messages(Operation::Seek) {
Some(AdapterResponse::SeekClear) => {},
None => self.check_dropped().map(|_| unreachable!())?,
_ => unreachable!(),
Expand All @@ -268,7 +282,7 @@ impl Seek for AsyncAdapterStream {

let _ = self.req_tx.send(AdapterRequest::SeekCleared);

match self.handle_messages(true) {
match self.handle_messages(Operation::Seek) {
Some(AdapterResponse::SeekResult(a)) => a,
None => self.check_dropped().map(|_| unreachable!()),
_ => unreachable!(),
Expand All @@ -286,7 +300,7 @@ impl MediaSource for AsyncAdapterStream {

let _ = self.req_tx.send(AdapterRequest::ByteLen);

match self.handle_messages(true) {
match self.handle_messages(Operation::Len) {
Some(AdapterResponse::ByteLen(a)) => a,
None => self.check_dropped().ok().map(|_| unreachable!()),
_ => unreachable!(),
Expand All @@ -306,6 +320,37 @@ enum AdapterResponse {
SeekClear,
ByteLen(Option<u64>),
ReadZero,
ReadOccurred,
}

#[derive(Copy, Clone)]
enum Operation {
Read { block: bool },
Seek,
Len,
}

impl Operation {
fn will_block(self) -> bool {
match self {
Self::Read { block } => block,
_ => true,
}
}

fn expected_msg(self, msg: &AdapterResponse) -> bool {
match self {
Self::Read { .. } => matches!(
msg,
AdapterResponse::ReadOccurred | AdapterResponse::ReadZero
),
Self::Seek => matches!(
msg,
AdapterResponse::SeekResult(_) | AdapterResponse::SeekClear
),
Self::Len => matches!(msg, AdapterResponse::ByteLen(_)),
}
}
}

/// An async port of symphonia's [`MediaSource`].
Expand Down