Skip to content

Commit

Permalink
docs(comms): adds documentation for comms public interface
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Apr 13, 2022
1 parent f46d2f0 commit 7b7ca29
Show file tree
Hide file tree
Showing 68 changed files with 373 additions and 196 deletions.
3 changes: 3 additions & 0 deletions comms/core/src/backoff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

use std::{cmp::min, time::Duration};

/// Boxed backoff
pub type BoxedBackoff = Box<dyn Backoff + Send + Sync>;

pub trait Backoff {
Expand All @@ -34,6 +35,7 @@ impl Backoff for BoxedBackoff {
}
}

/// Returns a backoff Duration that increases exponentially to the number of attempts.
#[derive(Debug, Clone)]
pub struct ExponentialBackoff {
factor: f32,
Expand Down Expand Up @@ -61,6 +63,7 @@ impl Backoff for ExponentialBackoff {
}
}

/// Returns a backoff Duration that increases linearly to the number of attempts.
#[derive(Clone)]
pub struct ConstantBackoff(Duration);

Expand Down
9 changes: 9 additions & 0 deletions comms/core/src/bounded_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,13 @@ impl BoundedExecutor {
}
}

/// A task executor that can be configured to be bounded or unbounded.
pub struct OptionallyBoundedExecutor {
inner: Either<runtime::Handle, BoundedExecutor>,
}

impl OptionallyBoundedExecutor {
/// Create a new OptionallyBoundedExecutor. If `num_permits` is `None` the executor will be unbounded.
pub fn new(executor: runtime::Handle, num_permits: Option<usize>) -> Self {
Self {
inner: num_permits
Expand All @@ -187,17 +189,22 @@ impl OptionallyBoundedExecutor {
}
}

/// Create a new OptionallyBoundedExecutor from the current tokio context. If `num_permits` is `None` the executor
/// will be unbounded.
pub fn from_current(num_permits: Option<usize>) -> Self {
Self::new(current(), num_permits)
}

/// Returns true if this executor can spawn, otherwise false.
pub fn can_spawn(&self) -> bool {
match &self.inner {
Either::Left(_) => true,
Either::Right(exec) => exec.can_spawn(),
}
}

/// Try spawn a new task returning its `JoinHandle`. An error is returned if the executor is bounded and currently
/// full.
pub fn try_spawn<F>(&self, future: F) -> Result<JoinHandle<F::Output>, TrySpawnError>
where
F: Future + Send + 'static,
Expand All @@ -209,6 +216,8 @@ impl OptionallyBoundedExecutor {
}
}

/// Spawns a new task returning its `JoinHandle`. If the executor is running `num_permits` tasks, this waits until a
/// task is available.
pub async fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
Expand Down
9 changes: 7 additions & 2 deletions comms/core/src/builder/comms_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,17 +89,21 @@ impl UnspawnedCommsNode {
self
}

/// Adds [ProtocolExtensions](crate::protocol::ProtocolExtensions) to this node.
pub fn add_protocol_extensions(mut self, extensions: ProtocolExtensions) -> Self {
self.protocol_extensions.extend(extensions);
self
}

/// Add a protocol extension
/// Adds an implementation of [ProtocolExtension](crate::protocol::ProtocolExtension) to this node.
/// This is used to add custom protocols to Tari comms.
pub fn add_protocol_extension<T: ProtocolExtension + 'static>(mut self, extension: T) -> Self {
self.protocol_extensions.add(extension);
self
}

/// Registers custom ProtocolIds and mpsc notifier. A [ProtocolNotification](crate::protocol::ProtocolNotification)
/// will be sent on that channel whenever a remote peer requests to speak the given protocols.
pub fn add_protocol<I: AsRef<[ProtocolId]>>(
mut self,
protocol: I,
Expand All @@ -109,7 +113,7 @@ impl UnspawnedCommsNode {
self
}

/// Set the listener address
/// Set the listener address. This is an alias to `CommsBuilder::with_listener_address`.
pub fn with_listener_address(mut self, listener_address: Multiaddr) -> Self {
self.builder = self.builder.with_listener_address(listener_address);
self
Expand All @@ -121,6 +125,7 @@ impl UnspawnedCommsNode {
self
}

/// Spawn a new node using the specified [Transport](crate::transports::Transport).
pub async fn spawn_with_transport<TTransport>(self, transport: TTransport) -> Result<CommsNode, CommsBuilderError>
where
TTransport: Transport + Unpin + Send + Sync + Clone + 'static,
Expand Down
86 changes: 74 additions & 12 deletions comms/core/src/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,6 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

//! # CommsBuilder
//!
//! The [CommsBuilder] provides a simple builder API for getting Tari comms p2p messaging up and running.
//!
//! [CommsBuilder]: ./builder/struct.CommsBuilder.html

mod comms_node;
pub use comms_node::{CommsNode, UnspawnedCommsNode};

Expand All @@ -41,13 +35,13 @@ mod placeholder;
#[cfg(test)]
mod tests;

use std::{fs::File, sync::Arc};
use std::{fs::File, sync::Arc, time::Duration};

use tari_shutdown::ShutdownSignal;
use tokio::sync::{broadcast, mpsc};

use crate::{
backoff::{Backoff, BoxedBackoff, ExponentialBackoff},
backoff::{Backoff, BoxedBackoff, ConstantBackoff},
connection_manager::{ConnectionManagerConfig, ConnectionManagerRequester},
connectivity::{ConnectivityConfig, ConnectivityRequester},
multiaddr::Multiaddr,
Expand All @@ -57,7 +51,70 @@ use crate::{
types::CommsDatabase,
};

/// The `CommsBuilder` provides a simple builder API for getting Tari comms p2p messaging up and running.
/// # CommsBuilder
///
/// [CommsBuilder] is used to customize and spawn Tari comms core.
///
/// The following example will get a node customized for your own network up and running.
///
/// ```rust
/// # use std::{sync::Arc, time::Duration};
/// # use rand::rngs::OsRng;
/// # use tari_shutdown::Shutdown;
/// # use tari_comms::{
/// # {CommsBuilder, NodeIdentity},
/// # peer_manager::{PeerStorage, PeerFeatures},
/// # transports::TcpTransport,
/// # };
/// use tari_storage::{
/// lmdb_store::{LMDBBuilder, LMDBConfig},
/// LMDBWrapper,
/// };
///
/// # #[tokio::main]
/// # async fn main() {
/// let node_identity = Arc::new(NodeIdentity::random(
/// &mut OsRng,
/// "/dns4/basenodezforhire.com/tcp/18000".parse().unwrap(),
/// PeerFeatures::COMMUNICATION_NODE,
/// ));
/// node_identity.sign();
/// let mut shutdown = Shutdown::new();
/// let datastore = LMDBBuilder::new()
/// .set_path("/tmp")
/// .set_env_config(LMDBConfig::default())
/// .set_max_number_of_databases(1)
/// .add_database("peers", lmdb_zero::db::CREATE)
/// .build()
/// .unwrap();
///
/// let peer_database = datastore.get_handle("peers").unwrap();
/// let peer_database = LMDBWrapper::new(Arc::new(peer_database));
///
/// let unspawned_node = CommsBuilder::new()
/// // .with_listener_address("/ip4/0.0.0.0/tcp/18000".parse().unwrap())
/// .with_node_identity(node_identity)
/// .with_peer_storage(peer_database, None)
/// .with_shutdown_signal(shutdown.to_signal())
/// .build()
/// .unwrap();
/// // This is your chance to add customizations that may require comms components for e.g. PeerManager.
/// // let my_peer = Peer::new(...);
/// // unspawned_node.peer_manager().add_peer(my_peer.clone());
/// // Add custom extensions implementing `ProtocolExtension`
/// // unspawned_node = unspawned_node.add_protocol_extension(MyCustomProtocol::new(unspawned_node.peer_manager()));
///
/// let transport = TcpTransport::new();
/// let node = unspawned_node.spawn_with_transport(transport).await.unwrap();
/// // Node is alive for 2 seconds
/// tokio::time::sleep(Duration::from_secs(2)).await;
/// shutdown.trigger();
/// node.wait_until_shutdown().await;
/// // let peer_conn = node.connectivity().dial_peer(my_peer.node_id).await.unwrap();
/// # }
/// ```
///
/// [CommsBuilder]: crate::CommsBuilder
pub struct CommsBuilder {
peer_storage: Option<CommsDatabase>,
peer_storage_file_lock: Option<File>,
Expand All @@ -76,7 +133,7 @@ impl Default for CommsBuilder {
peer_storage: None,
peer_storage_file_lock: None,
node_identity: None,
dial_backoff: Box::new(ExponentialBackoff::default()),
dial_backoff: Box::new(ConstantBackoff::new(Duration::from_millis(500))),
hidden_service_ctl: None,
connection_manager_config: ConnectionManagerConfig::default(),
connectivity_config: ConnectivityConfig::default(),
Expand Down Expand Up @@ -141,21 +198,26 @@ impl CommsBuilder {
self
}

/// Sets the address that the transport will listen on. The address must be compatible with the transport.
pub fn with_listener_address(mut self, listener_address: Multiaddr) -> Self {
self.connection_manager_config.listener_address = listener_address;
self
}

/// Sets an auxiliary TCP listener address that can accept peer connections. This is optional.
pub fn with_auxiliary_tcp_listener_address(mut self, listener_address: Multiaddr) -> Self {
self.connection_manager_config.auxiliary_tcp_listener_address = Some(listener_address);
self
}

/// Sets the maximum allowed liveness sessions. Liveness is typically used by tools like docker or kubernetes to
/// detect that the node is live. Defaults to 0 (disabled)
pub fn with_listener_liveness_max_sessions(mut self, max_sessions: usize) -> Self {
self.connection_manager_config.liveness_max_sessions = max_sessions;
self
}

/// Restrict liveness sessions to certain address ranges (CIDR format).
pub fn with_listener_liveness_allowlist_cidrs(mut self, cidrs: Vec<cidr::AnyIpCidr>) -> Self {
self.connection_manager_config.liveness_cidr_allowlist = cidrs;
self
Expand Down Expand Up @@ -194,8 +256,8 @@ impl CommsBuilder {
self
}

/// Set the backoff that [ConnectionManager] uses when dialing peers. This is optional. If omitted the default
/// ExponentialBackoff is used. [ConnectionManager]: crate::connection_manager::next::ConnectionManager
/// Set the backoff to use when a dial to a remote peer fails. This is optional. If omitted the default
/// [ConstantBackoff](crate::backoff::ConstantBackoff) of 500ms is used.
pub fn with_dial_backoff<T>(mut self, backoff: T) -> Self
where T: Backoff + Send + Sync + 'static {
self.dial_backoff = Box::new(backoff);
Expand Down
94 changes: 0 additions & 94 deletions comms/core/src/compat.rs

This file was deleted.

9 changes: 6 additions & 3 deletions comms/core/src/connection_manager/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ const LOG_TARGET: &str = "comms::connection_manager::common";
/// The maximum size of the peer's user agent string. If the peer sends a longer string it is truncated.
const MAX_USER_AGENT_LEN: usize = 100;

pub async fn perform_identity_exchange<
/// Performs the identity exchange protocol on the given socket.
pub(super) async fn perform_identity_exchange<
'p,
P: IntoIterator<Item = &'p ProtocolId>,
TSocket: AsyncRead + AsyncWrite + Unpin,
Expand All @@ -68,7 +69,7 @@ pub async fn perform_identity_exchange<
///
/// If the `allow_test_addrs` parameter is true, loopback, local link and other addresses normally not considered valid
/// for p2p comms will be accepted.
pub async fn validate_and_add_peer_from_peer_identity(
pub(super) async fn validate_and_add_peer_from_peer_identity(
peer_manager: &PeerManager,
known_peer: Option<Peer>,
authenticated_public_key: CommsPublicKey,
Expand Down Expand Up @@ -171,7 +172,7 @@ fn add_valid_identity_signature_to_peer(
Ok(())
}

pub async fn find_unbanned_peer(
pub(super) async fn find_unbanned_peer(
peer_manager: &PeerManager,
authenticated_public_key: &CommsPublicKey,
) -> Result<Option<Peer>, ConnectionManagerError> {
Expand All @@ -182,6 +183,8 @@ pub async fn find_unbanned_peer(
}
}

/// Checks that the given peer addresses are well-formed and valid. If allow_test_addrs is false, all localhost and
/// memory addresses will be rejected.
pub fn validate_peer_addresses<'a, A: IntoIterator<Item = &'a Multiaddr>>(
addresses: A,
allow_test_addrs: bool,
Expand Down
Loading

0 comments on commit 7b7ca29

Please sign in to comment.