From b73b9eed98cf0765fae1ac1cee0374837fd58629 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Fri, 4 Oct 2024 13:06:38 +0200 Subject: [PATCH] Report when shared memory region is mapped to allow faster cleanup The shared memory region can be safely removed by the sender once it's mapped in the receiver. The OS will just delete the file handle associated with the shared memory region, but keep the data alive until it has been unmapped from all address spaces. By notifying the sender that a message has been mapped to the address space we enable faster cleanup on exit. The sender can safely close all of its shared memory regions once all of its sent messages are at least mapped. So it does not need to wait until all messages have been _dropped_ anymore, which can take considerably longer, especially if the Python GC is involved. This commit modifies the message format, so we need to bump the version of the `dora-message` crate to `0.5.0`. --- apis/rust/node/src/event_stream/mod.rs | 53 +----------- apis/rust/node/src/event_stream/scheduler.rs | 10 +-- apis/rust/node/src/event_stream/thread.rs | 85 +++++++++++++------ apis/rust/node/src/node/drop_stream.rs | 35 +++++--- apis/rust/node/src/node/mod.rs | 41 +++++++-- binaries/daemon/src/lib.rs | 84 ++++++++++++++---- binaries/daemon/src/node_communication/mod.rs | 8 +- libraries/message/src/common.rs | 16 ++++ libraries/message/src/daemon_to_node.rs | 1 + libraries/message/src/node_to_daemon.rs | 5 +- 10 files changed, 213 insertions(+), 125 deletions(-) diff --git a/apis/rust/node/src/event_stream/mod.rs b/apis/rust/node/src/event_stream/mod.rs index 4856cbb64..86b86ccc1 100644 --- a/apis/rust/node/src/event_stream/mod.rs +++ b/apis/rust/node/src/event_stream/mod.rs @@ -5,7 +5,7 @@ use std::{ }; use dora_message::{ - daemon_to_node::{DaemonCommunication, DaemonReply, DataMessage, NodeEvent}, + daemon_to_node::{DaemonCommunication, DaemonReply}, id::DataId, node_to_daemon::{DaemonRequest, Timestamped}, DataflowId, @@ -18,10 +18,7 @@ use futures::{ use futures_timer::Delay; use scheduler::{Scheduler, NON_INPUT_EVENT}; -use self::{ - event::SharedMemoryData, - thread::{EventItem, EventStreamThreadHandle}, -}; +use self::thread::{EventItem, EventStreamThreadHandle}; use crate::daemon_connection::DaemonChannel; use dora_core::{ config::{Input, NodeId}, @@ -198,51 +195,7 @@ impl EventStream { fn convert_event_item(item: EventItem) -> Event { match item { - EventItem::NodeEvent { event, ack_channel } => match event { - NodeEvent::Stop => Event::Stop, - NodeEvent::Reload { operator_id } => Event::Reload { operator_id }, - NodeEvent::InputClosed { id } => Event::InputClosed { id }, - NodeEvent::Input { id, metadata, data } => { - let data = match data { - None => Ok(None), - Some(DataMessage::Vec(v)) => Ok(Some(RawData::Vec(v))), - Some(DataMessage::SharedMemory { - shared_memory_id, - len, - drop_token: _, // handled in `event_stream_loop` - }) => unsafe { - MappedInputData::map(&shared_memory_id, len).map(|data| { - Some(RawData::SharedMemory(SharedMemoryData { - data, - _drop: ack_channel, - })) - }) - }, - }; - let data = data.and_then(|data| { - let raw_data = data.unwrap_or(RawData::Empty); - raw_data - .into_arrow_array(&metadata.type_info) - .map(arrow::array::make_array) - }); - match data { - Ok(data) => Event::Input { - id, - metadata, - data: data.into(), - }, - Err(err) => Event::Error(format!("{err:?}")), - } - } - NodeEvent::AllInputsClosed => { - let err = eyre!( - "received `AllInputsClosed` event, which should be handled by background task" - ); - tracing::error!("{err:?}"); - Event::Error(err.wrap_err("internal error").to_string()) - } - }, - + EventItem::NodeEvent { event } => event, EventItem::FatalError(err) => { Event::Error(format!("fatal event stream error: {err:?}")) } diff --git a/apis/rust/node/src/event_stream/scheduler.rs b/apis/rust/node/src/event_stream/scheduler.rs index c6e15abe2..9185050bb 100644 --- a/apis/rust/node/src/event_stream/scheduler.rs +++ b/apis/rust/node/src/event_stream/scheduler.rs @@ -2,7 +2,7 @@ use std::collections::{HashMap, VecDeque}; use dora_message::{daemon_to_node::NodeEvent, id::DataId}; -use super::thread::EventItem; +use super::{thread::EventItem, Event}; pub const NON_INPUT_EVENT: &str = "dora/non_input_event"; // This scheduler will make sure that there is fairness between @@ -40,13 +40,7 @@ impl Scheduler { pub fn add_event(&mut self, event: EventItem) { let event_id = match &event { EventItem::NodeEvent { - event: - NodeEvent::Input { - id, - metadata: _, - data: _, - }, - ack_channel: _, + event: Event::Input { id, .. }, } => id, _ => &DataId::from(NON_INPUT_EVENT.to_string()), }; diff --git a/apis/rust/node/src/event_stream/thread.rs b/apis/rust/node/src/event_stream/thread.rs index 5e982f749..47aa41e23 100644 --- a/apis/rust/node/src/event_stream/thread.rs +++ b/apis/rust/node/src/event_stream/thread.rs @@ -3,6 +3,7 @@ use dora_core::{ uhlc::{self, Timestamp}, }; use dora_message::{ + common::{DataMessage, DropTokenState, DropTokenStatus}, daemon_to_node::{DaemonReply, NodeEvent}, node_to_daemon::{DaemonRequest, DropToken, Timestamped}, }; @@ -15,6 +16,8 @@ use std::{ use crate::daemon_connection::DaemonChannel; +use super::{event::SharedMemoryData, Event, MappedInputData, RawData}; + pub fn init( node_id: NodeId, tx: flume::Sender, @@ -28,10 +31,7 @@ pub fn init( #[derive(Debug)] pub enum EventItem { - NodeEvent { - event: NodeEvent, - ack_channel: flume::Sender<()>, - }, + NodeEvent { event: super::Event }, FatalError(eyre::Report), TimeoutError(eyre::Report), } @@ -130,25 +130,60 @@ fn event_stream_loop( if let Err(err) = clock.update_with_timestamp(×tamp) { tracing::warn!("failed to update HLC: {err}"); } - let drop_token = match &inner { - NodeEvent::Input { - data: Some(data), .. - } => data.drop_token(), + + let event = match inner { + NodeEvent::Stop => Event::Stop, + NodeEvent::Reload { operator_id } => Event::Reload { operator_id }, + NodeEvent::InputClosed { id } => Event::InputClosed { id }, + NodeEvent::Input { id, metadata, data } => { + let data = match data { + None => Ok(None), + Some(DataMessage::Vec(v)) => Ok(Some(RawData::Vec(v))), + Some(DataMessage::SharedMemory { + shared_memory_id, + len, + drop_token, + }) => unsafe { + let (drop_tx, drop_rx) = flume::bounded(0); + let data = MappedInputData::map(&shared_memory_id, len).map(|data| { + Some(RawData::SharedMemory(SharedMemoryData { + data, + _drop: drop_tx, + })) + }); + drop_tokens.push(DropTokenStatus { + token: drop_token, + state: DropTokenState::Mapped, + }); + pending_drop_tokens.push((drop_token, drop_rx, Instant::now(), 1)); + data + }, + }; + let data = data.and_then(|data| { + let raw_data = data.unwrap_or(RawData::Empty); + raw_data + .into_arrow_array(&metadata.type_info) + .map(arrow::array::make_array) + }); + match data { + Ok(data) => Event::Input { + id, + metadata, + data: data.into(), + }, + Err(err) => Event::Error(format!("{err:?}")), + } + } NodeEvent::AllInputsClosed => { // close the event stream tx = None; // skip this internal event continue; } - _ => None, }; if let Some(tx) = tx.as_ref() { - let (drop_tx, drop_rx) = flume::bounded(0); - match tx.send(EventItem::NodeEvent { - event: inner, - ack_channel: drop_tx, - }) { + match tx.send(EventItem::NodeEvent { event }) { Ok(()) => {} Err(send_error) => { let event = send_error.into_inner(); @@ -159,12 +194,8 @@ fn event_stream_loop( break 'outer Ok(()); } } - - if let Some(token) = drop_token { - pending_drop_tokens.push((token, drop_rx, Instant::now(), 1)); - } } else { - tracing::warn!("dropping event because event `tx` was already closed: `{inner:?}`"); + tracing::warn!("dropping event because event `tx` was already closed: `{event:?}`"); } } }; @@ -196,7 +227,7 @@ fn event_stream_loop( fn handle_pending_drop_tokens( pending_drop_tokens: &mut Vec<(DropToken, flume::Receiver<()>, Instant, u64)>, - drop_tokens: &mut Vec, + drop_tokens: &mut Vec, ) -> eyre::Result<()> { let mut still_pending = Vec::new(); for (token, rx, since, warn) in pending_drop_tokens.drain(..) { @@ -204,7 +235,10 @@ fn handle_pending_drop_tokens( Ok(()) => return Err(eyre!("Node API should not send anything on ACK channel")), Err(flume::TryRecvError::Disconnected) => { // the event was dropped -> add the drop token to the list - drop_tokens.push(token); + drop_tokens.push(DropTokenStatus { + token, + state: DropTokenState::Dropped, + }); } Err(flume::TryRecvError::Empty) => { let duration = Duration::from_secs(30 * warn); @@ -221,7 +255,7 @@ fn handle_pending_drop_tokens( fn report_remaining_drop_tokens( mut channel: DaemonChannel, - mut drop_tokens: Vec, + mut drop_tokens: Vec, mut pending_drop_tokens: Vec<(DropToken, flume::Receiver<()>, Instant, u64)>, timestamp: Timestamp, ) -> eyre::Result<()> { @@ -234,7 +268,10 @@ fn report_remaining_drop_tokens( Ok(()) => return Err(eyre!("Node API should not send anything on ACK channel")), Err(flume::RecvTimeoutError::Disconnected) => { // the event was dropped -> add the drop token to the list - drop_tokens.push(token); + drop_tokens.push(DropTokenStatus { + token, + state: DropTokenState::Dropped, + }); } Err(flume::RecvTimeoutError::Timeout) => { let duration = Duration::from_secs(1); @@ -259,7 +296,7 @@ fn report_remaining_drop_tokens( } fn report_drop_tokens( - drop_tokens: &mut Vec, + drop_tokens: &mut Vec, channel: &mut DaemonChannel, timestamp: Timestamp, ) -> Result<(), eyre::ErrReport> { diff --git a/apis/rust/node/src/node/drop_stream.rs b/apis/rust/node/src/node/drop_stream.rs index d62b05887..d30bf40b1 100644 --- a/apis/rust/node/src/node/drop_stream.rs +++ b/apis/rust/node/src/node/drop_stream.rs @@ -3,15 +3,16 @@ use std::{sync::Arc, time::Duration}; use crate::daemon_connection::DaemonChannel; use dora_core::{config::NodeId, uhlc}; use dora_message::{ + common::{DropTokenState, DropTokenStatus}, daemon_to_node::{DaemonCommunication, DaemonReply, NodeDropEvent}, - node_to_daemon::{DaemonRequest, DropToken, Timestamped}, + node_to_daemon::{DaemonRequest, Timestamped}, DataflowId, }; use eyre::{eyre, Context}; use flume::RecvTimeoutError; pub struct DropStream { - receiver: flume::Receiver, + receiver: flume::Receiver, _thread_handle: DropStreamThreadHandle, } @@ -82,7 +83,7 @@ impl DropStream { } impl std::ops::Deref for DropStream { - type Target = flume::Receiver; + type Target = flume::Receiver; fn deref(&self) -> &Self::Target { &self.receiver @@ -92,7 +93,7 @@ impl std::ops::Deref for DropStream { #[tracing::instrument(skip(tx, channel, clock))] fn drop_stream_loop( node_id: NodeId, - tx: flume::Sender, + tx: flume::Sender, mut channel: DaemonChannel, clock: Arc, ) { @@ -125,16 +126,22 @@ fn drop_stream_loop( if let Err(err) = clock.update_with_timestamp(×tamp) { tracing::warn!("failed to update HLC: {err}"); } - match inner { - NodeDropEvent::OutputDropped { drop_token } => { - if tx.send(drop_token).is_err() { - tracing::warn!( - "drop channel was closed already, could not forward \ - drop token`{drop_token:?}`" - ); - break 'outer; - } - } + let event = match inner { + NodeDropEvent::OutputMapped { drop_token } => DropTokenStatus { + token: drop_token, + state: DropTokenState::Mapped, + }, + NodeDropEvent::OutputDropped { drop_token } => DropTokenStatus { + token: drop_token, + state: DropTokenState::Dropped, + }, + }; + if tx.send(event).is_err() { + tracing::warn!( + "drop channel was closed already, could not forward \ + drop token event `{event:?}`" + ); + break 'outer; } } } diff --git a/apis/rust/node/src/node/mod.rs b/apis/rust/node/src/node/mod.rs index 009e1b81c..ee805d788 100644 --- a/apis/rust/node/src/node/mod.rs +++ b/apis/rust/node/src/node/mod.rs @@ -16,6 +16,7 @@ use dora_core::{ }; use dora_message::{ + common::DropTokenStatus, daemon_to_node::{DaemonReply, NodeConfig}, metadata::{ArrowTypeInfo, Metadata, MetadataParameters}, node_to_daemon::{DaemonRequest, DataMessage, DropToken, Timestamped}, @@ -48,6 +49,7 @@ pub struct DoraNode { clock: Arc, sent_out_shared_memory: HashMap, + shared_memory_in_use: HashMap, drop_stream: DropStream, cache: VecDeque, @@ -154,6 +156,7 @@ impl DoraNode { control_channel, clock, sent_out_shared_memory: HashMap::new(), + shared_memory_in_use: HashMap::new(), drop_stream, cache: VecDeque::new(), dataflow_descriptor, @@ -355,10 +358,7 @@ impl DoraNode { fn handle_finished_drop_tokens(&mut self) -> eyre::Result<()> { loop { match self.drop_stream.try_recv() { - Ok(token) => match self.sent_out_shared_memory.remove(&token) { - Some(region) => self.add_to_cache(region), - None => tracing::warn!("received unknown finished drop token `{token:?}`"), - }, + Ok(event) => self.handle_drop_token_event(event), Err(flume::TryRecvError::Empty) => break, Err(flume::TryRecvError::Disconnected) => { bail!("event stream was closed before sending all expected drop tokens") @@ -368,6 +368,35 @@ impl DoraNode { Ok(()) } + fn handle_drop_token_event(&mut self, event: DropTokenStatus) { + let DropTokenStatus { token, state } = event; + match state { + dora_message::common::DropTokenState::Mapped => { + let region = self.sent_out_shared_memory.remove(&token); + match region { + Some(region) => { + self.shared_memory_in_use.insert(token, region); + } + None => { + tracing::warn!("received unknown mapped drop token `{token:?}`") + } + }; + } + dora_message::common::DropTokenState::Dropped => { + let region = self + .sent_out_shared_memory + .remove(&token) + .or_else(|| self.shared_memory_in_use.remove(&token)); + match region { + Some(region) => self.add_to_cache(region), + None => { + tracing::warn!("received unknown finished drop token `{token:?}`") + } + } + } + }; + } + fn add_to_cache(&mut self, memory: ShmemHandle) { const MAX_CACHE_SIZE: usize = 20; @@ -410,9 +439,7 @@ impl Drop for DoraNode { } match self.drop_stream.recv_timeout(Duration::from_secs(2)) { - Ok(token) => { - self.sent_out_shared_memory.remove(&token); - } + Ok(event) => self.handle_drop_token_event(event), Err(flume::RecvTimeoutError::Disconnected) => { tracing::warn!( "finished_drop_tokens channel closed while still waiting for drop tokens; \ diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 5c4795a2a..54f4cc630 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -11,7 +11,10 @@ use dora_core::{ uhlc::{self, HLC}, }; use dora_message::{ - common::{DataMessage, DropToken, LogLevel, NodeError, NodeErrorCause, NodeExitStatus}, + common::{ + DataMessage, DropToken, DropTokenStatus, LogLevel, NodeError, NodeErrorCause, + NodeExitStatus, + }, coordinator_to_cli::DataflowResult, coordinator_to_daemon::{DaemonCoordinatorEvent, SpawnDataflowNodes}, daemon_to_coordinator::{ @@ -927,7 +930,7 @@ impl Daemon { .send_out(dataflow_id, node_id, output_id, metadata, data) .await .context("failed to send out")?, - DaemonNodeEvent::ReportDrop { tokens } => { + DaemonNodeEvent::ReportTokenState { token_events } => { let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| { format!( "failed to get handle drop tokens: \ @@ -937,17 +940,27 @@ impl Daemon { match dataflow { Ok(dataflow) => { - for token in tokens { + for event in token_events { + let DropTokenStatus { token, state } = event; match dataflow.pending_drop_tokens.get_mut(&token) { - Some(info) => { - if info.pending_nodes.remove(&node_id) { - dataflow.check_drop_token(token, &self.clock).await?; - } else { - tracing::warn!( - "node `{node_id}` is not pending for drop token `{token:?}`" - ); + Some(info) => match state { + dora_message::common::DropTokenState::Mapped => { + let changed = info.pending_nodes.remove(&node_id); + info.mapped_in_nodes.insert(node_id.clone()); + if changed { + dataflow + .check_drop_token_mapped(token, &self.clock) + .await?; + } + } + dora_message::common::DropTokenState::Dropped => { + let mut changed = info.pending_nodes.remove(&node_id); + changed |= info.mapped_in_nodes.remove(&node_id); + if changed { + dataflow.check_drop_token(token, &self.clock).await?; + } } - } + }, None => tracing::warn!("unknown drop token `{token:?}`"), } } @@ -1385,6 +1398,7 @@ async fn send_output_to_local_receivers( .or_insert_with(|| DropTokenInformation { owner: node_id.clone(), pending_nodes: Default::default(), + mapped_in_nodes: Default::default(), }) .pending_nodes .insert(receiver_id.clone()); @@ -1423,6 +1437,7 @@ async fn send_output_to_local_receivers( .or_insert_with(|| DropTokenInformation { owner: node_id.clone(), pending_nodes: Default::default(), + mapped_in_nodes: Default::default(), }); // check if all local subscribers are finished with the token dataflow.check_drop_token(token, clock).await?; @@ -1723,7 +1738,7 @@ impl RunningDataflow { async fn check_drop_token(&mut self, token: DropToken, clock: &HLC) -> eyre::Result<()> { match self.pending_drop_tokens.entry(token) { std::collections::hash_map::Entry::Occupied(entry) => { - if entry.get().pending_nodes.is_empty() { + if entry.get().pending_nodes.is_empty() && entry.get().mapped_in_nodes.is_empty() { let (drop_token, info) = entry.remove_entry(); let result = match self.drop_channels.get_mut(&info.owner) { Some(channel) => send_with_timestamp( @@ -1751,6 +1766,38 @@ impl RunningDataflow { Ok(()) } + + async fn check_drop_token_mapped(&mut self, token: DropToken, clock: &HLC) -> eyre::Result<()> { + match self.pending_drop_tokens.entry(token) { + std::collections::hash_map::Entry::Occupied(entry) => { + if entry.get().pending_nodes.is_empty() && !entry.get().mapped_in_nodes.is_empty() { + let info = entry.get(); + let result = match self.drop_channels.get_mut(&info.owner) { + Some(channel) => send_with_timestamp( + channel, + NodeDropEvent::OutputMapped { drop_token: token }, + clock, + ) + .wrap_err("send failed"), + None => Err(eyre!("no subscribe channel for node `{}`", &info.owner)), + }; + if let Err(err) = result.wrap_err_with(|| { + format!( + "failed to report drop token mapped `{token:?}` to owner `{}`", + &info.owner + ) + }) { + tracing::warn!("{err:?}"); + } + } + } + std::collections::hash_map::Entry::Vacant(_) => { + tracing::warn!("check_drop_token_mapped called with already closed token") + } + } + + Ok(()) + } } fn empty_type_info() -> ArrowTypeInfo { @@ -1772,9 +1819,14 @@ type InputId = (NodeId, DataId); struct DropTokenInformation { /// The node that created the associated drop token. owner: NodeId, - /// Contains the set of pending nodes that still have access to the input - /// associated with a drop token. + /// Contains the set of nodes that have not mapped the input associated + /// with a drop token yet. The shared memory region needs to be kept + /// alive until this list is empty. pending_nodes: BTreeSet, + /// Contains the set of nodes that still have the input data associated + /// with a drop token mapped in their address space. The shared memory + /// region must not be overwritten until this list is empty. + mapped_in_nodes: BTreeSet, } #[derive(Debug)] @@ -1821,8 +1873,8 @@ pub enum DaemonNodeEvent { metadata: metadata::Metadata, data: Option, }, - ReportDrop { - tokens: Vec, + ReportTokenState { + token_events: Vec, }, EventStreamDropped { reply_sender: oneshot::Sender, diff --git a/binaries/daemon/src/node_communication/mod.rs b/binaries/daemon/src/node_communication/mod.rs index 31b11aa4f..efc7ad441 100644 --- a/binaries/daemon/src/node_communication/mod.rs +++ b/binaries/daemon/src/node_communication/mod.rs @@ -5,7 +5,7 @@ use dora_core::{ uhlc, }; use dora_message::{ - common::{DropToken, Timestamped}, + common::{DropTokenState, DropTokenStatus, Timestamped}, daemon_to_node::{DaemonCommunication, DaemonReply, NodeDropEvent, NodeEvent}, node_to_daemon::DaemonRequest, DataflowId, @@ -461,13 +461,13 @@ impl Listener { Ok(()) } - async fn report_drop_tokens(&mut self, drop_tokens: Vec) -> eyre::Result<()> { + async fn report_drop_tokens(&mut self, drop_tokens: Vec) -> eyre::Result<()> { if !drop_tokens.is_empty() { let event = Event::Node { dataflow_id: self.dataflow_id, node_id: self.node_id.clone(), - event: DaemonNodeEvent::ReportDrop { - tokens: drop_tokens, + event: DaemonNodeEvent::ReportTokenState { + token_events: drop_tokens, }, }; let event = Timestamped { diff --git a/libraries/message/src/common.rs b/libraries/message/src/common.rs index 9ca2eb823..b06278fe6 100644 --- a/libraries/message/src/common.rs +++ b/libraries/message/src/common.rs @@ -171,6 +171,22 @@ impl fmt::Debug for DataMessage { } } +#[derive( + Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize, +)] +pub struct DropTokenStatus { + pub token: DropToken, + pub state: DropTokenState, +} + +#[derive( + Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize, +)] +pub enum DropTokenState { + Mapped, + Dropped, +} + #[derive( Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize, )] diff --git a/libraries/message/src/daemon_to_node.rs b/libraries/message/src/daemon_to_node.rs index acc1630eb..c270919cc 100644 --- a/libraries/message/src/daemon_to_node.rs +++ b/libraries/message/src/daemon_to_node.rs @@ -74,5 +74,6 @@ pub enum NodeEvent { #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum NodeDropEvent { + OutputMapped { drop_token: DropToken }, OutputDropped { drop_token: DropToken }, } diff --git a/libraries/message/src/node_to_daemon.rs b/libraries/message/src/node_to_daemon.rs index bb5a0850c..ce2a8a1f3 100644 --- a/libraries/message/src/node_to_daemon.rs +++ b/libraries/message/src/node_to_daemon.rs @@ -2,6 +2,7 @@ pub use crate::common::{ DataMessage, DropToken, LogLevel, LogMessage, SharedMemoryId, Timestamped, }; use crate::{ + common::DropTokenStatus, current_crate_version, id::{DataId, NodeId}, metadata::Metadata, @@ -22,10 +23,10 @@ pub enum DaemonRequest { /// required drop tokens. OutputsDone, NextEvent { - drop_tokens: Vec, + drop_tokens: Vec, }, ReportDropTokens { - drop_tokens: Vec, + drop_tokens: Vec, }, SubscribeDrop, NextFinishedDropTokens,