Skip to content

Commit

Permalink
Merge pull request #386 from dora-rs/timeout-receiver
Browse files Browse the repository at this point in the history
Adding a timeout method to not block indefinitely next event
  • Loading branch information
phil-opp authored Dec 8, 2023
2 parents 798734f + 8648ce9 commit 631dcff
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 7 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 11 additions & 5 deletions apis/python/node/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#![allow(clippy::borrow_deref_ref)] // clippy warns about code generated by #[pymethods]

use std::time::Duration;

use arrow::pyarrow::{FromPyArrow, ToPyArrow};
use dora_node_api::merged::{MergeExternalSend, MergedEvent};
use dora_node_api::{DoraNode, EventStream};
Expand Down Expand Up @@ -57,12 +59,13 @@ impl Node {
/// case "image":
/// ```
#[allow(clippy::should_implement_trait)]
pub fn next(&mut self, py: Python) -> PyResult<Option<PyEvent>> {
self.__next__(py)
pub fn next(&mut self, py: Python, timeout: Option<f32>) -> PyResult<Option<PyEvent>> {
let event = py.allow_threads(|| self.events.recv(timeout.map(Duration::from_secs_f32)));
Ok(event)
}

pub fn __next__(&mut self, py: Python) -> PyResult<Option<PyEvent>> {
let event = py.allow_threads(|| self.events.recv());
let event = py.allow_threads(|| self.events.recv(None));
Ok(event)
}

Expand Down Expand Up @@ -156,9 +159,12 @@ enum Events {
}

impl Events {
fn recv(&mut self) -> Option<PyEvent> {
fn recv(&mut self, timeout: Option<Duration>) -> Option<PyEvent> {
match self {
Events::Dora(events) => events.recv().map(PyEvent::from),
Events::Dora(events) => match timeout {
Some(timeout) => events.recv_timeout(timeout).map(PyEvent::from),
None => events.recv().map(PyEvent::from),
},
Events::Merged(events) => futures::executor::block_on(events.next()).map(PyEvent::from),
}
}
Expand Down
1 change: 1 addition & 0 deletions apis/rust/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ arrow = { workspace = true }
arrow-schema = { workspace = true }
futures = "0.3.28"
futures-concurrency = "7.3.0"
futures-timer = "3.0.2"
dora-arrow-convert = { workspace = true }
aligned-vec = "0.5.0"

Expand Down
26 changes: 24 additions & 2 deletions apis/rust/node/src/event_stream/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
use std::sync::Arc;
use std::{sync::Arc, time::Duration};

pub use event::{Event, MappedInputData, RawData};
use futures::{Stream, StreamExt};
use futures::{
future::{select, Either},
Stream, StreamExt,
};
use futures_timer::Delay;

use self::{
event::SharedMemoryData,
Expand Down Expand Up @@ -107,10 +111,25 @@ impl EventStream {
futures::executor::block_on(self.recv_async())
}

/// wait for the next event on the events stream until timeout
pub fn recv_timeout(&mut self, dur: Duration) -> Option<Event> {
futures::executor::block_on(self.recv_async_timeout(dur))
}

pub async fn recv_async(&mut self) -> Option<Event> {
self.receiver.next().await.map(Self::convert_event_item)
}

pub async fn recv_async_timeout(&mut self, dur: Duration) -> Option<Event> {
let next_event = match select(Delay::new(dur), self.receiver.next()).await {
Either::Left((_elapsed, _)) => {
Some(EventItem::TimeoutError(eyre!("Receiver timed out")))
}
Either::Right((event, _)) => event,
};
next_event.map(Self::convert_event_item)
}

fn convert_event_item(item: EventItem) -> Event {
match item {
EventItem::NodeEvent { event, ack_channel } => match event {
Expand Down Expand Up @@ -161,6 +180,9 @@ impl EventStream {
EventItem::FatalError(err) => {
Event::Error(format!("fatal event stream error: {err:?}"))
}
EventItem::TimeoutError(err) => {
Event::Error(format!("Timeout event stream error: {err:?}"))
}
}
}
}
Expand Down
1 change: 1 addition & 0 deletions apis/rust/node/src/event_stream/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub enum EventItem {
ack_channel: flume::Sender<()>,
},
FatalError(eyre::Report),
TimeoutError(eyre::Report),
}

pub struct EventStreamThreadHandle {
Expand Down

0 comments on commit 631dcff

Please sign in to comment.