Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add NodeSpawner and NetworkSpawner #2642

Merged
merged 19 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions ant-networking/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ use std::{
num::NonZeroUsize,
path::PathBuf,
};
use tokio::sync::{mpsc, oneshot};
use tokio::sync::{mpsc, oneshot, watch};
use tokio::time::Duration;
use tracing::warn;
use xor_name::XorName;
Expand Down Expand Up @@ -834,7 +834,7 @@ impl SwarmDriver {
/// The `tokio::select` macro is used to concurrently process swarm events
/// and command receiver messages, ensuring efficient handling of multiple
/// asynchronous tasks.
pub async fn run(mut self) {
pub async fn run(mut self, mut shutdown_rx: watch::Receiver<bool>) {
let mut network_discover_interval = interval(NETWORK_DISCOVER_INTERVAL);
let mut set_farthest_record_interval = interval(CLOSET_RECORD_CHECK_INTERVAL);
let mut relay_manager_reservation_interval = interval(RELAY_MANAGER_RESERVATION_INTERVAL);
Expand Down Expand Up @@ -889,6 +889,13 @@ impl SwarmDriver {
},
None => continue,
},
// Check for a shutdown command.
result = shutdown_rx.changed() => {
if result.is_ok() && *shutdown_rx.borrow() || result.is_err() {
info!("Shutdown signal received or sender dropped. Exiting swarm driver loop.");
break;
}
},
// next take and react to external swarm events
swarm_event = self.swarm.select_next_some() => {
// Refer to the handle_swarm_events::IncomingConnectionError for more info on why we skip
Expand Down
4 changes: 2 additions & 2 deletions ant-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ upnp = ["ant-networking/upnp"]
ant-bootstrap = { path = "../ant-bootstrap", version = "0.1.4" }
ant-build-info = { path = "../ant-build-info", version = "0.1.23" }
ant-evm = { path = "../ant-evm", version = "0.1.8" }
ant-logging = { path = "../ant-logging", version = "0.2.45", features = [ "process-metrics" ] }
ant-logging = { path = "../ant-logging", version = "0.2.45", features = ["process-metrics"] }
ant-networking = { path = "../ant-networking", version = "0.3.3" }
ant-protocol = { path = "../ant-protocol", version = "0.3.3" }
ant-service-management = { path = "../ant-service-management", version = "0.4.7" }
Expand Down Expand Up @@ -73,7 +73,7 @@ tonic = { version = "0.6.2" }
tracing = { version = "~0.1.26" }
tracing-appender = "~0.2.0"
tracing-opentelemetry = { version = "0.21", optional = true }
tracing-subscriber = { version = "0.3.16" }
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
walkdir = "~2.5.0"
xor_name = "5.0.0"

Expand Down
41 changes: 41 additions & 0 deletions ant-node/examples/spawn_local_network.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2025 MaidSafe.net limited.
//
// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use ant_node::spawn::network_spawner::NetworkSpawner;
use std::time::Duration;
use tokio::time::sleep;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{fmt, EnvFilter};

/// Spawns a 20-node local network, prints every node's listen addresses and shuts it down.
#[tokio::main]
async fn main() {
    // Log through `tracing`, filtered by the `RUST_LOG` environment variable.
    let env_filter = EnvFilter::from_env("RUST_LOG");
    tracing_subscriber::registry()
        .with(fmt::layer())
        .with(env_filter)
        .init();

    let node_count = 20;

    let spawner = NetworkSpawner::new()
        .with_evm_network(Default::default())
        .with_local(true)
        .with_size(node_count);
    let running_network = spawner.spawn().await.expect("Failed to spawn network");

    // Give the nodes some time to dial each other before querying them.
    sleep(Duration::from_secs(10)).await;

    for running_node in running_network.running_nodes() {
        let listen_addrs = running_node.get_listen_addrs().await;
        println!("Node listening on: {:?}", listen_addrs);
    }

    running_network.shutdown();
}
81 changes: 3 additions & 78 deletions ant-node/src/bin/antnode/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use ant_bootstrap::{BootstrapCacheStore, PeersArgs};
use ant_evm::{get_evm_network, EvmNetwork, RewardsAddress};
use ant_logging::metrics::init_metrics;
use ant_logging::{Level, LogFormat, LogOutputDest, ReloadHandle};
use ant_node::utils::get_root_dir_and_keypair;
use ant_node::{Marker, NodeBuilder, NodeEvent, NodeEventsReceiver};
use ant_protocol::{
node::get_antnode_root_dir,
Expand All @@ -26,12 +27,11 @@ use ant_protocol::{
use clap::{command, Parser};
use color_eyre::{eyre::eyre, Result};
use const_hex::traits::FromHex;
use libp2p::{identity::Keypair, PeerId};
use libp2p::PeerId;
use std::{
env,
io::Write,
net::{IpAddr, Ipv4Addr, SocketAddr},
path::{Path, PathBuf},
path::PathBuf,
process::Command,
time::Duration,
};
Expand Down Expand Up @@ -571,81 +571,6 @@ fn init_logging(opt: &Opt, peer_id: PeerId) -> Result<(String, ReloadHandle, Opt
Ok((output_dest.to_string(), reload_handle, log_appender_guard))
}

/// Create a brand-new, write-only file at `path` to hold a secret key.
///
/// Fails if a file already exists at `path` (`create_new`), so an existing key is never
/// overwritten. On Unix the file is created with mode `0o600`.
fn create_secret_key_file(path: impl AsRef<Path>) -> Result<std::fs::File, std::io::Error> {
    let mut open_options = std::fs::OpenOptions::new();
    open_options.write(true);
    open_options.create_new(true);

    // Restrict access to the current user only; the mode is a Unix-specific option.
    #[cfg(unix)]
    {
        use std::os::unix::fs::OpenOptionsExt as _;
        open_options.mode(0o600);
    }

    open_options.open(path)
}

/// Load the ed25519 keypair stored at `path`, generating and persisting a new one if the
/// file does not exist yet.
///
/// Returns an error when the file exists but cannot be read or does not contain a valid
/// ed25519 key, or when a freshly generated key cannot be written.
fn keypair_from_path(path: impl AsRef<Path>) -> Result<Keypair> {
    let key_path = path.as_ref();

    match std::fs::read(key_path) {
        // The file exists: decode the key it contains.
        Ok(bytes) => {
            let loaded = Keypair::ed25519_from_bytes(bytes)
                .map_err(|err| eyre!("could not read ed25519 key from file: {err}"))?;

            info!("loaded secret key from file: {:?}", key_path);

            Ok(loaded)
        }
        // The file is missing: generate a new key and store it at `path`.
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
            let secret_key = libp2p::identity::ed25519::SecretKey::generate();

            let mut key_file = create_secret_key_file(key_path)
                .map_err(|err| eyre!("could not create secret key file: {err}"))?;
            key_file.write_all(secret_key.as_ref())?;

            info!("generated new key and stored to file: {:?}", key_path);

            Ok(libp2p::identity::ed25519::Keypair::from(secret_key).into())
        }
        // Any other read failure (e.g. permissions) is reported to the caller.
        Err(err) => Err(eyre!("failed to read secret key file: {err}")),
    }
}

/// Resolve the node's root directory together with the keypair stored inside it.
///
/// The secret key lives in a `secret-key` file inside the root directory. When `root_dir` is
/// given, the directory is created if needed and the key is obtained via `keypair_from_path`.
/// When no directory is given, a new key is generated first and the directory is derived
/// from the resulting peer ID, so the peer ID doubles as the directory name.
fn get_root_dir_and_keypair(root_dir: &Option<PathBuf>) -> Result<(PathBuf, Keypair)> {
    // Explicit directory: reuse (or create) the key stored there.
    if let Some(dir) = root_dir {
        std::fs::create_dir_all(dir)?;

        let keypair = keypair_from_path(dir.join("secret-key"))?;
        return Ok((dir.clone(), keypair));
    }

    // No directory: the key must exist before the directory name can be derived from it.
    let secret_key = libp2p::identity::ed25519::SecretKey::generate();
    let keypair: Keypair = libp2p::identity::ed25519::Keypair::from(secret_key.clone()).into();

    let dir = get_antnode_root_dir(keypair.public().to_peer_id())?;
    std::fs::create_dir_all(&dir)?;

    let mut key_file = create_secret_key_file(dir.join("secret-key"))
        .map_err(|err| eyre!("could not create secret key file: {err}"))?;
    key_file.write_all(secret_key.as_ref())?;

    Ok((dir, keypair))
}

/// Starts a new process running the binary with the same args as
/// the current process
/// Optionally provide the node's root dir and listen port to retain its PeerId
Expand Down
36 changes: 33 additions & 3 deletions ant-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ mod put_validation;
mod python;
mod quote;
mod replication;
#[allow(missing_docs)]
pub mod spawn;
#[allow(missing_docs)]
pub mod utils;

pub use self::{
event::{NodeEvent, NodeEventsChannel, NodeEventsReceiver},
Expand All @@ -41,20 +45,22 @@ pub use self::{

use crate::error::{Error, Result};

use ant_evm::RewardsAddress;
use ant_networking::{Network, SwarmLocalState};
use ant_protocol::{get_port_from_multiaddr, NetworkAddress};
use libp2p::PeerId;
use libp2p::{Multiaddr, PeerId};

use std::{
collections::{BTreeMap, HashSet},
path::PathBuf,
};

use ant_evm::RewardsAddress;
use tokio::sync::watch;

/// Once a node is started and running, the user obtains
/// a `NodeRunning` object which can be used to interact with it.
#[derive(Clone)]
pub struct RunningNode {
shutdown_sender: watch::Sender<bool>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We send TerminateNode(String) via the NodeEventsChannel below, can this be re-used here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could be reused for the node (NodeEvents) loop, but for the SwarmDriver loop it wouldn't make sense, as the broadcast channel uses ant_node::event::NodeEvent, which is inaccessible from the ant-networking crate in which the SwarmDriver loop resides.

network: Network,
node_events_channel: NodeEventsChannel,
root_dir_path: PathBuf,
Expand Down Expand Up @@ -85,6 +91,24 @@ impl RunningNode {
Ok(state)
}

/// Return the addresses this node is currently listening on, as reported by the swarm's
/// local state.
pub async fn get_listen_addrs(&self) -> Result<Vec<Multiaddr>> {
    let local_state = self.network.get_swarm_local_state().await?;
    Ok(local_state.listeners)
}

/// Return the node's listening addresses, each with this node's peer id appended.
///
/// Addresses for which `with_p2p` fails are left out of the result.
pub async fn get_listen_addrs_with_peer_id(&self) -> Result<Vec<Multiaddr>> {
    let mut multi_addrs: Vec<Multiaddr> = Vec::new();

    for listen_addr in self.get_listen_addrs().await? {
        if let Ok(addr_with_peer_id) = listen_addr.with_p2p(self.peer_id()) {
            multi_addrs.push(addr_with_peer_id);
        }
    }

    Ok(multi_addrs)
}

/// Return the node's listening port
pub async fn get_node_listening_port(&self) -> Result<u16> {
let listen_addrs = self.network.get_swarm_local_state().await?.listeners;
Expand Down Expand Up @@ -125,4 +149,10 @@ impl RunningNode {
pub fn reward_address(&self) -> &RewardsAddress {
&self.rewards_address
}

/// Signal the SwarmDriver loop and the node (NetworkEvents) loop to shut down.
///
/// Consumes this handle. A failure to deliver the signal is ignored.
pub fn shutdown(self) {
    // Publish `true` on the shutdown watch channel; the send result is deliberately dropped.
    self.shutdown_sender.send(true).ok();
}
}
38 changes: 29 additions & 9 deletions ant-node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::metrics::NodeMetricsRecorder;
use crate::RunningNode;
use ant_bootstrap::BootstrapCacheStore;
use ant_evm::RewardsAddress;
use ant_evm::{EvmNetwork, U256};
#[cfg(feature = "open-metrics")]
use ant_networking::MetricsRegistries;
use ant_networking::{
Expand Down Expand Up @@ -44,13 +45,12 @@ use std::{
},
time::Duration,
};
use tokio::sync::watch;
use tokio::{
sync::mpsc::Receiver,
task::{spawn, JoinSet},
};

use ant_evm::{EvmNetwork, U256};

/// Interval to trigger replication of all records to all peers.
/// This is the max time it should take. Minimum interval at any node will be half this
pub const PERIODIC_REPLICATION_INTERVAL_MAX_S: u64 = 180;
Expand Down Expand Up @@ -189,6 +189,7 @@ impl NodeBuilder {

let (network, network_event_receiver, swarm_driver) =
network_builder.build_node(self.root_dir.clone())?;

let node_events_channel = NodeEventsChannel::default();

let node = NodeInner {
Expand All @@ -200,19 +201,25 @@ impl NodeBuilder {
metrics_recorder,
evm_network: self.evm_network,
};

let node = Node {
inner: Arc::new(node),
};

// Create a shutdown signal channel
let (shutdown_tx, shutdown_rx) = watch::channel(false);

// Run the node
node.run(swarm_driver, network_event_receiver, shutdown_rx);

let running_node = RunningNode {
shutdown_sender: shutdown_tx,
network,
node_events_channel,
root_dir_path: self.root_dir,
rewards_address: self.evm_address,
};

// Run the node
node.run(swarm_driver, network_event_receiver);

Ok(running_node)
}
}
Expand Down Expand Up @@ -270,14 +277,20 @@ impl Node {
&self.inner.evm_network
}

/// Runs the provided `SwarmDriver` and spawns a task to process for `NetworkEvents`
fn run(self, swarm_driver: SwarmDriver, mut network_event_receiver: Receiver<NetworkEvent>) {
/// Runs a task for the provided `SwarmDriver` and spawns a task to process `NetworkEvents`.
/// This function returns nothing; the join handles of both tasks are not handed back to the caller.
fn run(
self,
swarm_driver: SwarmDriver,
mut network_event_receiver: Receiver<NetworkEvent>,
mut shutdown_rx: watch::Receiver<bool>,
) {
let mut rng = StdRng::from_entropy();

let peers_connected = Arc::new(AtomicUsize::new(0));

let _handle = spawn(swarm_driver.run());
let _handle = spawn(async move {
let _swarm_driver_task = spawn(swarm_driver.run(shutdown_rx.clone()));
let _node_task = spawn(async move {
// use a random inactivity timeout to ensure that the nodes do not sync when messages
// are being transmitted.
let replication_interval: u64 = rng.gen_range(
Expand Down Expand Up @@ -328,6 +341,13 @@ impl Node {
let peers_connected = &peers_connected;

tokio::select! {
// Check for a shutdown command.
result = shutdown_rx.changed() => {
if result.is_ok() && *shutdown_rx.borrow() || result.is_err() {
info!("Shutdown signal received or sender dropped. Exiting network events loop.");
break;
}
},
net_event = network_event_receiver.recv() => {
match net_event {
Some(event) => {
Expand Down
10 changes: 10 additions & 0 deletions ant-node/src/spawn/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// Copyright 2025 MaidSafe.net limited.
//
// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

pub mod network_spawner;
pub mod node_spawner;
Loading
Loading