Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Part 5/N] Interledger CCP: Futures 0.3 Transition #598

Merged
merged 10 commits into from
Jan 29, 2020
203 changes: 203 additions & 0 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ members = [
# "./crates/interledger",
# "./crates/interledger-api",
# "./crates/interledger-btp",
# "./crates/interledger-ccp",
"./crates/interledger-ccp",
# "./crates/interledger-http",
"./crates/interledger-ildcp",
"./crates/interledger-packet",
Expand Down
6 changes: 3 additions & 3 deletions crates/interledger-ccp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ repository = "https://github.com/interledger-rs/interledger-rs"
[dependencies]
bytes = { version = "0.4.12", default-features = false }
byteorder = { version = "1.3.2", default-features = false }
futures = { version = "0.1.29", default-features = false }
futures = { version = "0.3", default-features = false }
hex = { version = "0.4.0", default-features = false }
interledger-packet = { path = "../interledger-packet", version = "^0.4.0", default-features = false }
interledger-service = { path = "../interledger-service", version = "^0.4.0", default-features = false }
lazy_static = { version = "1.4.0", default-features = false }
log = { version = "0.4.8", default-features = false }
parking_lot = { version = "0.9.0", default-features = false }
ring = { version = "0.16.9", default-features = false }
tokio-executor = { version = "0.1.8", default-features = false }
tokio-timer = { version = "0.2.11", default-features = false }
uuid = { version = "0.8.1", default-features = false, features = ["v4"]}
serde = { version = "1.0.101", default-features = false, features = ["derive"] }
async-trait = "0.1.22"
tokio = { version = "0.2.6", features = ["time", "rt-core", "macros"] }
29 changes: 17 additions & 12 deletions crates/interledger-ccp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
//! updates are used by the `Router` to forward incoming packets to the best next hop
//! we know about.

use futures::Future;
use async_trait::async_trait;
use interledger_service::Account;
use std::collections::HashMap;
use std::{fmt, str::FromStr};
Expand All @@ -30,7 +30,7 @@ use serde::{Deserialize, Serialize};

/// Data structure used to describe the routing relation of an account with its peers.
#[repr(u8)]
#[derive(Clone, Copy, Debug, PartialEq, PartialOrd, Serialize, Deserialize)]
#[derive(Clone, Copy, Debug, PartialEq, PartialOrd, Serialize, Deserialize, Ord, Eq)]
pub enum RoutingRelation {
/// An account from which we do not receive routes from, neither broadcast
/// routes to
Expand Down Expand Up @@ -98,25 +98,30 @@ pub trait CcpRoutingAccount: Account {
type Routes<T> = HashMap<String, T>;
type LocalAndConfiguredRoutes<T> = (Routes<T>, Routes<T>);

/// Store trait for managing the routes broadcast and set over Connector to Connector protocol
#[async_trait]
pub trait RouteManagerStore: Clone {
type Account: CcpRoutingAccount;

// TODO should we have a way to only get the details for specific routes?
fn get_local_and_configured_routes(
/// Gets the local and manually configured routes
async fn get_local_and_configured_routes(
&self,
) -> Box<dyn Future<Item = LocalAndConfiguredRoutes<Self::Account>, Error = ()> + Send>;
) -> Result<LocalAndConfiguredRoutes<Self::Account>, ()>;

fn get_accounts_to_send_routes_to(
/// Gets all accounts which the node should send routes to (Peer and Child accounts)
/// The caller can also pass a vector of account ids to be ignored
async fn get_accounts_to_send_routes_to(
&self,
ignore_accounts: Vec<Uuid>,
) -> Box<dyn Future<Item = Vec<Self::Account>, Error = ()> + Send>;
) -> Result<Vec<Self::Account>, ()>;

fn get_accounts_to_receive_routes_from(
&self,
) -> Box<dyn Future<Item = Vec<Self::Account>, Error = ()> + Send>;
/// Gets all accounts which the node should receive routes to (Peer and Parent accounts)
async fn get_accounts_to_receive_routes_from(&self) -> Result<Vec<Self::Account>, ()>;

fn set_routes(
/// Sets the new routes to the store (prefix -> account)
async fn set_routes(
&mut self,
routes: impl IntoIterator<Item = (String, Self::Account)>,
) -> Box<dyn Future<Item = (), Error = ()> + Send>;
routes: impl IntoIterator<Item = (String, Self::Account)> + Send + 'async_trait,
) -> Result<(), ()>;
}
22 changes: 20 additions & 2 deletions crates/interledger-ccp/src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ lazy_static! {
Address::from_str("peer.route.update").unwrap();
}

/// CCP Packet mode used in Route Control Requests of the CCP protocol.
/// Idle: Account does not wish to receive more routes
/// Sync: Account wishes to receive routes
#[derive(Clone, Copy, PartialEq, Debug)]
#[repr(u8)]
pub enum Mode {
Expand All @@ -60,6 +63,9 @@ impl TryFrom<u8> for Mode {
}
}

/// A request that ask the receiver node to transition to Idle or Sync mode.
/// If the mode is Idle, the receiver of the request will stop broadcasting routes to the sender.
/// If the mode is Sync, the receiver will start broadcasting routes to that account.
#[derive(Clone, PartialEq)]
pub struct RouteControlRequest {
pub mode: Mode,
Expand Down Expand Up @@ -153,8 +159,14 @@ impl RouteControlRequest {
}
}

impl From<RouteControlRequest> for Prepare {
fn from(request: RouteControlRequest) -> Self {
request.to_prepare()
}
}

#[derive(Clone, PartialEq, Debug)]
pub struct RouteProp {
pub(crate) struct RouteProp {
pub(crate) is_optional: bool,
pub(crate) is_transitive: bool,
pub(crate) is_partial: bool,
Expand Down Expand Up @@ -215,7 +227,7 @@ impl RouteProp {
}

#[derive(Clone, PartialEq)]
pub struct Route {
pub(crate) struct Route {
// TODO switch this to use the Address type so we don't need separate parsing logic when implementing Debug
pub(crate) prefix: String,
pub(crate) path: Vec<String>,
Expand Down Expand Up @@ -401,6 +413,12 @@ impl RouteUpdateRequest {
}
}

impl From<RouteUpdateRequest> for Prepare {
fn from(request: RouteUpdateRequest) -> Self {
request.to_prepare()
}
}

#[cfg(test)]
mod route_control_request {
use super::*;
Expand Down
30 changes: 15 additions & 15 deletions crates/interledger-ccp/src/routing_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ lazy_static! {
static ref RANDOM: SystemRandom = SystemRandom::new();
}

#[derive(Debug)]
#[derive(Debug, Clone)]
struct PrefixMap<T> {
map: HashMap<String, T>,
}
Expand Down Expand Up @@ -44,7 +44,7 @@ impl<T> PrefixMap<T> {
/// When an Interledger node reloads, it will generate a new UUID for its routing table.
/// Each update applied increments the epoch number, so it acts as a version tracker.
/// This helps peers make sure they are in sync with one another and request updates if not.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct RoutingTable<A> {
id: [u8; 16],
epoch: u32,
Expand All @@ -55,7 +55,7 @@ impl<A> RoutingTable<A>
where
A: Clone,
{
pub fn new(id: [u8; 16]) -> Self {
pub(crate) fn new(id: [u8; 16]) -> Self {
RoutingTable {
id,
epoch: 0,
Expand All @@ -64,63 +64,63 @@ where
}

#[cfg(test)]
pub fn set_id(&mut self, id: [u8; 16]) {
pub(crate) fn set_id(&mut self, id: [u8; 16]) {
self.id = id;
self.epoch = 0;
}

#[cfg(test)]
pub fn set_epoch(&mut self, epoch: u32) {
pub(crate) fn set_epoch(&mut self, epoch: u32) {
self.epoch = epoch;
}

pub fn id(&self) -> [u8; 16] {
pub(crate) fn id(&self) -> [u8; 16] {
self.id
}

pub fn epoch(&self) -> u32 {
pub(crate) fn epoch(&self) -> u32 {
self.epoch
}

pub fn increment_epoch(&mut self) -> u32 {
pub(crate) fn increment_epoch(&mut self) -> u32 {
let epoch = self.epoch;
self.epoch += 1;
epoch
}

/// Set a particular route, overwriting the one that was there before
pub fn set_route(&mut self, prefix: String, account: A, route: Route) {
pub(crate) fn set_route(&mut self, prefix: String, account: A, route: Route) {
self.prefix_map.remove(&prefix);
self.prefix_map.insert(prefix, (account, route));
}

/// Remove the route for the given prefix. Returns true if that route existed before
pub fn delete_route(&mut self, prefix: &str) -> bool {
pub(crate) fn delete_route(&mut self, prefix: &str) -> bool {
self.prefix_map.remove(prefix)
}

/// Add the given route. Returns true if that routed did not already exist
pub fn add_route(&mut self, account: A, route: Route) -> bool {
pub(crate) fn add_route(&mut self, account: A, route: Route) -> bool {
self.prefix_map
.insert(route.prefix.clone(), (account, route))
}

/// Get the best route we have for the given prefix
pub fn get_route(&self, prefix: &str) -> Option<&(A, Route)> {
pub(crate) fn get_route(&self, prefix: &str) -> Option<&(A, Route)> {
self.prefix_map.resolve(prefix)
}

pub fn get_simplified_table(&self) -> HashMap<&str, A> {
pub(crate) fn get_simplified_table(&self) -> HashMap<String, A> {
HashMap::from_iter(
self.prefix_map
.map
.iter()
.map(|(address, (account, _route))| (address.as_str(), account.clone())),
.map(|(address, (account, _route))| (address.clone(), account.clone())),
)
}

/// Handle a CCP Route Update Request from the peer this table represents
pub fn handle_update_request(
pub(crate) fn handle_update_request(
&mut self,
account: A,
request: RouteUpdateRequest,
Expand Down
Loading