Skip to content

Commit

Permalink
Report when shared memory region is mapped to allow faster cleanup
Browse files Browse the repository at this point in the history
The shared memory region can be safely removed by the sender once it's mapped in the receiver. The OS will just delete the file handle associated with the shared memory region, but keep the data alive until it has been unmapped from all address spaces.

By notifying the sender that a message has been mapped to the address space we enable faster cleanup on exit. The sender can safely close all of its shared memory regions once all of its sent messages are at least mapped. So it does not need to wait until all messages have been _dropped_ anymore, which can take considerably longer, especially if the Python GC is involved.

This commit modifies the message format, so we need to bump the version of the `dora-message` crate to `0.5.0`.
  • Loading branch information
phil-opp committed Oct 4, 2024
1 parent 8c1e81f commit e16c59f
Show file tree
Hide file tree
Showing 9 changed files with 218 additions and 119 deletions.
53 changes: 3 additions & 50 deletions apis/rust/node/src/event_stream/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{sync::Arc, time::Duration};

use dora_message::{
daemon_to_node::{DaemonCommunication, DaemonReply, DataMessage, NodeEvent},
daemon_to_node::{DaemonCommunication, DaemonReply},
node_to_daemon::{DaemonRequest, Timestamped},
DataflowId,
};
Expand All @@ -12,10 +12,7 @@ use futures::{
};
use futures_timer::Delay;

use self::{
event::SharedMemoryData,
thread::{EventItem, EventStreamThreadHandle},
};
use self::thread::{EventItem, EventStreamThreadHandle};
use crate::daemon_connection::DaemonChannel;
use dora_core::{config::NodeId, uhlc};
use eyre::{eyre, Context};
Expand Down Expand Up @@ -143,51 +140,7 @@ impl EventStream {

fn convert_event_item(item: EventItem) -> Event {
match item {
EventItem::NodeEvent { event, ack_channel } => match event {
NodeEvent::Stop => Event::Stop,
NodeEvent::Reload { operator_id } => Event::Reload { operator_id },
NodeEvent::InputClosed { id } => Event::InputClosed { id },
NodeEvent::Input { id, metadata, data } => {
let data = match data {
None => Ok(None),
Some(DataMessage::Vec(v)) => Ok(Some(RawData::Vec(v))),
Some(DataMessage::SharedMemory {
shared_memory_id,
len,
drop_token: _, // handled in `event_stream_loop`
}) => unsafe {
MappedInputData::map(&shared_memory_id, len).map(|data| {
Some(RawData::SharedMemory(SharedMemoryData {
data,
_drop: ack_channel,
}))
})
},
};
let data = data.and_then(|data| {
let raw_data = data.unwrap_or(RawData::Empty);
raw_data
.into_arrow_array(&metadata.type_info)
.map(arrow::array::make_array)
});
match data {
Ok(data) => Event::Input {
id,
metadata,
data: data.into(),
},
Err(err) => Event::Error(format!("{err:?}")),
}
}
NodeEvent::AllInputsClosed => {
let err = eyre!(
"received `AllInputsClosed` event, which should be handled by background task"
);
tracing::error!("{err:?}");
Event::Error(err.wrap_err("internal error").to_string())
}
},

EventItem::NodeEvent { event } => event,
EventItem::FatalError(err) => {
Event::Error(format!("fatal event stream error: {err:?}"))
}
Expand Down
85 changes: 61 additions & 24 deletions apis/rust/node/src/event_stream/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use dora_core::{
uhlc::{self, Timestamp},
};
use dora_message::{
common::{DataMessage, DropTokenState, DropTokenStatus},
daemon_to_node::{DaemonReply, NodeEvent},
node_to_daemon::{DaemonRequest, DropToken, Timestamped},
};
Expand All @@ -15,6 +16,8 @@ use std::{

use crate::daemon_connection::DaemonChannel;

use super::{event::SharedMemoryData, Event, MappedInputData, RawData};

pub fn init(
node_id: NodeId,
tx: flume::Sender<EventItem>,
Expand All @@ -28,10 +31,7 @@ pub fn init(

#[derive(Debug)]
pub enum EventItem {
NodeEvent {
event: NodeEvent,
ack_channel: flume::Sender<()>,
},
NodeEvent { event: super::Event },
FatalError(eyre::Report),
TimeoutError(eyre::Report),
}
Expand Down Expand Up @@ -123,25 +123,60 @@ fn event_stream_loop(
if let Err(err) = clock.update_with_timestamp(&timestamp) {
tracing::warn!("failed to update HLC: {err}");
}
let drop_token = match &inner {
NodeEvent::Input {
data: Some(data), ..
} => data.drop_token(),

let event = match inner {
NodeEvent::Stop => Event::Stop,
NodeEvent::Reload { operator_id } => Event::Reload { operator_id },
NodeEvent::InputClosed { id } => Event::InputClosed { id },
NodeEvent::Input { id, metadata, data } => {
let data = match data {
None => Ok(None),
Some(DataMessage::Vec(v)) => Ok(Some(RawData::Vec(v))),
Some(DataMessage::SharedMemory {
shared_memory_id,
len,
drop_token,
}) => unsafe {
let (drop_tx, drop_rx) = flume::bounded(0);
let data = MappedInputData::map(&shared_memory_id, len).map(|data| {
Some(RawData::SharedMemory(SharedMemoryData {
data,
_drop: drop_tx,
}))
});
drop_tokens.push(DropTokenStatus {
token: drop_token,
state: DropTokenState::Mapped,
});
pending_drop_tokens.push((drop_token, drop_rx, Instant::now(), 1));
data
},
};
let data = data.and_then(|data| {
let raw_data = data.unwrap_or(RawData::Empty);
raw_data
.into_arrow_array(&metadata.type_info)
.map(arrow::array::make_array)
});
match data {
Ok(data) => Event::Input {
id,
metadata,
data: data.into(),
},
Err(err) => Event::Error(format!("{err:?}")),
}
}
NodeEvent::AllInputsClosed => {
// close the event stream
tx = None;
// skip this internal event
continue;
}
_ => None,
};

if let Some(tx) = tx.as_ref() {
let (drop_tx, drop_rx) = flume::bounded(0);
match tx.send(EventItem::NodeEvent {
event: inner,
ack_channel: drop_tx,
}) {
match tx.send(EventItem::NodeEvent { event }) {
Ok(()) => {}
Err(send_error) => {
let event = send_error.into_inner();
Expand All @@ -152,12 +187,8 @@ fn event_stream_loop(
break 'outer Ok(());
}
}

if let Some(token) = drop_token {
pending_drop_tokens.push((token, drop_rx, Instant::now(), 1));
}
} else {
tracing::warn!("dropping event because event `tx` was already closed: `{inner:?}`");
tracing::warn!("dropping event because event `tx` was already closed: `{event:?}`");
}
}
};
Expand Down Expand Up @@ -189,15 +220,18 @@ fn event_stream_loop(

fn handle_pending_drop_tokens(
pending_drop_tokens: &mut Vec<(DropToken, flume::Receiver<()>, Instant, u64)>,
drop_tokens: &mut Vec<DropToken>,
drop_tokens: &mut Vec<DropTokenStatus>,
) -> eyre::Result<()> {
let mut still_pending = Vec::new();
for (token, rx, since, warn) in pending_drop_tokens.drain(..) {
match rx.try_recv() {
Ok(()) => return Err(eyre!("Node API should not send anything on ACK channel")),
Err(flume::TryRecvError::Disconnected) => {
// the event was dropped -> add the drop token to the list
drop_tokens.push(token);
drop_tokens.push(DropTokenStatus {
token,
state: DropTokenState::Dropped,
});
}
Err(flume::TryRecvError::Empty) => {
let duration = Duration::from_secs(30 * warn);
Expand All @@ -214,7 +248,7 @@ fn handle_pending_drop_tokens(

fn report_remaining_drop_tokens(
mut channel: DaemonChannel,
mut drop_tokens: Vec<DropToken>,
mut drop_tokens: Vec<DropTokenStatus>,
mut pending_drop_tokens: Vec<(DropToken, flume::Receiver<()>, Instant, u64)>,
timestamp: Timestamp,
) -> eyre::Result<()> {
Expand All @@ -227,7 +261,10 @@ fn report_remaining_drop_tokens(
Ok(()) => return Err(eyre!("Node API should not send anything on ACK channel")),
Err(flume::RecvTimeoutError::Disconnected) => {
// the event was dropped -> add the drop token to the list
drop_tokens.push(token);
drop_tokens.push(DropTokenStatus {
token,
state: DropTokenState::Dropped,
});
}
Err(flume::RecvTimeoutError::Timeout) => {
let duration = Duration::from_secs(30);
Expand All @@ -252,7 +289,7 @@ fn report_remaining_drop_tokens(
}

fn report_drop_tokens(
drop_tokens: &mut Vec<DropToken>,
drop_tokens: &mut Vec<DropTokenStatus>,
channel: &mut DaemonChannel,
timestamp: Timestamp,
) -> Result<(), eyre::ErrReport> {
Expand Down
35 changes: 21 additions & 14 deletions apis/rust/node/src/node/drop_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@ use std::{sync::Arc, time::Duration};
use crate::daemon_connection::DaemonChannel;
use dora_core::{config::NodeId, uhlc};
use dora_message::{
common::{DropTokenState, DropTokenStatus},
daemon_to_node::{DaemonCommunication, DaemonReply, NodeDropEvent},
node_to_daemon::{DaemonRequest, DropToken, Timestamped},
node_to_daemon::{DaemonRequest, Timestamped},
DataflowId,
};
use eyre::{eyre, Context};
use flume::RecvTimeoutError;

pub struct DropStream {
receiver: flume::Receiver<DropToken>,
receiver: flume::Receiver<DropTokenStatus>,
_thread_handle: DropStreamThreadHandle,
}

Expand Down Expand Up @@ -82,7 +83,7 @@ impl DropStream {
}

impl std::ops::Deref for DropStream {
type Target = flume::Receiver<DropToken>;
type Target = flume::Receiver<DropTokenStatus>;

fn deref(&self) -> &Self::Target {
&self.receiver
Expand All @@ -92,7 +93,7 @@ impl std::ops::Deref for DropStream {
#[tracing::instrument(skip(tx, channel, clock))]
fn drop_stream_loop(
node_id: NodeId,
tx: flume::Sender<DropToken>,
tx: flume::Sender<DropTokenStatus>,
mut channel: DaemonChannel,
clock: Arc<uhlc::HLC>,
) {
Expand Down Expand Up @@ -125,16 +126,22 @@ fn drop_stream_loop(
if let Err(err) = clock.update_with_timestamp(&timestamp) {
tracing::warn!("failed to update HLC: {err}");
}
match inner {
NodeDropEvent::OutputDropped { drop_token } => {
if tx.send(drop_token).is_err() {
tracing::warn!(
"drop channel was closed already, could not forward \
drop token`{drop_token:?}`"
);
break 'outer;
}
}
let event = match inner {
NodeDropEvent::OutputMapped { drop_token } => DropTokenStatus {
token: drop_token,
state: DropTokenState::Mapped,
},
NodeDropEvent::OutputDropped { drop_token } => DropTokenStatus {
token: drop_token,
state: DropTokenState::Dropped,
},
};
if tx.send(event).is_err() {
tracing::warn!(
"drop channel was closed already, could not forward \
drop token event `{event:?}`"
);
break 'outer;
}
}
}
Expand Down
41 changes: 34 additions & 7 deletions apis/rust/node/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use dora_core::{
};

use dora_message::{
common::DropTokenStatus,
daemon_to_node::{DaemonReply, NodeConfig},
metadata::{ArrowTypeInfo, Metadata, MetadataParameters},
node_to_daemon::{DaemonRequest, DataMessage, DropToken, Timestamped},
Expand Down Expand Up @@ -47,6 +48,7 @@ pub struct DoraNode {
clock: Arc<uhlc::HLC>,

sent_out_shared_memory: HashMap<DropToken, ShmemHandle>,
shared_memory_in_use: HashMap<DropToken, ShmemHandle>,
drop_stream: DropStream,
cache: VecDeque<ShmemHandle>,

Expand Down Expand Up @@ -147,6 +149,7 @@ impl DoraNode {
control_channel,
clock,
sent_out_shared_memory: HashMap::new(),
shared_memory_in_use: HashMap::new(),
drop_stream,
cache: VecDeque::new(),
dataflow_descriptor,
Expand Down Expand Up @@ -348,10 +351,7 @@ impl DoraNode {
fn handle_finished_drop_tokens(&mut self) -> eyre::Result<()> {
loop {
match self.drop_stream.try_recv() {
Ok(token) => match self.sent_out_shared_memory.remove(&token) {
Some(region) => self.add_to_cache(region),
None => tracing::warn!("received unknown finished drop token `{token:?}`"),
},
Ok(event) => self.handle_drop_token_event(event),
Err(flume::TryRecvError::Empty) => break,
Err(flume::TryRecvError::Disconnected) => {
bail!("event stream was closed before sending all expected drop tokens")
Expand All @@ -361,6 +361,35 @@ impl DoraNode {
Ok(())
}

    /// Updates shared-memory bookkeeping when the daemon reports a state
    /// change for one of this node's sent-out drop tokens.
    ///
    /// Lifecycle of a region: it starts in `sent_out_shared_memory` when the
    /// message is sent. A `Mapped` report means the receiver has mapped the
    /// region into its address space, so the region is moved to
    /// `shared_memory_in_use` (the OS keeps the data alive until unmapped,
    /// so the sender-side handle no longer has to outlive the receiver's
    /// use — this enables the faster cleanup described in the commit
    /// message). A `Dropped` report means all receivers are done, so the
    /// region is recycled via `add_to_cache`.
    fn handle_drop_token_event(&mut self, event: DropTokenStatus) {
        let DropTokenStatus { token, state } = event;
        match state {
            dora_message::common::DropTokenState::Mapped => {
                // Move the region from "sent out" to "in use by receiver".
                let region = self.sent_out_shared_memory.remove(&token);
                match region {
                    Some(region) => {
                        self.shared_memory_in_use.insert(token, region);
                    }
                    None => {
                        // Token not found in either map — likely a duplicate
                        // or stale report; log and ignore.
                        tracing::warn!("received unknown mapped drop token `{token:?}`")
                    }
                };
            }
            dora_message::common::DropTokenState::Dropped => {
                // A `Dropped` may arrive without a preceding `Mapped`
                // (e.g. small messages, or reports coalesced by the daemon),
                // so check both maps before giving up.
                let region = self
                    .sent_out_shared_memory
                    .remove(&token)
                    .or_else(|| self.shared_memory_in_use.remove(&token));
                match region {
                    Some(region) => self.add_to_cache(region),
                    None => {
                        tracing::warn!("received unknown finished drop token `{token:?}`")
                    }
                }
            }
        };
    }

fn add_to_cache(&mut self, memory: ShmemHandle) {
const MAX_CACHE_SIZE: usize = 20;

Expand Down Expand Up @@ -403,9 +432,7 @@ impl Drop for DoraNode {
}

match self.drop_stream.recv_timeout(Duration::from_secs(10)) {
Ok(token) => {
self.sent_out_shared_memory.remove(&token);
}
Ok(event) => self.handle_drop_token_event(event),
Err(flume::RecvTimeoutError::Disconnected) => {
tracing::warn!(
"finished_drop_tokens channel closed while still waiting for drop tokens; \
Expand Down
Loading

0 comments on commit e16c59f

Please sign in to comment.