From e16c59f83bfb3cc4ec9508234b1b8b9183ed1196 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Fri, 4 Oct 2024 13:06:38 +0200 Subject: [PATCH] Report when shared memory region is mapped to allow faster cleanup The shared memory region can be safely removed by the sender once it's mapped in the receiver. The OS will just delete the file handle associated with the shared memory region, but keep the data alive until it has been unmapped from all address spaces. By notifying the sender that a message has been mapped to the address space we enable faster cleanup on exit. The sender can safely close all of its shared memory regions once all of its sent messages are at least mapped. So it does not need to wait until all messages have been _dropped_ anymore, which can take considerably longer, especially if the Python GC is involved. This commit modifies the message format, so we need to bump the version of the `dora-message` crate to `0.5.0`. --- apis/rust/node/src/event_stream/mod.rs | 53 +----------- apis/rust/node/src/event_stream/thread.rs | 85 +++++++++++++------ apis/rust/node/src/node/drop_stream.rs | 35 +++++--- apis/rust/node/src/node/mod.rs | 41 +++++++-- binaries/daemon/src/lib.rs | 84 ++++++++++++++---- binaries/daemon/src/node_communication/mod.rs | 13 +-- libraries/message/src/common.rs | 16 ++++ libraries/message/src/daemon_to_node.rs | 1 + libraries/message/src/node_to_daemon.rs | 9 +- 9 files changed, 218 insertions(+), 119 deletions(-) diff --git a/apis/rust/node/src/event_stream/mod.rs b/apis/rust/node/src/event_stream/mod.rs index 2901a7668..7181974d5 100644 --- a/apis/rust/node/src/event_stream/mod.rs +++ b/apis/rust/node/src/event_stream/mod.rs @@ -1,7 +1,7 @@ use std::{sync::Arc, time::Duration}; use dora_message::{ - daemon_to_node::{DaemonCommunication, DaemonReply, DataMessage, NodeEvent}, + daemon_to_node::{DaemonCommunication, DaemonReply}, node_to_daemon::{DaemonRequest, Timestamped}, DataflowId, }; @@ -12,10 +12,7 @@ use futures::{ }; 
use futures_timer::Delay; -use self::{ - event::SharedMemoryData, - thread::{EventItem, EventStreamThreadHandle}, -}; +use self::thread::{EventItem, EventStreamThreadHandle}; use crate::daemon_connection::DaemonChannel; use dora_core::{config::NodeId, uhlc}; use eyre::{eyre, Context}; @@ -143,51 +140,7 @@ impl EventStream { fn convert_event_item(item: EventItem) -> Event { match item { - EventItem::NodeEvent { event, ack_channel } => match event { - NodeEvent::Stop => Event::Stop, - NodeEvent::Reload { operator_id } => Event::Reload { operator_id }, - NodeEvent::InputClosed { id } => Event::InputClosed { id }, - NodeEvent::Input { id, metadata, data } => { - let data = match data { - None => Ok(None), - Some(DataMessage::Vec(v)) => Ok(Some(RawData::Vec(v))), - Some(DataMessage::SharedMemory { - shared_memory_id, - len, - drop_token: _, // handled in `event_stream_loop` - }) => unsafe { - MappedInputData::map(&shared_memory_id, len).map(|data| { - Some(RawData::SharedMemory(SharedMemoryData { - data, - _drop: ack_channel, - })) - }) - }, - }; - let data = data.and_then(|data| { - let raw_data = data.unwrap_or(RawData::Empty); - raw_data - .into_arrow_array(&metadata.type_info) - .map(arrow::array::make_array) - }); - match data { - Ok(data) => Event::Input { - id, - metadata, - data: data.into(), - }, - Err(err) => Event::Error(format!("{err:?}")), - } - } - NodeEvent::AllInputsClosed => { - let err = eyre!( - "received `AllInputsClosed` event, which should be handled by background task" - ); - tracing::error!("{err:?}"); - Event::Error(err.wrap_err("internal error").to_string()) - } - }, - + EventItem::NodeEvent { event } => event, EventItem::FatalError(err) => { Event::Error(format!("fatal event stream error: {err:?}")) } diff --git a/apis/rust/node/src/event_stream/thread.rs b/apis/rust/node/src/event_stream/thread.rs index add58e3e4..a30ebd0fe 100644 --- a/apis/rust/node/src/event_stream/thread.rs +++ b/apis/rust/node/src/event_stream/thread.rs @@ -3,6 +3,7 @@ 
use dora_core::{ uhlc::{self, Timestamp}, }; use dora_message::{ + common::{DataMessage, DropTokenState, DropTokenStatus}, daemon_to_node::{DaemonReply, NodeEvent}, node_to_daemon::{DaemonRequest, DropToken, Timestamped}, }; @@ -15,6 +16,8 @@ use std::{ use crate::daemon_connection::DaemonChannel; +use super::{event::SharedMemoryData, Event, MappedInputData, RawData}; + pub fn init( node_id: NodeId, tx: flume::Sender<EventItem>, @@ -28,10 +31,7 @@ pub fn init( #[derive(Debug)] pub enum EventItem { - NodeEvent { - event: NodeEvent, - ack_channel: flume::Sender<()>, - }, + NodeEvent { event: super::Event }, FatalError(eyre::Report), TimeoutError(eyre::Report), } @@ -123,25 +123,60 @@ fn event_stream_loop( if let Err(err) = clock.update_with_timestamp(&timestamp) { tracing::warn!("failed to update HLC: {err}"); } - let drop_token = match &inner { - NodeEvent::Input { - data: Some(data), .. - } => data.drop_token(), + + let event = match inner { + NodeEvent::Stop => Event::Stop, + NodeEvent::Reload { operator_id } => Event::Reload { operator_id }, + NodeEvent::InputClosed { id } => Event::InputClosed { id }, + NodeEvent::Input { id, metadata, data } => { + let data = match data { + None => Ok(None), + Some(DataMessage::Vec(v)) => Ok(Some(RawData::Vec(v))), + Some(DataMessage::SharedMemory { + shared_memory_id, + len, + drop_token, + }) => unsafe { + let (drop_tx, drop_rx) = flume::bounded(0); + let data = MappedInputData::map(&shared_memory_id, len).map(|data| { + Some(RawData::SharedMemory(SharedMemoryData { + data, + _drop: drop_tx, + })) + }); + drop_tokens.push(DropTokenStatus { + token: drop_token, + state: DropTokenState::Mapped, + }); + pending_drop_tokens.push((drop_token, drop_rx, Instant::now(), 1)); + data + }, + }; + let data = data.and_then(|data| { + let raw_data = data.unwrap_or(RawData::Empty); + raw_data + .into_arrow_array(&metadata.type_info) + .map(arrow::array::make_array) + }); + match data { + Ok(data) => Event::Input { + id, + metadata, + data: data.into(), 
}, Err(err) => Event::Error(format!("{err:?}")), + } + } NodeEvent::AllInputsClosed => { // close the event stream tx = None; // skip this internal event continue; } - _ => None, }; if let Some(tx) = tx.as_ref() { - let (drop_tx, drop_rx) = flume::bounded(0); - match tx.send(EventItem::NodeEvent { - event: inner, - ack_channel: drop_tx, - }) { + match tx.send(EventItem::NodeEvent { event }) { Ok(()) => {} Err(send_error) => { let event = send_error.into_inner(); @@ -152,12 +187,8 @@ fn event_stream_loop( break 'outer Ok(()); } } - - if let Some(token) = drop_token { - pending_drop_tokens.push((token, drop_rx, Instant::now(), 1)); - } } else { - tracing::warn!("dropping event because event `tx` was already closed: `{inner:?}`"); + tracing::warn!("dropping event because event `tx` was already closed: `{event:?}`"); } } }; @@ -189,7 +220,7 @@ fn event_stream_loop( fn handle_pending_drop_tokens( pending_drop_tokens: &mut Vec<(DropToken, flume::Receiver<()>, Instant, u64)>, - drop_tokens: &mut Vec<DropToken>, + drop_tokens: &mut Vec<DropTokenStatus>, ) -> eyre::Result<()> { let mut still_pending = Vec::new(); for (token, rx, since, warn) in pending_drop_tokens.drain(..) 
{ @@ -197,7 +228,10 @@ fn handle_pending_drop_tokens( Ok(()) => return Err(eyre!("Node API should not send anything on ACK channel")), Err(flume::TryRecvError::Disconnected) => { // the event was dropped -> add the drop token to the list - drop_tokens.push(token); + drop_tokens.push(DropTokenStatus { + token, + state: DropTokenState::Dropped, + }); } Err(flume::TryRecvError::Empty) => { let duration = Duration::from_secs(30 * warn); @@ -214,7 +248,7 @@ fn handle_pending_drop_tokens( fn report_remaining_drop_tokens( mut channel: DaemonChannel, - mut drop_tokens: Vec<DropToken>, + mut drop_tokens: Vec<DropTokenStatus>, mut pending_drop_tokens: Vec<(DropToken, flume::Receiver<()>, Instant, u64)>, timestamp: Timestamp, ) -> eyre::Result<()> { @@ -227,7 +261,10 @@ fn report_remaining_drop_tokens( Ok(()) => return Err(eyre!("Node API should not send anything on ACK channel")), Err(flume::RecvTimeoutError::Disconnected) => { // the event was dropped -> add the drop token to the list - drop_tokens.push(token); + drop_tokens.push(DropTokenStatus { + token, + state: DropTokenState::Dropped, + }); } Err(flume::RecvTimeoutError::Timeout) => { let duration = Duration::from_secs(30); @@ -252,7 +289,7 @@ fn report_remaining_drop_tokens( } fn report_drop_tokens( - drop_tokens: &mut Vec<DropToken>, + drop_tokens: &mut Vec<DropTokenStatus>, channel: &mut DaemonChannel, timestamp: Timestamp, ) -> Result<(), eyre::ErrReport> { diff --git a/apis/rust/node/src/node/drop_stream.rs b/apis/rust/node/src/node/drop_stream.rs index d62b05887..d30bf40b1 100644 --- a/apis/rust/node/src/node/drop_stream.rs +++ b/apis/rust/node/src/node/drop_stream.rs @@ -3,15 +3,16 @@ use std::{sync::Arc, time::Duration}; use crate::daemon_connection::DaemonChannel; use dora_core::{config::NodeId, uhlc}; use dora_message::{ + common::{DropTokenState, DropTokenStatus}, daemon_to_node::{DaemonCommunication, DaemonReply, NodeDropEvent}, - node_to_daemon::{DaemonRequest, DropToken, Timestamped}, + node_to_daemon::{DaemonRequest, Timestamped}, DataflowId, }; use 
eyre::{eyre, Context}; use flume::RecvTimeoutError; pub struct DropStream { - receiver: flume::Receiver<DropToken>, + receiver: flume::Receiver<DropTokenStatus>, _thread_handle: DropStreamThreadHandle, } @@ -82,7 +83,7 @@ impl DropStream { } impl std::ops::Deref for DropStream { - type Target = flume::Receiver<DropToken>; + type Target = flume::Receiver<DropTokenStatus>; fn deref(&self) -> &Self::Target { &self.receiver @@ -92,7 +93,7 @@ impl std::ops::Deref for DropStream { #[tracing::instrument(skip(tx, channel, clock))] fn drop_stream_loop( node_id: NodeId, - tx: flume::Sender<DropToken>, + tx: flume::Sender<DropTokenStatus>, mut channel: DaemonChannel, clock: Arc<uhlc::HLC>, ) { @@ -125,16 +126,22 @@ fn drop_stream_loop( if let Err(err) = clock.update_with_timestamp(&timestamp) { tracing::warn!("failed to update HLC: {err}"); } - match inner { - NodeDropEvent::OutputDropped { drop_token } => { - if tx.send(drop_token).is_err() { - tracing::warn!( - "drop channel was closed already, could not forward \ - drop token`{drop_token:?}`" - ); - break 'outer; - } - } + let event = match inner { + NodeDropEvent::OutputMapped { drop_token } => DropTokenStatus { + token: drop_token, + state: DropTokenState::Mapped, + }, + NodeDropEvent::OutputDropped { drop_token } => DropTokenStatus { + token: drop_token, + state: DropTokenState::Dropped, + }, + }; + if tx.send(event).is_err() { + tracing::warn!( + "drop channel was closed already, could not forward \ + drop token event `{event:?}`" + ); + break 'outer; } } } diff --git a/apis/rust/node/src/node/mod.rs b/apis/rust/node/src/node/mod.rs index 234cc8402..905a15adb 100644 --- a/apis/rust/node/src/node/mod.rs +++ b/apis/rust/node/src/node/mod.rs @@ -15,6 +15,7 @@ use dora_core::{ }; use dora_message::{ + common::DropTokenStatus, daemon_to_node::{DaemonReply, NodeConfig}, metadata::{ArrowTypeInfo, Metadata, MetadataParameters}, node_to_daemon::{DaemonRequest, DataMessage, DropToken, Timestamped}, @@ -47,6 +48,7 @@ pub struct DoraNode { clock: Arc<uhlc::HLC>, sent_out_shared_memory: HashMap<DropToken, ShmemHandle>, + shared_memory_in_use: HashMap<DropToken, ShmemHandle>, drop_stream: 
DropStream, cache: VecDeque, @@ -147,6 +149,7 @@ impl DoraNode { control_channel, clock, sent_out_shared_memory: HashMap::new(), + shared_memory_in_use: HashMap::new(), drop_stream, cache: VecDeque::new(), dataflow_descriptor, @@ -348,10 +351,7 @@ impl DoraNode { fn handle_finished_drop_tokens(&mut self) -> eyre::Result<()> { loop { match self.drop_stream.try_recv() { - Ok(token) => match self.sent_out_shared_memory.remove(&token) { - Some(region) => self.add_to_cache(region), - None => tracing::warn!("received unknown finished drop token `{token:?}`"), - }, + Ok(event) => self.handle_drop_token_event(event), Err(flume::TryRecvError::Empty) => break, Err(flume::TryRecvError::Disconnected) => { bail!("event stream was closed before sending all expected drop tokens") @@ -361,6 +361,35 @@ impl DoraNode { Ok(()) } + fn handle_drop_token_event(&mut self, event: DropTokenStatus) { + let DropTokenStatus { token, state } = event; + match state { + dora_message::common::DropTokenState::Mapped => { + let region = self.sent_out_shared_memory.remove(&token); + match region { + Some(region) => { + self.shared_memory_in_use.insert(token, region); + } + None => { + tracing::warn!("received unknown mapped drop token `{token:?}`") + } + }; + } + dora_message::common::DropTokenState::Dropped => { + let region = self + .sent_out_shared_memory + .remove(&token) + .or_else(|| self.shared_memory_in_use.remove(&token)); + match region { + Some(region) => self.add_to_cache(region), + None => { + tracing::warn!("received unknown finished drop token `{token:?}`") + } + } + } + }; + } + fn add_to_cache(&mut self, memory: ShmemHandle) { const MAX_CACHE_SIZE: usize = 20; @@ -403,9 +432,7 @@ impl Drop for DoraNode { } match self.drop_stream.recv_timeout(Duration::from_secs(10)) { - Ok(token) => { - self.sent_out_shared_memory.remove(&token); - } + Ok(event) => self.handle_drop_token_event(event), Err(flume::RecvTimeoutError::Disconnected) => { tracing::warn!( "finished_drop_tokens channel 
closed while still waiting for drop tokens; \ diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 166d31642..da7f0252b 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -8,7 +8,10 @@ use dora_core::{ uhlc::{self, HLC}, }; use dora_message::{ - common::{DataMessage, DropToken, LogLevel, NodeError, NodeErrorCause, NodeExitStatus}, + common::{ + DataMessage, DropToken, DropTokenStatus, LogLevel, NodeError, NodeErrorCause, + NodeExitStatus, + }, coordinator_to_cli::DataflowResult, coordinator_to_daemon::{DaemonCoordinatorEvent, SpawnDataflowNodes}, daemon_to_coordinator::{ @@ -887,7 +890,7 @@ impl Daemon { .send_out(dataflow_id, node_id, output_id, metadata, data) .await .context("failed to send out")?, - DaemonNodeEvent::ReportDrop { tokens } => { + DaemonNodeEvent::ReportTokenState { token_events } => { let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| { format!( "failed to get handle drop tokens: \ @@ -897,17 +900,27 @@ impl Daemon { match dataflow { Ok(dataflow) => { - for token in tokens { + for event in token_events { + let DropTokenStatus { token, state } = event; match dataflow.pending_drop_tokens.get_mut(&token) { - Some(info) => { - if info.pending_nodes.remove(&node_id) { - dataflow.check_drop_token(token, &self.clock).await?; - } else { - tracing::warn!( - "node `{node_id}` is not pending for drop token `{token:?}`" - ); + Some(info) => match state { + dora_message::common::DropTokenState::Mapped => { + let changed = info.pending_nodes.remove(&node_id); + info.mapped_in_nodes.insert(node_id.clone()); + if changed { + dataflow + .check_drop_token_mapped(token, &self.clock) + .await?; + } + } + dora_message::common::DropTokenState::Dropped => { + let mut changed = info.pending_nodes.remove(&node_id); + changed |= info.mapped_in_nodes.remove(&node_id); + if changed { + dataflow.check_drop_token(token, &self.clock).await?; + } } - } + }, None => tracing::warn!("unknown drop token `{token:?}`"), } 
} @@ -1344,6 +1357,7 @@ async fn send_output_to_local_receivers( .or_insert_with(|| DropTokenInformation { owner: node_id.clone(), pending_nodes: Default::default(), + mapped_in_nodes: Default::default(), }) .pending_nodes .insert(receiver_id.clone()); @@ -1382,6 +1396,7 @@ async fn send_output_to_local_receivers( .or_insert_with(|| DropTokenInformation { owner: node_id.clone(), pending_nodes: Default::default(), + mapped_in_nodes: Default::default(), }); // check if all local subscribers are finished with the token dataflow.check_drop_token(token, clock).await?; @@ -1642,7 +1657,7 @@ impl RunningDataflow { async fn check_drop_token(&mut self, token: DropToken, clock: &HLC) -> eyre::Result<()> { match self.pending_drop_tokens.entry(token) { std::collections::hash_map::Entry::Occupied(entry) => { - if entry.get().pending_nodes.is_empty() { + if entry.get().pending_nodes.is_empty() && entry.get().mapped_in_nodes.is_empty() { let (drop_token, info) = entry.remove_entry(); let result = match self.drop_channels.get_mut(&info.owner) { Some(channel) => send_with_timestamp( @@ -1670,6 +1685,38 @@ impl RunningDataflow { Ok(()) } + + async fn check_drop_token_mapped(&mut self, token: DropToken, clock: &HLC) -> eyre::Result<()> { + match self.pending_drop_tokens.entry(token) { + std::collections::hash_map::Entry::Occupied(entry) => { + if entry.get().pending_nodes.is_empty() && !entry.get().mapped_in_nodes.is_empty() { + let info = entry.get(); + let result = match self.drop_channels.get_mut(&info.owner) { + Some(channel) => send_with_timestamp( + channel, + NodeDropEvent::OutputMapped { drop_token: token }, + clock, + ) + .wrap_err("send failed"), + None => Err(eyre!("no subscribe channel for node `{}`", &info.owner)), + }; + if let Err(err) = result.wrap_err_with(|| { + format!( + "failed to report drop token mapped `{token:?}` to owner `{}`", + &info.owner + ) + }) { + tracing::warn!("{err:?}"); + } + } + } + std::collections::hash_map::Entry::Vacant(_) => { + 
tracing::warn!("check_drop_token_mapped called with already closed token") + } + } + + Ok(()) + } } #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -1679,9 +1726,14 @@ type InputId = (NodeId, DataId); struct DropTokenInformation { /// The node that created the associated drop token. owner: NodeId, - /// Contains the set of pending nodes that still have access to the input - /// associated with a drop token. + /// Contains the set of nodes that have not mapped the input associated + /// with a drop token yet. The shared memory region needs to be kept + /// alive until this list is empty. pending_nodes: BTreeSet<NodeId>, + /// Contains the set of nodes that still have the input data associated + /// with a drop token mapped in their address space. The shared memory + /// region must not be overwritten until this list is empty. + mapped_in_nodes: BTreeSet<NodeId>, } #[derive(Debug)] @@ -1727,8 +1779,8 @@ pub enum DaemonNodeEvent { metadata: metadata::Metadata, data: Option, }, - ReportDrop { - tokens: Vec<DropToken>, + ReportTokenState { + token_events: Vec<DropTokenStatus>, }, EventStreamDropped { reply_sender: oneshot::Sender, diff --git a/binaries/daemon/src/node_communication/mod.rs b/binaries/daemon/src/node_communication/mod.rs index 76a3c6675..6726462e0 100644 --- a/binaries/daemon/src/node_communication/mod.rs +++ b/binaries/daemon/src/node_communication/mod.rs @@ -5,7 +5,7 @@ use dora_core::{ uhlc, }; use dora_message::{ - common::{DropToken, Timestamped}, + common::{DropTokenState, DropTokenStatus, Timestamped}, daemon_to_node::{DaemonCommunication, DaemonReply, NodeDropEvent, NodeEvent}, node_to_daemon::DaemonRequest, DataflowId, @@ -335,7 +335,10 @@ impl Listener { Some(0) => { dropped += 1; if let Some(drop_token) = data.as_ref().and_then(|d| d.drop_token()) { - drop_tokens.push(drop_token); + drop_tokens.push(DropTokenStatus { + token: drop_token, + state: DropTokenState::Dropped, + }); } *event.as_mut() = None; } @@ -509,13 +512,13 @@ impl Listener { Ok(()) } - async fn report_drop_tokens(&mut 
self, drop_tokens: Vec<DropToken>) -> eyre::Result<()> { + async fn report_drop_tokens(&mut self, drop_tokens: Vec<DropTokenStatus>) -> eyre::Result<()> { if !drop_tokens.is_empty() { let event = Event::Node { dataflow_id: self.dataflow_id, node_id: self.node_id.clone(), - event: DaemonNodeEvent::ReportDrop { - tokens: drop_tokens, + event: DaemonNodeEvent::ReportTokenState { + token_events: drop_tokens, }, }; let event = Timestamped { diff --git a/libraries/message/src/common.rs b/libraries/message/src/common.rs index 03a75e88f..7e868b6e9 100644 --- a/libraries/message/src/common.rs +++ b/libraries/message/src/common.rs @@ -172,6 +172,22 @@ impl fmt::Debug for DataMessage { } } +#[derive( + Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize, +)] +pub struct DropTokenStatus { + pub token: DropToken, + pub state: DropTokenState, +} + +#[derive( + Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize, +)] +pub enum DropTokenState { + Mapped, + Dropped, +} + #[derive( Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize, )] diff --git a/libraries/message/src/daemon_to_node.rs b/libraries/message/src/daemon_to_node.rs index 178b33779..c1dda819f 100644 --- a/libraries/message/src/daemon_to_node.rs +++ b/libraries/message/src/daemon_to_node.rs @@ -73,5 +73,6 @@ pub enum NodeEvent { #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum NodeDropEvent { + OutputMapped { drop_token: DropToken }, OutputDropped { drop_token: DropToken }, } diff --git a/libraries/message/src/node_to_daemon.rs b/libraries/message/src/node_to_daemon.rs index 6967c1a3b..2bf4a939f 100644 --- a/libraries/message/src/node_to_daemon.rs +++ b/libraries/message/src/node_to_daemon.rs @@ -1,7 +1,10 @@ pub use crate::common::{ DataMessage, DropToken, LogLevel, LogMessage, SharedMemoryId, Timestamped, }; -use crate::{current_crate_version, metadata::Metadata, versions_compatible, DataflowId}; 
+use crate::{ + common::DropTokenStatus, current_crate_version, metadata::Metadata, versions_compatible, + DataflowId, +}; use dora_core::config::{DataId, NodeId}; @@ -19,10 +22,10 @@ pub enum DaemonRequest { /// required drop tokens. OutputsDone, NextEvent { - drop_tokens: Vec<DropToken>, + drop_tokens: Vec<DropTokenStatus>, }, ReportDropTokens { - drop_tokens: Vec<DropToken>, + drop_tokens: Vec<DropTokenStatus>, }, SubscribeDrop, NextFinishedDropTokens,