From 36cbb868d6b054c3924b720be4e2882638d4c7f6 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann <dev@phil-opp.com> Date: Tue, 10 Dec 2024 16:43:49 +0100 Subject: [PATCH] Use `Receiver::try_recv` instead of `Stream::next` with timeout --- apis/rust/node/src/event_stream/mod.rs | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/apis/rust/node/src/event_stream/mod.rs b/apis/rust/node/src/event_stream/mod.rs index 4856cbb6..ca6502b8 100644 --- a/apis/rust/node/src/event_stream/mod.rs +++ b/apis/rust/node/src/event_stream/mod.rs @@ -36,7 +36,7 @@ mod thread; pub struct EventStream { node_id: NodeId, - receiver: flume::r#async::RecvStream<'static, EventItem>, + receiver: flume::Receiver<EventItem>, _thread_handle: EventStreamThreadHandle, close_channel: DaemonChannel, clock: Arc<uhlc::HLC>, @@ -148,7 +148,7 @@ impl EventStream { Ok(EventStream { node_id: node_id.clone(), - receiver: rx.into_stream(), + receiver: rx, _thread_handle: thread_handle, close_channel, clock, @@ -169,16 +169,15 @@ impl EventStream { pub async fn recv_async(&mut self) -> Option<Event> { loop { if self.scheduler.is_empty() { - if let Some(event) = self.receiver.next().await { + if let Ok(event) = self.receiver.recv_async().await { self.scheduler.add_event(event); } else { break; } } else { - match select(Delay::new(Duration::from_micros(300)), self.receiver.next()).await { - Either::Left((_elapsed, _)) => break, - Either::Right((Some(event), _)) => self.scheduler.add_event(event), - Either::Right((None, _)) => break, + match self.receiver.try_recv() { + Ok(event) => self.scheduler.add_event(event), + Err(_) => break, // no other ready events }; } } @@ -187,11 +186,11 @@ impl EventStream { } pub async fn recv_async_timeout(&mut self, dur: Duration) -> Option<Event> { - let next_event = match select(Delay::new(dur), self.receiver.next()).await { + let next_event = match select(Delay::new(dur), self.receiver.recv_async()).await { Either::Left((_elapsed, _)) => { Some(EventItem::TimeoutError(eyre!("Receiver timed out"))) } - Either::Right((event, _)) => event, + Either::Right((event, _)) => event.ok(), }; next_event.map(Self::convert_event_item) } @@ -257,10 +256,11 @@ impl Stream for EventStream { type Item = Event; fn poll_next( - mut self: std::pin::Pin<&mut Self>, + self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll<Option<Self::Item>> { self.receiver + .stream() .poll_next_unpin(cx) .map(|item| item.map(Self::convert_event_item)) }