Skip to content

Commit

Permalink
Merge pull request #2642 from mickvandijke/feat-node-from-code
Browse files Browse the repository at this point in the history
feat: add `NodeSpawner` and `NetworkSpawner`
  • Loading branch information
mickvandijke authored Jan 23, 2025
2 parents 0bc31f6 + 3222006 commit 8ad5a4f
Show file tree
Hide file tree
Showing 11 changed files with 708 additions and 99 deletions.
11 changes: 9 additions & 2 deletions ant-networking/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ use std::{
num::NonZeroUsize,
path::PathBuf,
};
use tokio::sync::{mpsc, oneshot};
use tokio::sync::{mpsc, oneshot, watch};
use tokio::time::Duration;
use tracing::warn;
use xor_name::XorName;
Expand Down Expand Up @@ -813,7 +813,7 @@ impl SwarmDriver {
/// The `tokio::select` macro is used to concurrently process swarm events
/// and command receiver messages, ensuring efficient handling of multiple
/// asynchronous tasks.
pub async fn run(mut self) {
pub async fn run(mut self, mut shutdown_rx: watch::Receiver<bool>) {
let mut network_discover_interval = interval(NETWORK_DISCOVER_INTERVAL);
let mut set_farthest_record_interval = interval(CLOSET_RECORD_CHECK_INTERVAL);
let mut relay_manager_reservation_interval = interval(RELAY_MANAGER_RESERVATION_INTERVAL);
Expand Down Expand Up @@ -868,6 +868,13 @@ impl SwarmDriver {
},
None => continue,
},
// Check for a shutdown command.
result = shutdown_rx.changed() => {
if result.is_ok() && *shutdown_rx.borrow() || result.is_err() {
info!("Shutdown signal received or sender dropped. Exiting swarm driver loop.");
break;
}
},
// next take and react to external swarm events
swarm_event = self.swarm.select_next_some() => {
// Refer to the handle_swarm_events::IncomingConnectionError for more info on why we skip
Expand Down
4 changes: 2 additions & 2 deletions ant-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ otlp = ["ant-logging/otlp"]
ant-bootstrap = { path = "../ant-bootstrap", version = "0.1.4" }
ant-build-info = { path = "../ant-build-info", version = "0.1.23" }
ant-evm = { path = "../ant-evm", version = "0.1.8" }
ant-logging = { path = "../ant-logging", version = "0.2.45", features = [ "process-metrics" ] }
ant-logging = { path = "../ant-logging", version = "0.2.45", features = ["process-metrics"] }
ant-networking = { path = "../ant-networking", version = "0.3.3" }
ant-protocol = { path = "../ant-protocol", version = "0.3.3" }
ant-service-management = { path = "../ant-service-management", version = "0.4.7" }
Expand Down Expand Up @@ -72,7 +72,7 @@ tonic = { version = "0.6.2" }
tracing = { version = "~0.1.26" }
tracing-appender = "~0.2.0"
tracing-opentelemetry = { version = "0.21", optional = true }
tracing-subscriber = { version = "0.3.16" }
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
walkdir = "~2.5.0"
xor_name = "5.0.0"

Expand Down
41 changes: 41 additions & 0 deletions ant-node/examples/spawn_local_network.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2025 MaidSafe.net limited.
//
// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use ant_node::spawn::network_spawner::NetworkSpawner;
use std::time::Duration;
use tokio::time::sleep;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{fmt, EnvFilter};

#[tokio::main]
async fn main() {
tracing_subscriber::registry()
.with(fmt::layer())
.with(EnvFilter::from_env("RUST_LOG"))
.init();

let network_size = 20;

let running_network = NetworkSpawner::new()
.with_evm_network(Default::default())
.with_local(true)
.with_size(network_size)
.spawn()
.await
.expect("Failed to spawn network");

// Wait for nodes to dial each other
sleep(Duration::from_secs(10)).await;

for node in running_network.running_nodes() {
println!("Node listening on: {:?}", node.get_listen_addrs().await);
}

running_network.shutdown();
}
81 changes: 3 additions & 78 deletions ant-node/src/bin/antnode/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use ant_bootstrap::{BootstrapCacheStore, PeersArgs};
use ant_evm::{get_evm_network, EvmNetwork, RewardsAddress};
use ant_logging::metrics::init_metrics;
use ant_logging::{Level, LogFormat, LogOutputDest, ReloadHandle};
use ant_node::utils::get_root_dir_and_keypair;
use ant_node::{Marker, NodeBuilder, NodeEvent, NodeEventsReceiver};
use ant_protocol::{
node::get_antnode_root_dir,
Expand All @@ -26,12 +27,11 @@ use ant_protocol::{
use clap::{command, Parser};
use color_eyre::{eyre::eyre, Result};
use const_hex::traits::FromHex;
use libp2p::{identity::Keypair, PeerId};
use libp2p::PeerId;
use std::{
env,
io::Write,
net::{IpAddr, Ipv4Addr, SocketAddr},
path::{Path, PathBuf},
path::PathBuf,
process::Command,
time::Duration,
};
Expand Down Expand Up @@ -569,81 +569,6 @@ fn init_logging(opt: &Opt, peer_id: PeerId) -> Result<(String, ReloadHandle, Opt
Ok((output_dest.to_string(), reload_handle, log_appender_guard))
}

fn create_secret_key_file(path: impl AsRef<Path>) -> Result<std::fs::File, std::io::Error> {
let mut opt = std::fs::OpenOptions::new();
opt.write(true).create_new(true);

// On Unix systems, make sure only the current user can read/write.
#[cfg(unix)]
{
use std::os::unix::fs::OpenOptionsExt;
opt.mode(0o600);
}

opt.open(path)
}

fn keypair_from_path(path: impl AsRef<Path>) -> Result<Keypair> {
let keypair = match std::fs::read(&path) {
// If the file is opened successfully, read the key from it
Ok(key) => {
let keypair = Keypair::ed25519_from_bytes(key)
.map_err(|err| eyre!("could not read ed25519 key from file: {err}"))?;

info!("loaded secret key from file: {:?}", path.as_ref());

keypair
}
// In case the file is not found, generate a new keypair and write it to the file
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
let secret_key = libp2p::identity::ed25519::SecretKey::generate();
let mut file = create_secret_key_file(&path)
.map_err(|err| eyre!("could not create secret key file: {err}"))?;
file.write_all(secret_key.as_ref())?;

info!("generated new key and stored to file: {:?}", path.as_ref());

libp2p::identity::ed25519::Keypair::from(secret_key).into()
}
// Else the file can't be opened, for whatever reason (e.g. permissions).
Err(err) => {
return Err(eyre!("failed to read secret key file: {err}"));
}
};

Ok(keypair)
}

/// The keypair is located inside the root directory. At the same time, when no dir is specified,
/// the dir name is derived from the keypair used in the application: the peer ID is used as the directory name.
fn get_root_dir_and_keypair(root_dir: &Option<PathBuf>) -> Result<(PathBuf, Keypair)> {
match root_dir {
Some(dir) => {
std::fs::create_dir_all(dir)?;

let secret_key_path = dir.join("secret-key");
Ok((dir.clone(), keypair_from_path(secret_key_path)?))
}
None => {
let secret_key = libp2p::identity::ed25519::SecretKey::generate();
let keypair: Keypair =
libp2p::identity::ed25519::Keypair::from(secret_key.clone()).into();
let peer_id = keypair.public().to_peer_id();

let dir = get_antnode_root_dir(peer_id)?;
std::fs::create_dir_all(&dir)?;

let secret_key_path = dir.join("secret-key");

let mut file = create_secret_key_file(secret_key_path)
.map_err(|err| eyre!("could not create secret key file: {err}"))?;
file.write_all(secret_key.as_ref())?;

Ok((dir, keypair))
}
}
}

/// Starts a new process running the binary with the same args as
/// the current process
/// Optionally provide the node's root dir and listen port to retain it's PeerId
Expand Down
36 changes: 33 additions & 3 deletions ant-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ mod put_validation;
mod python;
mod quote;
mod replication;
#[allow(missing_docs)]
pub mod spawn;
#[allow(missing_docs)]
pub mod utils;

pub use self::{
event::{NodeEvent, NodeEventsChannel, NodeEventsReceiver},
Expand All @@ -41,20 +45,22 @@ pub use self::{

use crate::error::{Error, Result};

use ant_evm::RewardsAddress;
use ant_networking::{Network, SwarmLocalState};
use ant_protocol::{get_port_from_multiaddr, NetworkAddress};
use libp2p::PeerId;
use libp2p::{Multiaddr, PeerId};

use std::{
collections::{BTreeMap, HashSet},
path::PathBuf,
};

use ant_evm::RewardsAddress;
use tokio::sync::watch;

/// Once a node is started and running, the user obtains
/// a `NodeRunning` object which can be used to interact with it.
#[derive(Clone)]
pub struct RunningNode {
shutdown_sender: watch::Sender<bool>,
network: Network,
node_events_channel: NodeEventsChannel,
root_dir_path: PathBuf,
Expand Down Expand Up @@ -85,6 +91,24 @@ impl RunningNode {
Ok(state)
}

/// Return the node's listening addresses.
pub async fn get_listen_addrs(&self) -> Result<Vec<Multiaddr>> {
let listeners = self.network.get_swarm_local_state().await?.listeners;
Ok(listeners)
}

/// Return the node's listening addresses with the peer id appended.
pub async fn get_listen_addrs_with_peer_id(&self) -> Result<Vec<Multiaddr>> {
let listeners = self.get_listen_addrs().await?;

let multi_addrs: Vec<Multiaddr> = listeners
.into_iter()
.filter_map(|listen_addr| listen_addr.with_p2p(self.peer_id()).ok())
.collect();

Ok(multi_addrs)
}

/// Return the node's listening port
pub async fn get_node_listening_port(&self) -> Result<u16> {
let listen_addrs = self.network.get_swarm_local_state().await?.listeners;
Expand Down Expand Up @@ -125,4 +149,10 @@ impl RunningNode {
pub fn reward_address(&self) -> &RewardsAddress {
&self.rewards_address
}

/// Shutdown the SwarmDriver loop and the node (NetworkEvents) loop.
pub fn shutdown(self) {
// Send the shutdown signal to the swarm driver and node loop
let _ = self.shutdown_sender.send(true);
}
}
38 changes: 29 additions & 9 deletions ant-node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::metrics::NodeMetricsRecorder;
use crate::RunningNode;
use ant_bootstrap::BootstrapCacheStore;
use ant_evm::RewardsAddress;
use ant_evm::{EvmNetwork, U256};
#[cfg(feature = "open-metrics")]
use ant_networking::MetricsRegistries;
use ant_networking::{
Expand Down Expand Up @@ -44,13 +45,12 @@ use std::{
},
time::Duration,
};
use tokio::sync::watch;
use tokio::{
sync::mpsc::Receiver,
task::{spawn, JoinSet},
};

use ant_evm::{EvmNetwork, U256};

/// Interval to trigger replication of all records to all peers.
/// This is the max time it should take. Minimum interval at any node will be half this
pub const PERIODIC_REPLICATION_INTERVAL_MAX_S: u64 = 180;
Expand Down Expand Up @@ -186,6 +186,7 @@ impl NodeBuilder {

let (network, network_event_receiver, swarm_driver) =
network_builder.build_node(self.root_dir.clone())?;

let node_events_channel = NodeEventsChannel::default();

let node = NodeInner {
Expand All @@ -197,19 +198,25 @@ impl NodeBuilder {
metrics_recorder,
evm_network: self.evm_network,
};

let node = Node {
inner: Arc::new(node),
};

// Create a shutdown signal channel
let (shutdown_tx, shutdown_rx) = watch::channel(false);

// Run the node
node.run(swarm_driver, network_event_receiver, shutdown_rx);

let running_node = RunningNode {
shutdown_sender: shutdown_tx,
network,
node_events_channel,
root_dir_path: self.root_dir,
rewards_address: self.evm_address,
};

// Run the node
node.run(swarm_driver, network_event_receiver);

Ok(running_node)
}
}
Expand Down Expand Up @@ -267,14 +274,20 @@ impl Node {
&self.inner.evm_network
}

/// Runs the provided `SwarmDriver` and spawns a task to process for `NetworkEvents`
fn run(self, swarm_driver: SwarmDriver, mut network_event_receiver: Receiver<NetworkEvent>) {
/// Runs a task for the provided `SwarmDriver` and spawns a task to process for `NetworkEvents`.
/// Returns both tasks as JoinHandle<()>.
fn run(
self,
swarm_driver: SwarmDriver,
mut network_event_receiver: Receiver<NetworkEvent>,
mut shutdown_rx: watch::Receiver<bool>,
) {
let mut rng = StdRng::from_entropy();

let peers_connected = Arc::new(AtomicUsize::new(0));

let _handle = spawn(swarm_driver.run());
let _handle = spawn(async move {
let _swarm_driver_task = spawn(swarm_driver.run(shutdown_rx.clone()));
let _node_task = spawn(async move {
// use a random inactivity timeout to ensure that the nodes do not sync when messages
// are being transmitted.
let replication_interval: u64 = rng.gen_range(
Expand Down Expand Up @@ -325,6 +338,13 @@ impl Node {
let peers_connected = &peers_connected;

tokio::select! {
// Check for a shutdown command.
result = shutdown_rx.changed() => {
if result.is_ok() && *shutdown_rx.borrow() || result.is_err() {
info!("Shutdown signal received or sender dropped. Exiting network events loop.");
break;
}
},
net_event = network_event_receiver.recv() => {
match net_event {
Some(event) => {
Expand Down
10 changes: 10 additions & 0 deletions ant-node/src/spawn/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// Copyright 2025 MaidSafe.net limited.
//
// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

pub mod network_spawner;
pub mod node_spawner;
Loading

0 comments on commit 8ad5a4f

Please sign in to comment.