Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Overseer #1152

Merged
merged 21 commits into from
Jun 2, 2020
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 61 additions & 4 deletions overseer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,35 @@ enum OverseerMessage<M: Debug, I> {
},
}

enum Event {
BlockImport,
BlockFinalized,
Stop,
}

/// A handler used to communicate with the `Overseer`.
#[derive(Clone)]
pub struct OverseerHandler {
events_tx: mpsc::Sender<Event>,
}

impl OverseerHandler {
/// Inform the `Overseer` that that some block was imported.
pub async fn block_imported(&mut self) -> SubsystemResult<()> {
montekki marked this conversation as resolved.
Show resolved Hide resolved
self.events_tx.send(Event::BlockImport).await.map_err(|_| SubsystemError)
}

/// Inform the `Overseer` that that some block was finalized.
pub async fn block_finalized(&mut self) -> SubsystemResult<()> {
self.events_tx.send(Event::BlockFinalized).await.map_err(|_| SubsystemError)
}

/// Tell `Overseer` to shutdown.
pub async fn stop(&mut self) -> SubsystemResult<()> {
self.events_tx.send(Event::Stop).await.map_err(|_| SubsystemError)
}
}
montekki marked this conversation as resolved.
Show resolved Hide resolved

impl<M: Debug, I: Debug> Debug for OverseerMessage<M, I> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Expand Down Expand Up @@ -240,6 +269,12 @@ pub struct Overseer<M: Debug, S: Spawn, I> {

/// The capacity of bounded channels created between `Overseer` and `SpawnedSubsystem`s.
channel_capacity: usize,

/// Events that are sent to the overseer from the outside world
events_rx: mpsc::Receiver<Event>,

/// A sender for the `events_rx`, used to return `OverseerHandler` to the user.
events_tx: mpsc::Sender<Event>,
}

impl<M, S, I> Overseer<M, S, I>
Expand Down Expand Up @@ -274,11 +309,14 @@ where
///
/// ```
pub fn new<T: IntoIterator<Item = (I, Box<dyn Subsystem<M, I> + Send>)>>(subsystems: T, s: S) -> Self {
let (events_tx, events_rx) = mpsc::channel(CHANNEL_CAPACITY);
let mut this = Self {
subsystems: HashMap::new(),
s,
running_subsystems: FuturesUnordered::new(),
channel_capacity: CHANNEL_CAPACITY,
events_rx,
events_tx,
};

for s in subsystems.into_iter() {
Expand All @@ -288,6 +326,14 @@ where
this
}

pub fn handler(&mut self) -> OverseerHandler {
let events_tx = self.events_tx.clone();

OverseerHandler {
events_tx,
}
}

/// Run the `Overseer`.
// TODO: we have to
// * Give out to the user some handler to communicate with the `Overseer`
Expand All @@ -299,6 +345,15 @@ where
// that need dispatching (if any).
let mut msgs = Vec::default();

while let Poll::Ready(Some(msg)) = poll!(&mut self.events_rx.next()) {
if let Event::Stop = msg {
// TODO: We should send stop messages to all subsystems, join them
// and wait for some timeout for them to gracefully shutdown and then
// just drop their handlers.
montekki marked this conversation as resolved.
Show resolved Hide resolved
return
}
}

for (id, s) in self.subsystems.iter_mut() {
if let Some(s) = &mut s.instance {
while let Poll::Ready(Some(msg)) = poll!(&mut s.rx.next()) {
Expand Down Expand Up @@ -520,7 +575,8 @@ mod tests {
(SubsystemId::Subsystem1, Box::new(TestSubsystem1(s1_tx))),
(SubsystemId::Subsystem2, Box::new(TestSubsystem2(s2_tx))),
];
let overseer = Overseer::new(subsystems, spawner);
let mut overseer = Overseer::new(subsystems, spawner);
let mut handler = overseer.handler();
let overseer_fut = overseer.run().fuse();

pin_mut!(overseer_fut);
Expand All @@ -536,7 +592,7 @@ mod tests {
Some(msg) => {
s1_results.push(msg);
if s1_results.len() == 10 {
break;
handler.stop().await.unwrap();
}
}
None => break,
Expand Down Expand Up @@ -566,7 +622,8 @@ mod tests {
let subsystems: Vec<(SubsystemId, Box<dyn Subsystem<usize, SubsystemId> + Send>)> = vec![
(SubsystemId::Subsystem3, Box::new(TestSubsystem3(Some(tx)))),
];
let overseer = Overseer::new(subsystems, spawner);
let mut overseer = Overseer::new(subsystems, spawner);
let mut handler = overseer.handler();
let overseer_fut = overseer.run().fuse();

let mut rx = rx.fuse();
Expand All @@ -577,7 +634,7 @@ mod tests {
a = overseer_fut => break,
result = rx => {
assert_eq!(result.unwrap(), 10);
break;
handler.stop().await.unwrap();
}
}
}
Expand Down