Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migration to 0.3 & Async/Await #582

Closed
wants to merge 38 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
8f4b917
docs(service): Add missing docs for interledger-service
gakonst Dec 30, 2019
67095a3
feat(service): upgrade to futures 0.3
gakonst Dec 30, 2019
e96d71b
feat(service): add async-trait
gakonst Jan 2, 2020
fbf0ab1
feat(service): remove boxes
gakonst Jan 2, 2020
b35da6b
feat(service): remove boxes
gakonst Jan 2, 2020
5a3f36b
feat(ildcp): move to futures 0.3 and async/await
gakonst Jan 2, 2020
9ef91f7
chore: update cargo.lock
gakonst Jan 2, 2020
2a02842
feat(router): move to futures 0.3 and async/await
gakonst Jan 2, 2020
d335f4e
feat(packet): implement From<Fulfill/Reject> for bytes05::BytesMut
gakonst Jan 3, 2020
680a83e
feat(http): move to futures 0.3 and add async/await
gakonst Jan 3, 2020
26c82b4
feat(http): Get ILP over HTTP working + tests
gakonst Jan 3, 2020
bdb283d
chore(service): Temporarily disable tracing
gakonst Jan 6, 2020
b5a39f4
feat(ccp): update futures and traits
gakonst Jan 6, 2020
22d2aac
feat(ccp): move to futures 0.3 and async/await
gakonst Jan 6, 2020
3ff26ae
feat(ccp): WIP - Re-enable the problematic code
gakonst Jan 7, 2020
44f65a1
feat(settlement): move to futures 0.3 and async/await
gakonst Jan 7, 2020
5c60e39
feat(service-util): move to futures 0.3 and async/await
gakonst Jan 7, 2020
4734309
fix(service): re-enable trace
gakonst Jan 7, 2020
738b606
fix(ccp): fix Mutex errors
gakonst Jan 8, 2020
3f1819c
feat(btp): move to futures 0.3 and async/await
gakonst Jan 8, 2020
3adeacb
feat(stream): move to futures 0.3 and async/await
gakonst Jan 8, 2020
b9488ad
feat(spsp): move to futures 0.3 and async/await
gakonst Jan 8, 2020
fb0722c
feat(api): WIP - move to futures 0.3 and async/await blocked on Warp
gakonst Jan 9, 2020
737c044
feat(store): move to futures 0.3 and async/await
gakonst Jan 9, 2020
d6e9fef
fix(service): refactor wrapped services to accept futures
gakonst Jan 10, 2020
f831435
fix(api/btp/util): Re-enable unimplemented functions
gakonst Jan 10, 2020
10f4cd0
feat(ilp-node): Make the node compile
gakonst Jan 10, 2020
d98f190
feat(api): Update node_settings API
gakonst Jan 10, 2020
eec3843
feat(api): wip on accounts API working
gakonst Jan 10, 2020
3f7f6a8
chore(various): random wips which need cleanup
gakonst Jan 10, 2020
d316689
feat(api): Migrate full API to 0.3 and async/await
gakonst Jan 13, 2020
4e301c5
test(e2e): three-nodes and exchange rates tests pass
gakonst Jan 13, 2020
d7437fe
fix(ccp): clippy lints
gakonst Jan 13, 2020
8674b1f
fix(api/service-util): Fix scoped format compilation error
gakonst Jan 13, 2020
b9a90c8
feat(btp): Implement WebSocket wrapper for Warp <> Tungstenite compat
gakonst Jan 15, 2020
22fb5ba
feat(btp): Implement WebSocket wrapper for Warp <> Tungstenite compat
gakonst Jan 15, 2020
6a70382
feat(btp): Get the rest of BTP working
gakonst Jan 15, 2020
f8a83e7
test(btp/three-nodes): BTP and Three nodes tests are working
gakonst Jan 15, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
795 changes: 647 additions & 148 deletions Cargo.lock

Large diffs are not rendered by default.

17 changes: 10 additions & 7 deletions crates/ilp-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,34 +24,37 @@ required-features = ["redis"]

[dependencies]
bytes = { version = "0.4.12", default-features = false }
bytes05 = { package = "bytes", version = "0.5", default-features = false }
clap = { version = "2.33.0", default-features = false }
config = { version = "0.9.3", default-features = false, features = ["json", "toml", "yaml"] }
futures = { version = "0.1.29", default-features = false }
futures = { version = "0.3.1", default-features = false, features = ["compat"] }
hex = { version = "0.4.0", default-features = false }
interledger = { path = "../interledger", version = "^0.6.0", default-features = false, features = ["node"] }
lazy_static = { version = "1.4.0", default-features = false }
metrics = { version = "0.12.0", default-features = false, features = ["std"] }
metrics-core = { version = "0.5.1", default-features = false }
metrics-runtime = { version = "0.12.0", default-features = false, features = ["metrics-observer-prometheus"] }
num-bigint = { version = "0.2.3", default-features = false, features = ["std"] }
redis_crate = { package = "redis", version = "0.13.0", default-features = false, features = ["executor"], optional = true }
# redis_crate = { package = "redis", version = "0.13.0", default-features = false, features = ["executor"], optional = true }
redis_crate = { package = "redis", git = "https://github.com/mitsuhiko/redis-rs", optional = true, features = ["tokio-rt-core"] }
ring = { version = "0.16.9", default-features = false }
serde = { version = "1.0.101", default-features = false }
tokio = { version = "0.1.22", default-features = false }
tokio = { version = "0.2.8", features = ["rt-core", "macros", "time"] }
tracing = { version = "0.1.9", default-features = true, features = ["log"] }
tracing-futures = { version = "0.1.1", default-features = true, features = ["tokio", "futures-01"] }
tracing-futures = { version = "0.2", default-features = true, features = ["tokio", "futures-03"] }
tracing-subscriber = { version = "0.1.6", default-features = true, features = ["tracing-log"] }
url = { version = "2.1.0", default-features = false }
libc = { version = "0.2.62", default-features = false }
warp = { version = "0.1.20", default-features = false, features = ["websocket"] }
# warp = { version = "0.1.20", default-features = false, features = ["websocket"] }
warp = { git = "https://github.com/seanmonstar/warp", default-features = false }
secrecy = { version = "0.5.1", default-features = false, features = ["alloc", "serde"] }
uuid = { version = "0.8.1", default-features = false}

# For google-pubsub
base64 = { version = "0.10.1", default-features = false, optional = true }
chrono = { version = "0.4.9", default-features = false, features = [], optional = true}
parking_lot = { version = "0.9.0", default-features = false, optional = true }
reqwest = { version = "0.9.22", default-features = false, features = ["default-tls"], optional = true }
reqwest = { version = "0.10.0", default-features = false, features = ["default-tls", "json"], optional = true }
serde_json = { version = "1.0.41", default-features = false, optional = true }
yup-oauth2 = { version = "3.1.1", default-features = false, optional = true }

Expand All @@ -60,7 +63,7 @@ approx = { version = "0.3.2", default-features = false }
base64 = { version = "0.10.1", default-features = false }
net2 = { version = "0.2.33", default-features = false }
rand = { version = "0.7.2", default-features = false }
reqwest = { version = "0.9.22", default-features = false, features = ["default-tls"] }
reqwest = { version = "0.10.0", default-features = false, features = ["default-tls", "json"] }
serde_json = { version = "1.0.41", default-features = false }
tokio-retry = { version = "0.2.0", default-features = false }

Expand Down
146 changes: 79 additions & 67 deletions crates/ilp-node/src/google_pubsub.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
#[cfg(feature = "google_pubsub")]
use base64;
use chrono::Utc;
use futures::{
compat::Future01CompatExt,
future::{ok, Either},
Future,
Future, TryFutureExt,
};
use interledger::{
ccp::CcpRoutingAccount,
packet::Address,
service::{Account, BoxedIlpFuture, OutgoingRequest, OutgoingService, Username},
service::{Account, IlpResult, OutgoingRequest, OutgoingService, Username},
};
use parking_lot::Mutex;
use reqwest::r#async::Client;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, sync::Arc};
use tokio::spawn;
Expand Down Expand Up @@ -58,16 +61,17 @@ struct PacketRecord {
timestamp: String,
}

use std::pin::Pin;
/// Create an Interledger service wrapper that publishes records
/// of fulfilled packets to Google Cloud PubSub.
///
/// This is an experimental feature that may be removed in the future.
pub fn create_google_pubsub_wrapper<
pub async fn create_google_pubsub_wrapper<
A: Account + 'static,
O: OutgoingService<A> + Clone + Send + 'static,
>(
config: Option<PubsubConfig>,
) -> impl Fn(OutgoingRequest<A>, O) -> BoxedIlpFuture + Clone {
) -> impl Fn(OutgoingRequest<A>, O) -> Pin<Box<Future<Output = IlpResult>>> + Clone {
// If Google credentials were passed in, create an HTTP client and
// OAuth2 client that will automatically fetch and cache access tokens
let utilities = if let Some(config) = config {
Expand All @@ -91,10 +95,16 @@ pub fn create_google_pubsub_wrapper<
None
};

move |request: OutgoingRequest<A>, mut next: O| -> BoxedIlpFuture {
move |request: OutgoingRequest<A>, mut next: O| -> Pin<Box<Future<Output = IlpResult>>> {
let mut next_clone = next.clone();
let mut next_clone2 = next.clone();
match &utilities {
// Just pass the request on if no Google Pubsub details were configured
None => Box::new(next.send_request(request)),
// Due to using async_trait this becomes a Box::pin!
None => Box::pin(async move {
let fulfill = next_clone.send_request(request).await?;
Ok(fulfill)
}),
Some((client, api_endpoint, token_fetcher)) => {
let prev_hop_account = request.from.username().clone();
let prev_hop_asset_code = request.from.asset_code().to_string();
Expand All @@ -108,78 +118,80 @@ pub fn create_google_pubsub_wrapper<
let client = client.clone();
let api_endpoint = api_endpoint.clone();
let token_fetcher = token_fetcher.clone();

Box::new(next.send_request(request).map(move |fulfill| {
Box::pin(async move {
// Only fulfilled packets are published for now
let fulfill = next_clone2.send_request(request).await?;
let fulfillment = base64::encode(fulfill.fulfillment());

let get_token_future = token_fetcher.lock()
let get_token_future = token_fetcher
.lock()
.token(TOKEN_SCOPES)
.compat()
.map_ok(|token: yup_oauth2::Token| token.access_token)
.map_err(|err| {
error!("Error fetching OAuth token for Google PubSub: {:?}", err)
});

// Spawn a task to submit the packet to PubSub so we
// don't block returning the fulfillment
// Note this means that if there is a problem submitting the
// packet record to PubSub, it will only log an error
spawn(
get_token_future
.and_then(move |token| {
let record = PacketRecord {
prev_hop_account,
prev_hop_asset_code,
prev_hop_asset_scale,
prev_hop_amount,
next_hop_account,
next_hop_asset_code,
next_hop_asset_scale,
next_hop_amount,
destination_ilp_address,
fulfillment,
timestamp: Utc::now().to_rfc3339(),
};
let data = base64::encode(&serde_json::to_string(&record).unwrap());
spawn(async move {
let token = get_token_future.await?;

let record = PacketRecord {
prev_hop_account,
prev_hop_asset_code,
prev_hop_asset_scale,
prev_hop_amount,
next_hop_account,
next_hop_asset_code,
next_hop_asset_scale,
next_hop_amount,
destination_ilp_address,
fulfillment,
timestamp: Utc::now().to_rfc3339(),
};
let data = base64::encode(&serde_json::to_string(&record).unwrap());

let res = client
.post(api_endpoint.as_str())
.bearer_auth(token)
.json(&PubsubRequest {
messages: vec![PubsubMessage {
// TODO should there be an ID?
message_id: None,
data: Some(data),
attributes: None,
publish_time: None,
}],
})
.send()
.map_err(|err| {
error!("Error sending packet details to Google PubSub: {:?}", err)
})
.await?;

if res.status().is_success() {
return Ok(());
} else {
let status = res.status();
let body = res
.text()
.map_err(|err| error!("Error getting response body: {:?}", err))
.await?;
error!(
%status,
"Error sending packet details to Google PubSub: {}",
body
);
}

Ok::<(), ()>(())
});

client
.post(api_endpoint.as_str())
.bearer_auth(token.access_token)
.json(&PubsubRequest {
messages: vec![PubsubMessage {
// TODO should there be an ID?
message_id: None,
data: Some(data),
attributes: None,
publish_time: None,
}],
})
.send()
.map_err(|err| {
error!(
"Error sending packet details to Google PubSub: {:?}",
err
)
})
.and_then(|mut res| {
if res.status().is_success() {
Either::A(ok(()))
} else {
let status = res.status();
Either::B(res.text()
.map_err(|err| error!("Error getting response body: {:?}", err))
.and_then(move |body| {
error!(
%status,
"Error sending packet details to Google PubSub: {}",
body
);
Ok(())
}))
}
})
}),
);
fulfill
}))
Ok(fulfill)
})
}
}
}
Expand Down
13 changes: 5 additions & 8 deletions crates/ilp-node/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
#![type_length_limit = "1152909"]
#![type_length_limit = "6000000"]

mod metrics;
// mod metrics;
mod node;
mod trace;
// mod trace;

#[cfg(feature = "google-pubsub")]
mod google_pubsub;
// #[cfg(feature = "google-pubsub")]
// mod google_pubsub;
#[cfg(feature = "redis")]
mod redis_store;

pub use node::*;
#[allow(deprecated)]
#[cfg(feature = "redis")]
pub use redis_store::insert_account_with_redis_store;
9 changes: 6 additions & 3 deletions crates/ilp-node/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#![type_length_limit = "1152909"]
#![type_length_limit = "6000000"]

mod metrics;
mod node;
Expand All @@ -24,7 +24,8 @@ use tracing_subscriber::{
fmt::{time::ChronoUtc, Subscriber},
};

pub fn main() {
#[tokio::main]
async fn main() {
Subscriber::builder()
.with_timer(ChronoUtc::rfc3339())
.with_env_filter(EnvFilter::from_default_env())
Expand Down Expand Up @@ -143,7 +144,9 @@ pub fn main() {
}
let matches = app.clone().get_matches();
merge_args(&mut config, &matches);
config.try_into::<InterledgerNode>().unwrap().run();

let node = config.try_into::<InterledgerNode>().unwrap();
node.serve().await;
}

// returns (subcommand paths, config path)
Expand Down
Loading