Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Overseer #1152

Merged
merged 21 commits into from
Jun 2, 2020
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 30 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ members = [
"erasure-coding",
"network",
"network/test",
"overseer",
"primitives",
"runtime/common",
"runtime/polkadot",
Expand Down
16 changes: 16 additions & 0 deletions overseer/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[package]
name = "overseer"
version = "0.1.0"
authors = ["Parity Technologies <[email protected]>"]
edition = "2018"

[dependencies]
futures = "0.3.5"
log = "0.4.8"

[dev-dependencies]
futures = { version = "0.3.5", features = ["thread-pool"] }
futures-timer = "3.0.2"
femme = "2.0.1"
log = "0.4.8"
kv-log-macro = "1.0.6"
136 changes: 136 additions & 0 deletions overseer/examples/minimal-example.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

//! Shows a basic usage of the `Overseer`:
//! * Spawning subsystems and subsystem child jobs
//! * Establishing message passing

use std::time::Duration;
use futures::{pending, executor};
use futures_timer::Delay;
use kv_log_macro as log;

use overseer::{Overseer, Subsystem, SubsystemContext, SpawnedSubsystem};

#[derive(Clone, Copy, Debug, Eq, PartialEq, std::hash::Hash)]
pub enum SubsystemId {
Subsystem1,
Subsystem2,
Subsystem3,
}

struct Subsystem1;

impl Subsystem1 {
async fn run(mut ctx: SubsystemContext<usize, SubsystemId>) {
loop {
match ctx.try_recv().await {
montekki marked this conversation as resolved.
Show resolved Hide resolved
Ok(Some(msg)) => {
log::info!("Subsystem1 received message {}", msg);
}
Ok(None) => (),
Err(_) => {}
}

Delay::new(Duration::from_secs(1)).await;
ctx.broadcast_msg(10).await;
}
}

fn new() -> Self {
Self
}
}

impl Subsystem<usize, SubsystemId> for Subsystem1 {
fn start(&mut self, ctx: SubsystemContext<usize, SubsystemId>) -> SpawnedSubsystem {
SpawnedSubsystem(Box::pin(async move {
Self::run(ctx).await;
}))
}
}

struct Subsystem2;

impl Subsystem2 {
async fn run(mut ctx: SubsystemContext<usize, SubsystemId>) {
ctx.spawn(Box::pin(async {
loop {
log::info!("Job tick");
Delay::new(Duration::from_secs(1)).await;
}
})).await.unwrap();

loop {
match ctx.try_recv().await {
Ok(Some(msg)) => {
log::info!("Subsystem2 received message {}", msg);
continue;
}
Ok(None) => { pending!(); }
Err(_) => {}
}
}
}

fn new() -> Self {
Self
}
}

impl Subsystem<usize, SubsystemId> for Subsystem2 {
fn start(&mut self, ctx: SubsystemContext<usize, SubsystemId>) -> SpawnedSubsystem {
SpawnedSubsystem(Box::pin(async move {
Self::run(ctx).await;
}))
}
}

struct Subsystem3;

impl Subsystem<usize, SubsystemId> for Subsystem3 {
fn start(&mut self, mut ctx: SubsystemContext<usize, SubsystemId>) -> SpawnedSubsystem {
SpawnedSubsystem(Box::pin(async move {
// TODO: ctx actually has to be used otherwise the channels are dropped
loop {
// ignore all incoming msgs
while let Ok(Some(_)) = ctx.try_recv().await {
}
log::info!("Subsystem3 tick");
Delay::new(Duration::from_secs(1)).await;

pending!();
}
}))
}

fn can_recv_msg(&self, _msg: &usize) -> bool { false }
}

fn main() {
femme::with_level(femme::LevelFilter::Trace);
let spawner = executor::ThreadPool::new().unwrap();

futures::executor::block_on(async {
let subsystems: Vec<(SubsystemId, Box<dyn Subsystem<usize, SubsystemId> + Send>)> = vec![
(SubsystemId::Subsystem1, Box::new(Subsystem1::new())),
(SubsystemId::Subsystem2, Box::new(Subsystem2::new())),
];

let overseer = Overseer::new(subsystems, spawner);
overseer.run().await;
});
}
Loading