diff --git a/src/input/adapters/async_adapter.rs b/src/input/adapters/async_adapter.rs index fe763acc1..6131f12f2 100644 --- a/src/input/adapters/async_adapter.rs +++ b/src/input/adapters/async_adapter.rs @@ -34,7 +34,7 @@ struct AsyncAdapterSink { impl AsyncAdapterSink { async fn launch(mut self) { - let mut inner_buf = [0u8; 10 * 1024]; + let mut inner_buf = [0u8; 32 * 1024]; let mut read_region = 0..0; let mut hit_end = false; let mut blocked = false; @@ -73,6 +73,7 @@ impl AsyncAdapterSink { .write(&inner_buf[read_region.start..read_region.end]) { read_region.start += n_moved; + drop(self.resp_tx.send_async(AdapterResponse::ReadOccurred).await); } else { blocked = true; } @@ -137,9 +138,10 @@ impl AsyncAdapterSink { pub struct AsyncAdapterStream { bytes_out: HeapConsumer, can_seek: bool, - // Note: this is Atomic just to work around the need for + // Note: these are Atomic just to work around the need for // check_messages to take &self rather than &mut. finalised: AtomicBool, + bytes_known_present: AtomicBool, req_tx: Sender, resp_rx: Receiver, notify_tx: Arc, @@ -168,6 +170,7 @@ impl AsyncAdapterStream { bytes_out, can_seek, finalised: false.into(), + bytes_known_present: false.into(), req_tx, resp_rx, notify_tx, @@ -180,16 +183,29 @@ impl AsyncAdapterStream { stream } - fn handle_messages(&self, block: bool) -> Option { + fn handle_messages(&self, op: Operation) -> Option { loop { - match self.resp_rx.try_recv() { - Ok(AdapterResponse::ReadZero) => { + let msg = if op.will_block() { + self.resp_rx.recv().ok() + } else { + self.resp_rx.try_recv().ok() + }; + + let msg = if let Some(msg) = msg { msg } else { break None }; + + // state changes + match &msg { + AdapterResponse::ReadZero => { self.finalised.store(true, Ordering::Relaxed); }, - Ok(a) => break Some(a), - Err(TryRecvError::Empty) if !block => break None, - Err(TryRecvError::Disconnected) => break None, - Err(TryRecvError::Empty) => {}, + AdapterResponse::ReadOccurred => { + self.bytes_known_present.store(true, Ordering::Relaxed); + }, + _ => {}, + } + + if op.expected_msg(&msg) { + break Some(msg); } } } @@ -212,11 +228,10 @@ impl AsyncAdapterStream { impl Read for AsyncAdapterStream { fn read(&mut self, buf: &mut [u8]) -> IoResult { - // TODO: make this run via condvar instead? - // This needs to remain blocking or spin loopy - // Mainly because this is at odds with "keep CPU low." loop { - drop(self.handle_messages(false)); + let block = !(self.bytes_known_present.load(Ordering::Relaxed) + || self.finalised.load(Ordering::Relaxed)); + drop(self.handle_messages(Operation::Read { block })); match self.bytes_out.read(buf) { Ok(n) => { @@ -229,9 +244,8 @@ impl Read for AsyncAdapterStream { if self.finalised.load(Ordering::Relaxed) { return Ok(0); } - + self.bytes_known_present.store(false, Ordering::Relaxed); self.check_dropped()?; - std::thread::yield_now(); }, a => { println!("Misc err {a:?}"); @@ -258,7 +272,7 @@ impl Seek for AsyncAdapterStream { // wait for async to tell us that it has stopped writing, // then clear buf and allow async to write again. self.finalised.store(false, Ordering::Relaxed); - match self.handle_messages(true) { + match self.handle_messages(Operation::Seek) { Some(AdapterResponse::SeekClear) => {}, None => self.check_dropped().map(|_| unreachable!())?, _ => unreachable!(), @@ -268,7 +282,7 @@ impl Seek for AsyncAdapterStream { let _ = self.req_tx.send(AdapterRequest::SeekCleared); - match self.handle_messages(true) { + match self.handle_messages(Operation::Seek) { Some(AdapterResponse::SeekResult(a)) => a, None => self.check_dropped().map(|_| unreachable!()), _ => unreachable!(), @@ -286,7 +300,7 @@ impl MediaSource for AsyncAdapterStream { let _ = self.req_tx.send(AdapterRequest::ByteLen); - match self.handle_messages(true) { + match self.handle_messages(Operation::Len) { Some(AdapterResponse::ByteLen(a)) => a, None => self.check_dropped().ok().map(|_| unreachable!()), _ => unreachable!(), @@ -306,6 +320,37 @@ enum AdapterResponse { SeekClear, ByteLen(Option), ReadZero, + ReadOccurred, +} + +#[derive(Copy, Clone)] +enum Operation { + Read { block: bool }, + Seek, + Len, +} + +impl Operation { + fn will_block(self) -> bool { + match self { + Self::Read { block } => block, + _ => true, + } + } + + fn expected_msg(self, msg: &AdapterResponse) -> bool { + match self { + Self::Read { .. } => matches!( + msg, + AdapterResponse::ReadOccurred | AdapterResponse::ReadZero + ), + Self::Seek => matches!( + msg, + AdapterResponse::SeekResult(_) | AdapterResponse::SeekClear + ), + Self::Len => matches!(msg, AdapterResponse::ByteLen(_)), + } + } } /// An async port of symphonia's [`MediaSource`].