diff --git a/apis/rust/node/src/event_stream/mod.rs b/apis/rust/node/src/event_stream/mod.rs index 2901a7668..7181974d5 100644 --- a/apis/rust/node/src/event_stream/mod.rs +++ b/apis/rust/node/src/event_stream/mod.rs @@ -1,7 +1,7 @@ use std::{sync::Arc, time::Duration}; use dora_message::{ - daemon_to_node::{DaemonCommunication, DaemonReply, DataMessage, NodeEvent}, + daemon_to_node::{DaemonCommunication, DaemonReply}, node_to_daemon::{DaemonRequest, Timestamped}, DataflowId, }; @@ -12,10 +12,7 @@ use futures::{ }; use futures_timer::Delay; -use self::{ - event::SharedMemoryData, - thread::{EventItem, EventStreamThreadHandle}, -}; +use self::thread::{EventItem, EventStreamThreadHandle}; use crate::daemon_connection::DaemonChannel; use dora_core::{config::NodeId, uhlc}; use eyre::{eyre, Context}; @@ -143,51 +140,7 @@ impl EventStream { fn convert_event_item(item: EventItem) -> Event { match item { - EventItem::NodeEvent { event, ack_channel } => match event { - NodeEvent::Stop => Event::Stop, - NodeEvent::Reload { operator_id } => Event::Reload { operator_id }, - NodeEvent::InputClosed { id } => Event::InputClosed { id }, - NodeEvent::Input { id, metadata, data } => { - let data = match data { - None => Ok(None), - Some(DataMessage::Vec(v)) => Ok(Some(RawData::Vec(v))), - Some(DataMessage::SharedMemory { - shared_memory_id, - len, - drop_token: _, // handled in `event_stream_loop` - }) => unsafe { - MappedInputData::map(&shared_memory_id, len).map(|data| { - Some(RawData::SharedMemory(SharedMemoryData { - data, - _drop: ack_channel, - })) - }) - }, - }; - let data = data.and_then(|data| { - let raw_data = data.unwrap_or(RawData::Empty); - raw_data - .into_arrow_array(&metadata.type_info) - .map(arrow::array::make_array) - }); - match data { - Ok(data) => Event::Input { - id, - metadata, - data: data.into(), - }, - Err(err) => Event::Error(format!("{err:?}")), - } - } - NodeEvent::AllInputsClosed => { - let err = eyre!( - "received `AllInputsClosed` event, which 
should be handled by background task" - ); - tracing::error!("{err:?}"); - Event::Error(err.wrap_err("internal error").to_string()) - } - }, - + EventItem::NodeEvent { event } => event, EventItem::FatalError(err) => { Event::Error(format!("fatal event stream error: {err:?}")) } diff --git a/apis/rust/node/src/event_stream/thread.rs b/apis/rust/node/src/event_stream/thread.rs index add58e3e4..a30ebd0fe 100644 --- a/apis/rust/node/src/event_stream/thread.rs +++ b/apis/rust/node/src/event_stream/thread.rs @@ -3,6 +3,7 @@ use dora_core::{ uhlc::{self, Timestamp}, }; use dora_message::{ + common::{DataMessage, DropTokenState, DropTokenStatus}, daemon_to_node::{DaemonReply, NodeEvent}, node_to_daemon::{DaemonRequest, DropToken, Timestamped}, }; @@ -15,6 +16,8 @@ use std::{ use crate::daemon_connection::DaemonChannel; +use super::{event::SharedMemoryData, Event, MappedInputData, RawData}; + pub fn init( node_id: NodeId, tx: flume::Sender, @@ -28,10 +31,7 @@ pub fn init( #[derive(Debug)] pub enum EventItem { - NodeEvent { - event: NodeEvent, - ack_channel: flume::Sender<()>, - }, + NodeEvent { event: super::Event }, FatalError(eyre::Report), TimeoutError(eyre::Report), } @@ -123,25 +123,60 @@ fn event_stream_loop( if let Err(err) = clock.update_with_timestamp(×tamp) { tracing::warn!("failed to update HLC: {err}"); } - let drop_token = match &inner { - NodeEvent::Input { - data: Some(data), .. 
- } => data.drop_token(), + + let event = match inner { + NodeEvent::Stop => Event::Stop, + NodeEvent::Reload { operator_id } => Event::Reload { operator_id }, + NodeEvent::InputClosed { id } => Event::InputClosed { id }, + NodeEvent::Input { id, metadata, data } => { + let data = match data { + None => Ok(None), + Some(DataMessage::Vec(v)) => Ok(Some(RawData::Vec(v))), + Some(DataMessage::SharedMemory { + shared_memory_id, + len, + drop_token, + }) => unsafe { + let (drop_tx, drop_rx) = flume::bounded(0); + let data = MappedInputData::map(&shared_memory_id, len).map(|data| { + Some(RawData::SharedMemory(SharedMemoryData { + data, + _drop: drop_tx, + })) + }); + drop_tokens.push(DropTokenStatus { + token: drop_token, + state: DropTokenState::Mapped, + }); + pending_drop_tokens.push((drop_token, drop_rx, Instant::now(), 1)); + data + }, + }; + let data = data.and_then(|data| { + let raw_data = data.unwrap_or(RawData::Empty); + raw_data + .into_arrow_array(&metadata.type_info) + .map(arrow::array::make_array) + }); + match data { + Ok(data) => Event::Input { + id, + metadata, + data: data.into(), + }, + Err(err) => Event::Error(format!("{err:?}")), + } + } NodeEvent::AllInputsClosed => { // close the event stream tx = None; // skip this internal event continue; } - _ => None, }; if let Some(tx) = tx.as_ref() { - let (drop_tx, drop_rx) = flume::bounded(0); - match tx.send(EventItem::NodeEvent { - event: inner, - ack_channel: drop_tx, - }) { + match tx.send(EventItem::NodeEvent { event }) { Ok(()) => {} Err(send_error) => { let event = send_error.into_inner(); @@ -152,12 +187,8 @@ fn event_stream_loop( break 'outer Ok(()); } } - - if let Some(token) = drop_token { - pending_drop_tokens.push((token, drop_rx, Instant::now(), 1)); - } } else { - tracing::warn!("dropping event because event `tx` was already closed: `{inner:?}`"); + tracing::warn!("dropping event because event `tx` was already closed: `{event:?}`"); } } }; @@ -189,7 +220,7 @@ fn event_stream_loop( fn 
handle_pending_drop_tokens( pending_drop_tokens: &mut Vec<(DropToken, flume::Receiver<()>, Instant, u64)>, - drop_tokens: &mut Vec, + drop_tokens: &mut Vec, ) -> eyre::Result<()> { let mut still_pending = Vec::new(); for (token, rx, since, warn) in pending_drop_tokens.drain(..) { @@ -197,7 +228,10 @@ fn handle_pending_drop_tokens( Ok(()) => return Err(eyre!("Node API should not send anything on ACK channel")), Err(flume::TryRecvError::Disconnected) => { // the event was dropped -> add the drop token to the list - drop_tokens.push(token); + drop_tokens.push(DropTokenStatus { + token, + state: DropTokenState::Dropped, + }); } Err(flume::TryRecvError::Empty) => { let duration = Duration::from_secs(30 * warn); @@ -214,7 +248,7 @@ fn handle_pending_drop_tokens( fn report_remaining_drop_tokens( mut channel: DaemonChannel, - mut drop_tokens: Vec, + mut drop_tokens: Vec, mut pending_drop_tokens: Vec<(DropToken, flume::Receiver<()>, Instant, u64)>, timestamp: Timestamp, ) -> eyre::Result<()> { @@ -227,7 +261,10 @@ fn report_remaining_drop_tokens( Ok(()) => return Err(eyre!("Node API should not send anything on ACK channel")), Err(flume::RecvTimeoutError::Disconnected) => { // the event was dropped -> add the drop token to the list - drop_tokens.push(token); + drop_tokens.push(DropTokenStatus { + token, + state: DropTokenState::Dropped, + }); } Err(flume::RecvTimeoutError::Timeout) => { let duration = Duration::from_secs(30); @@ -252,7 +289,7 @@ fn report_remaining_drop_tokens( } fn report_drop_tokens( - drop_tokens: &mut Vec, + drop_tokens: &mut Vec, channel: &mut DaemonChannel, timestamp: Timestamp, ) -> Result<(), eyre::ErrReport> { diff --git a/apis/rust/node/src/node/drop_stream.rs b/apis/rust/node/src/node/drop_stream.rs index d62b05887..d30bf40b1 100644 --- a/apis/rust/node/src/node/drop_stream.rs +++ b/apis/rust/node/src/node/drop_stream.rs @@ -3,15 +3,16 @@ use std::{sync::Arc, time::Duration}; use crate::daemon_connection::DaemonChannel; use 
dora_core::{config::NodeId, uhlc}; use dora_message::{ + common::{DropTokenState, DropTokenStatus}, daemon_to_node::{DaemonCommunication, DaemonReply, NodeDropEvent}, - node_to_daemon::{DaemonRequest, DropToken, Timestamped}, + node_to_daemon::{DaemonRequest, Timestamped}, DataflowId, }; use eyre::{eyre, Context}; use flume::RecvTimeoutError; pub struct DropStream { - receiver: flume::Receiver<DropToken>, + receiver: flume::Receiver<DropTokenStatus>, _thread_handle: DropStreamThreadHandle, } @@ -82,7 +83,7 @@ impl DropStream { } impl std::ops::Deref for DropStream { - type Target = flume::Receiver<DropToken>; + type Target = flume::Receiver<DropTokenStatus>; fn deref(&self) -> &Self::Target { &self.receiver @@ -92,7 +93,7 @@ impl std::ops::Deref for DropStream { #[tracing::instrument(skip(tx, channel, clock))] fn drop_stream_loop( node_id: NodeId, - tx: flume::Sender<DropToken>, + tx: flume::Sender<DropTokenStatus>, mut channel: DaemonChannel, clock: Arc<uhlc::HLC>, ) { @@ -125,16 +126,22 @@ fn drop_stream_loop( if let Err(err) = clock.update_with_timestamp(&timestamp) { tracing::warn!("failed to update HLC: {err}"); } - match inner { - NodeDropEvent::OutputDropped { drop_token } => { - if tx.send(drop_token).is_err() { - tracing::warn!( - "drop channel was closed already, could not forward \ - drop token`{drop_token:?}`" - ); - break 'outer; - } - } + let event = match inner { + NodeDropEvent::OutputMapped { drop_token } => DropTokenStatus { + token: drop_token, + state: DropTokenState::Mapped, + }, + NodeDropEvent::OutputDropped { drop_token } => DropTokenStatus { + token: drop_token, + state: DropTokenState::Dropped, + }, + }; + if tx.send(event).is_err() { + tracing::warn!( + "drop channel was closed already, could not forward \ + drop token event `{event:?}`" + ); + break 'outer; } } } diff --git a/apis/rust/node/src/node/mod.rs b/apis/rust/node/src/node/mod.rs index 234cc8402..905a15adb 100644 --- a/apis/rust/node/src/node/mod.rs +++ b/apis/rust/node/src/node/mod.rs @@ -15,6 +15,7 @@ use dora_core::{ }; use dora_message::{ + common::DropTokenStatus, 
daemon_to_node::{DaemonReply, NodeConfig}, metadata::{ArrowTypeInfo, Metadata, MetadataParameters}, node_to_daemon::{DaemonRequest, DataMessage, DropToken, Timestamped}, @@ -47,6 +48,7 @@ pub struct DoraNode { clock: Arc, sent_out_shared_memory: HashMap, + shared_memory_in_use: HashMap, drop_stream: DropStream, cache: VecDeque, @@ -147,6 +149,7 @@ impl DoraNode { control_channel, clock, sent_out_shared_memory: HashMap::new(), + shared_memory_in_use: HashMap::new(), drop_stream, cache: VecDeque::new(), dataflow_descriptor, @@ -348,10 +351,7 @@ impl DoraNode { fn handle_finished_drop_tokens(&mut self) -> eyre::Result<()> { loop { match self.drop_stream.try_recv() { - Ok(token) => match self.sent_out_shared_memory.remove(&token) { - Some(region) => self.add_to_cache(region), - None => tracing::warn!("received unknown finished drop token `{token:?}`"), - }, + Ok(event) => self.handle_drop_token_event(event), Err(flume::TryRecvError::Empty) => break, Err(flume::TryRecvError::Disconnected) => { bail!("event stream was closed before sending all expected drop tokens") @@ -361,6 +361,35 @@ impl DoraNode { Ok(()) } + fn handle_drop_token_event(&mut self, event: DropTokenStatus) { + let DropTokenStatus { token, state } = event; + match state { + dora_message::common::DropTokenState::Mapped => { + let region = self.sent_out_shared_memory.remove(&token); + match region { + Some(region) => { + self.shared_memory_in_use.insert(token, region); + } + None => { + tracing::warn!("received unknown mapped drop token `{token:?}`") + } + }; + } + dora_message::common::DropTokenState::Dropped => { + let region = self + .sent_out_shared_memory + .remove(&token) + .or_else(|| self.shared_memory_in_use.remove(&token)); + match region { + Some(region) => self.add_to_cache(region), + None => { + tracing::warn!("received unknown finished drop token `{token:?}`") + } + } + } + }; + } + fn add_to_cache(&mut self, memory: ShmemHandle) { const MAX_CACHE_SIZE: usize = 20; @@ -403,9 +432,7 @@ impl 
Drop for DoraNode { } match self.drop_stream.recv_timeout(Duration::from_secs(10)) { - Ok(token) => { - self.sent_out_shared_memory.remove(&token); - } + Ok(event) => self.handle_drop_token_event(event), Err(flume::RecvTimeoutError::Disconnected) => { tracing::warn!( "finished_drop_tokens channel closed while still waiting for drop tokens; \ diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 166d31642..da7f0252b 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -8,7 +8,10 @@ use dora_core::{ uhlc::{self, HLC}, }; use dora_message::{ - common::{DataMessage, DropToken, LogLevel, NodeError, NodeErrorCause, NodeExitStatus}, + common::{ + DataMessage, DropToken, DropTokenStatus, LogLevel, NodeError, NodeErrorCause, + NodeExitStatus, + }, coordinator_to_cli::DataflowResult, coordinator_to_daemon::{DaemonCoordinatorEvent, SpawnDataflowNodes}, daemon_to_coordinator::{ @@ -887,7 +890,7 @@ impl Daemon { .send_out(dataflow_id, node_id, output_id, metadata, data) .await .context("failed to send out")?, - DaemonNodeEvent::ReportDrop { tokens } => { + DaemonNodeEvent::ReportTokenState { token_events } => { let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| { format!( "failed to get handle drop tokens: \ @@ -897,17 +900,27 @@ impl Daemon { match dataflow { Ok(dataflow) => { - for token in tokens { + for event in token_events { + let DropTokenStatus { token, state } = event; match dataflow.pending_drop_tokens.get_mut(&token) { - Some(info) => { - if info.pending_nodes.remove(&node_id) { - dataflow.check_drop_token(token, &self.clock).await?; - } else { - tracing::warn!( - "node `{node_id}` is not pending for drop token `{token:?}`" - ); + Some(info) => match state { + dora_message::common::DropTokenState::Mapped => { + let changed = info.pending_nodes.remove(&node_id); + info.mapped_in_nodes.insert(node_id.clone()); + if changed { + dataflow + .check_drop_token_mapped(token, &self.clock) + .await?; + } + } + 
dora_message::common::DropTokenState::Dropped => { + let mut changed = info.pending_nodes.remove(&node_id); + changed |= info.mapped_in_nodes.remove(&node_id); + if changed { + dataflow.check_drop_token(token, &self.clock).await?; + } } - } + }, None => tracing::warn!("unknown drop token `{token:?}`"), } } @@ -1344,6 +1357,7 @@ async fn send_output_to_local_receivers( .or_insert_with(|| DropTokenInformation { owner: node_id.clone(), pending_nodes: Default::default(), + mapped_in_nodes: Default::default(), }) .pending_nodes .insert(receiver_id.clone()); @@ -1382,6 +1396,7 @@ async fn send_output_to_local_receivers( .or_insert_with(|| DropTokenInformation { owner: node_id.clone(), pending_nodes: Default::default(), + mapped_in_nodes: Default::default(), }); // check if all local subscribers are finished with the token dataflow.check_drop_token(token, clock).await?; @@ -1642,7 +1657,7 @@ impl RunningDataflow { async fn check_drop_token(&mut self, token: DropToken, clock: &HLC) -> eyre::Result<()> { match self.pending_drop_tokens.entry(token) { std::collections::hash_map::Entry::Occupied(entry) => { - if entry.get().pending_nodes.is_empty() { + if entry.get().pending_nodes.is_empty() && entry.get().mapped_in_nodes.is_empty() { let (drop_token, info) = entry.remove_entry(); let result = match self.drop_channels.get_mut(&info.owner) { Some(channel) => send_with_timestamp( @@ -1670,6 +1685,38 @@ impl RunningDataflow { Ok(()) } + + async fn check_drop_token_mapped(&mut self, token: DropToken, clock: &HLC) -> eyre::Result<()> { + match self.pending_drop_tokens.entry(token) { + std::collections::hash_map::Entry::Occupied(entry) => { + if entry.get().pending_nodes.is_empty() && !entry.get().mapped_in_nodes.is_empty() { + let info = entry.get(); + let result = match self.drop_channels.get_mut(&info.owner) { + Some(channel) => send_with_timestamp( + channel, + NodeDropEvent::OutputMapped { drop_token: token }, + clock, + ) + .wrap_err("send failed"), + None => Err(eyre!("no 
subscribe channel for node `{}`", &info.owner)), + }; + if let Err(err) = result.wrap_err_with(|| { + format!( + "failed to report drop token mapped `{token:?}` to owner `{}`", + &info.owner + ) + }) { + tracing::warn!("{err:?}"); + } + } + } + std::collections::hash_map::Entry::Vacant(_) => { + tracing::warn!("check_drop_token_mapped called with already closed token") + } + } + + Ok(()) + } } #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -1679,9 +1726,14 @@ type InputId = (NodeId, DataId); struct DropTokenInformation { /// The node that created the associated drop token. owner: NodeId, - /// Contains the set of pending nodes that still have access to the input - /// associated with a drop token. + /// Contains the set of nodes that have not mapped the input associated + /// with a drop token yet. The shared memory region needs to be kept + /// alive until this list is empty. pending_nodes: BTreeSet, + /// Contains the set of nodes that still have the input data associated + /// with a drop token mapped in their address space. The shared memory + /// region must not be overwritten until this list is empty. 
+ mapped_in_nodes: BTreeSet<NodeId>, } #[derive(Debug)] @@ -1727,8 +1779,8 @@ pub enum DaemonNodeEvent { metadata: metadata::Metadata, data: Option<DataMessage>, }, - ReportDrop { - tokens: Vec<DropToken>, + ReportTokenState { + token_events: Vec<DropTokenStatus>, }, EventStreamDropped { reply_sender: oneshot::Sender<DaemonReply>, diff --git a/binaries/daemon/src/node_communication/mod.rs b/binaries/daemon/src/node_communication/mod.rs index 76a3c6675..6726462e0 100644 --- a/binaries/daemon/src/node_communication/mod.rs +++ b/binaries/daemon/src/node_communication/mod.rs @@ -5,7 +5,7 @@ use dora_core::{ uhlc, }; use dora_message::{ - common::{DropToken, Timestamped}, + common::{DropTokenState, DropTokenStatus, Timestamped}, daemon_to_node::{DaemonCommunication, DaemonReply, NodeDropEvent, NodeEvent}, node_to_daemon::DaemonRequest, DataflowId, }; @@ -335,7 +335,10 @@ impl Listener { Some(0) => { dropped += 1; if let Some(drop_token) = data.as_ref().and_then(|d| d.drop_token()) { - drop_tokens.push(drop_token); + drop_tokens.push(DropTokenStatus { + token: drop_token, + state: DropTokenState::Dropped, + }); } *event.as_mut() = None; } @@ -509,13 +512,13 @@ impl Listener { Ok(()) } - async fn report_drop_tokens(&mut self, drop_tokens: Vec<DropToken>) -> eyre::Result<()> { + async fn report_drop_tokens(&mut self, drop_tokens: Vec<DropTokenStatus>) -> eyre::Result<()> { if !drop_tokens.is_empty() { let event = Event::Node { dataflow_id: self.dataflow_id, node_id: self.node_id.clone(), - event: DaemonNodeEvent::ReportDrop { - tokens: drop_tokens, + event: DaemonNodeEvent::ReportTokenState { + token_events: drop_tokens, }, }; let event = Timestamped { diff --git a/libraries/message/src/common.rs b/libraries/message/src/common.rs index 03a75e88f..7e868b6e9 100644 --- a/libraries/message/src/common.rs +++ b/libraries/message/src/common.rs @@ -172,6 +172,22 @@ impl fmt::Debug for DataMessage { } } +#[derive( + Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize, +)] +pub struct DropTokenStatus { + pub token: DropToken, 
pub state: DropTokenState, +} + +#[derive( + Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize, +)] +pub enum DropTokenState { + Mapped, + Dropped, +} + #[derive( Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize, )] diff --git a/libraries/message/src/daemon_to_node.rs b/libraries/message/src/daemon_to_node.rs index 178b33779..c1dda819f 100644 --- a/libraries/message/src/daemon_to_node.rs +++ b/libraries/message/src/daemon_to_node.rs @@ -73,5 +73,6 @@ pub enum NodeEvent { #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum NodeDropEvent { + OutputMapped { drop_token: DropToken }, OutputDropped { drop_token: DropToken }, } diff --git a/libraries/message/src/node_to_daemon.rs b/libraries/message/src/node_to_daemon.rs index 6967c1a3b..2bf4a939f 100644 --- a/libraries/message/src/node_to_daemon.rs +++ b/libraries/message/src/node_to_daemon.rs @@ -1,7 +1,10 @@ pub use crate::common::{ DataMessage, DropToken, LogLevel, LogMessage, SharedMemoryId, Timestamped, }; -use crate::{current_crate_version, metadata::Metadata, versions_compatible, DataflowId}; +use crate::{ + common::DropTokenStatus, current_crate_version, metadata::Metadata, versions_compatible, + DataflowId, +}; use dora_core::config::{DataId, NodeId}; @@ -19,10 +22,10 @@ pub enum DaemonRequest { /// required drop tokens. OutputsDone, NextEvent { - drop_tokens: Vec<DropToken>, + drop_tokens: Vec<DropTokenStatus>, }, ReportDropTokens { - drop_tokens: Vec<DropToken>, + drop_tokens: Vec<DropTokenStatus>, }, SubscribeDrop, NextFinishedDropTokens,