Update to tokio 0.2 and futures 0.3 #1448

Merged
merged 26 commits on Jan 20, 2020

Changes from all commits (26 commits)
eb7967c
*: Update to futures 0.3 and tokio 0.2
leoyvens Dec 3, 2019
42a0494
server/http: Upgrade to hyper 0.13
leoyvens Dec 12, 2019
7ebf155
server/index-node, server/metrics: Upgrade to hyper 0.13
leoyvens Dec 12, 2019
59f9f13
*: Upgrade ipfs-api for compatibility with hyper 0.13
leoyvens Dec 13, 2019
d5129e2
graph: Update tokio-retry, update tests
leoyvens Dec 16, 2019
364c75f
store: Update tests for tokio 0.2
leoyvens Dec 16, 2019
f3f9fec
*: Get all tests compiling on tokio 0.2
leoyvens Dec 16, 2019
fbf106c
core: Fix link resolver tests for tokio 0.2
leoyvens Dec 16, 2019
6f8300d
*: Update ipfs-api to 0.6.0-rc
leoyvens Jan 8, 2020
518042f
*: Rebase tokio 0.2 upgrade
leoyvens Jan 8, 2020
9122c43
json-rpc: Work around issues with tokio 0.2
leoyvens Jan 13, 2020
4f4aaee
graph: Update reqwest to 0.10
leoyvens Jan 13, 2020
487d8f9
runtime: Refactor `block_on` to use local runtime
leoyvens Jan 13, 2020
2c13b89
runtime: Fix `ipfs_map` test
leoyvens Jan 15, 2020
09cad85
network_indexer: Use `spawn_blocking`, fixing tests
leoyvens Jan 15, 2020
5fa09bc
server: Use the blocking pool for graphql queries
leoyvens Jan 15, 2020
4a60e27
graph: Rename `spawn_blocking_ignore_panic`
leoyvens Jan 15, 2020
8293317
json-rpc: Clarify hack for tokio 0.2 compatibility
leoyvens Jan 15, 2020
2d78bad
Cargo.lock: Refresh after rebase
leoyvens Jan 15, 2020
afb1da8
*: Comment on calls
leoyvens Jan 16, 2020
e675a75
server: Refactor code around `service_fn`
leoyvens Jan 16, 2020
7389e31
store: Fix `run_test` helper
leoyvens Jan 17, 2020
8f2acc1
runtime: Remove `threaded_scheduler` if unnecessary
leoyvens Jan 17, 2020
fee5945
graph: Avoid using `let _`
leoyvens Jan 17, 2020
fc02994
graph: Refactor `Compat::new`
leoyvens Jan 17, 2020
b1e74f0
store: Remove needless panic
leoyvens Jan 17, 2020
797 changes: 546 additions & 251 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 3 additions & 5 deletions chain/ethereum/src/block_ingestor.rs
@@ -1,7 +1,6 @@
use lazy_static;
use std::collections::HashMap;
use std::time::Duration;
use std::time::Instant;

use graph::prelude::*;
use web3::types::*;
@@ -88,10 +87,9 @@ where
let static_self: &'static _ = Box::leak(Box::new(self));

// Create stream that emits at polling interval
tokio::timer::Interval::new(Instant::now(), static_self.polling_interval)
.map_err(move |e| {
error!(static_self.logger, "timer::Interval failed: {:?}", e);
})
tokio::time::interval(static_self.polling_interval)
.map(Ok)
.compat()
.for_each(move |_| {
// Attempt to poll
static_self
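The change above swaps tokio 0.1's fallible `timer::Interval` for tokio 0.2's infallible `tokio::time::interval`, then bridges back to the futures-0.1 world the surrounding state machine still lives in. A minimal sketch of that bridge, assuming tokio 0.2 with the `stream` feature, futures 0.3 with the `compat` feature, and futures 0.1 renamed to `futures01` (the helper name `polling_ticks` is illustrative, not from the PR):

```rust
use std::time::Duration;
use futures::stream::{StreamExt, TryStreamExt};

/// Emits `Ok(())` at a fixed period as a futures-0.1 stream.
fn polling_ticks(period: Duration) -> impl futures01::Stream<Item = (), Error = ()> {
    tokio::time::interval(period)
        // The 0.2 interval never errors, so wrap each tick in `Ok`; that
        // gives `.compat()` the `TryStream` it needs to emit a 0.1 stream.
        .map(|_instant| Ok(()))
        .compat()
}
```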
10 changes: 4 additions & 6 deletions chain/ethereum/src/block_stream.rs
@@ -2,7 +2,7 @@ use std::cmp;
use std::collections::{HashMap, HashSet, VecDeque};
use std::mem;
use std::sync::Mutex;
use std::time::{Duration, Instant};
use std::time::Duration;

use graph::components::ethereum::{blocks_with_triggers, triggers_in_block};
use graph::data::subgraph::schema::{
@@ -11,7 +11,6 @@ use graph::data::subgraph::schema::{
use graph::prelude::{
BlockStream as BlockStreamTrait, BlockStreamBuilder as BlockStreamBuilderTrait, *,
};
use tokio::timer::Delay;

lazy_static! {
/// Maximum number of blocks to request in each chunk.
@@ -881,11 +880,10 @@ where

// Pause before trying again
let secs = (5 * self.consecutive_err_count).max(120) as u64;
let instant = Instant::now() + Duration::from_secs(secs);
state = BlockStreamState::RetryAfterDelay(Box::new(
Delay::new(instant).map_err(|err| {
format_err!("RetryAfterDelay future failed = {}", err)
}),
tokio::time::delay_for(Duration::from_secs(secs))
.map(Ok)
.compat(),
));
break Err(e);
}
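One-shot timers follow the same recipe: `tokio::timer::Delay` took an `Instant` and could fail, whereas 0.2's `delay_for` takes a `Duration` and cannot, so the error-handling arm disappears and only `map(Ok)` plus `.compat()` remain. A sketch under the same crate-rename assumptions (`retry_delay` is a hypothetical helper):

```rust
use std::time::Duration;
use futures::{FutureExt, TryFutureExt};

/// Backoff future for the retry state, exposed as a futures-0.1 future.
fn retry_delay(secs: u64) -> impl futures01::Future<Item = (), Error = failure::Error> {
    tokio::time::delay_for(Duration::from_secs(secs))
        // `delay_for` is infallible; injecting the `Ok` layer lets
        // `.compat()` hand a 0.1 future back to the old state machine.
        .map(Ok)
        .compat()
}
```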
6 changes: 3 additions & 3 deletions chain/ethereum/src/ethereum_adapter.rs
@@ -10,8 +10,8 @@ use std::time::Instant;
use ethabi::ParamType;
use graph::components::ethereum::{EthereumAdapter as EthereumAdapterTrait, *};
use graph::prelude::{
debug, err_msg, error, ethabi, format_err, hex, retry, stream, tiny_keccak, tokio_timer, trace,
warn, web3, ChainStore, Error, EthereumCallCache, Logger,
debug, err_msg, error, ethabi, format_err, hex, retry, stream, tiny_keccak, trace, warn, web3,
ChainStore, Error, EthereumCallCache, Logger, TimeoutError,
};
use web3::api::Web3;
use web3::transports::batch::Batch;
@@ -175,7 +175,7 @@ where
to: u64,
filter: EthGetLogsFilter,
too_many_logs_fingerprints: &'static [&'static str],
) -> impl Future<Item = Vec<Log>, Error = tokio_timer::timeout::Error<web3::error::Error>> {
) -> impl Future<Item = Vec<Log>, Error = TimeoutError<web3::error::Error>> {
let eth_adapter = self.clone();

retry("eth_getLogs RPC call", &logger)
13 changes: 9 additions & 4 deletions chain/ethereum/src/network_indexer/network_indexer.rs
@@ -1174,10 +1174,15 @@ impl NetworkIndexer {
start_block,
});

// Launch state machine
tokio::spawn(state_machine.map_err(move |e| {
error!(logger_for_err, "Network indexer failed: {}", e);
}));
// Launch state machine.
// Blocking due to store interactions. Won't be blocking after #905.
graph::spawn_blocking(
state_machine
.map_err(move |e| {
error!(logger_for_err, "Network indexer failed: {}", e);
})
.compat(),
);

Self {
output: Some(output),
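`graph::spawn_blocking` is the PR's escape hatch for tasks that still call the store synchronously: they run on tokio 0.2's blocking pool instead of the core scheduler, so they can park without starving other tasks. One plausible shape for the helper — an assumption for illustration, not necessarily graph-node's actual source:

```rust
use std::future::Future;
use tokio::task::JoinHandle;

pub fn spawn_blocking<T: Send + 'static>(
    f: impl Future<Output = T> + Send + 'static,
) -> JoinHandle<T> {
    // Drive the future to completion on a blocking-pool thread, where it is
    // acceptable to block on synchronous work such as store queries.
    tokio::task::spawn_blocking(move || futures::executor::block_on(f))
}
```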
21 changes: 12 additions & 9 deletions chain/ethereum/tests/network_indexer.rs
@@ -76,14 +76,14 @@ fn run_network_indexer(
let (event_sink, event_stream) = futures::sync::mpsc::channel(100);

// Run network indexer and forward its events to the channel
tokio::spawn(
graph::spawn(
indexer
.take_event_stream()
.expect("failed to take stream from indexer")
.timeout(timeout)
.map_err(|_| ())
.forward(event_sink.sink_map_err(|_| ()))
.map(|_| ()),
.map(|_| ())
.timeout(timeout),
);

future::ok((chains, event_stream.collect()))
@@ -106,13 +106,16 @@
};

runtime
.block_on(future::lazy(move || {
// Reset store before running
remove_test_data(store.clone());
.block_on(
future::lazy(move || {
// Reset store before running
remove_test_data(store.clone());

// Run test
test(store.clone())
}))
// Run test
test(store.clone())
})
.compat(),
)
.expect("failed to run test with clean store");
}

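The test-harness change shows the opposite direction of the bridge: a tokio 0.2 `Runtime::block_on` only drives std futures, so the legacy futures-0.1 test future is converted first via `Future01CompatExt`. A standalone sketch, with crate renames assumed as above:

```rust
use futures::compat::Future01CompatExt;

fn main() {
    let mut runtime = tokio::runtime::Runtime::new().unwrap();
    // `.compat()` turns a 0.1 `Future<Item = T, Error = E>` into a std
    // future resolving to `Result<T, E>`, which the 0.2 runtime can drive.
    let out = runtime.block_on(futures01::future::ok::<u32, ()>(42).compat());
    assert_eq!(out, Ok(42));
}
```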
8 changes: 2 additions & 6 deletions core/Cargo.toml
@@ -4,15 +4,11 @@ version = "0.17.1"
edition = "2018"

[dependencies]
bytes = "0.4.12"
bytes = "0.5"
futures = "0.1.21"
graph = { path = "../graph" }
graph-graphql = { path = "../graphql" }

# We're using the latest ipfs-api for the HTTPS support that was merged in
# https://github.com/ferristseng/rust-ipfs-api/commit/55902e98d868dcce047863859caf596a629d10ec
# but has not been released yet.
ipfs-api = { git = "https://github.com/ferristseng/rust-ipfs-api", branch = "master", features = ["hyper-tls"] }
ipfs-api = { version = "0.6.0-rc", features = ["hyper-tls"] }
lazy_static = "1.2.0"
lru_time_cache = "0.9"
semver = "0.9.0"
83 changes: 42 additions & 41 deletions core/src/link_resolver.rs
@@ -50,7 +50,7 @@ fn read_u64_from_env(name: &str) -> Option<u64> {
/// if the IPFS file at `path` is no bigger than `max_file_bytes`.
/// If `max_file_bytes` is `None`, do not restrict the size of the file
fn restrict_file_size<T>(
client: &ipfs_api::IpfsClient,
client: ipfs_api::IpfsClient,
path: String,
timeout: Duration,
max_file_bytes: Option<u64>,
@@ -61,11 +61,13 @@
{
match max_file_bytes {
Some(max_bytes) => Box::new(
client
.object_stat(&path)
.timeout(timeout)
.map_err(|e| failure::err_msg(e.to_string()))
.and_then(move |stat| match stat.cumulative_size > max_bytes {
Box::pin(async move {
let stat = tokio::time::timeout(timeout, client.object_stat(&path).err_into())
.err_into()
.await
.and_then(|x| x);

stat.and_then(move |stat| match stat.cumulative_size > max_bytes {
false => Ok(()),
true => Err(format_err!(
"IPFS file {} is too large. It can be at most {} bytes but is {} bytes",
@@ -74,7 +76,9 @@
stat.cumulative_size
)),
})
.and_then(|()| fut),
})
.compat()
.and_then(|()| fut),
),
None => fut,
}
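The rewritten `restrict_file_size` uses the tokio 0.2 timeout idiom: `tokio::time::timeout` wraps the inner future's `Result` in a second `Result` carrying its own elapsed error, so both layers are lifted into one error type with `err_into` and flattened with `and_then(|x| x)`. The same pattern in isolation, assuming the ipfs-api 0.6 async client and the `failure` crate used in this PR:

```rust
use std::time::Duration;
use futures::TryFutureExt;

async fn stat_with_timeout(
    client: ipfs_api::IpfsClient,
    path: String,
    timeout: Duration,
) -> Result<ipfs_api::response::ObjectStatResponse, failure::Error> {
    tokio::time::timeout(timeout, client.object_stat(&path).err_into())
        // The outer `err_into` lifts the timeout's elapsed error, leaving
        // `Result<Result<_, Error>, Error>`; `and_then(|x| x)` flattens it.
        .err_into::<failure::Error>()
        .await
        .and_then(|x| x)
}
```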
@@ -151,16 +155,17 @@ impl LinkResolverTrait for LinkResolver {

let cat = client_for_cat
.cat(&path)
.concat2()
.map(|x| x.to_vec())
.map_err(|e| failure::err_msg(e.to_string()));
.map_ok(|b| BytesMut::from_iter(b.into_iter()))
.try_concat()
.map_ok(|x| x.to_vec())
.err_into();

restrict_file_size(
&client_for_file_size,
client_for_file_size.clone(),
path.clone(),
timeout_for_file_size,
max_file_size,
Box::new(cat),
Box::new(cat.compat()),
)
.map(move |data| {
// Only cache files if they are not too large
@@ -188,7 +193,7 @@
) -> Box<dyn Future<Item = JsonValueStream, Error = failure::Error> + Send + 'static> {
// Discard the `/ipfs/` prefix (if present) to get the hash.
let path = link.link.trim_start_matches("/ipfs/").to_owned();
let mut stream = self.client.cat(&path).fuse();
let mut stream = self.client.cat(&path).fuse().compat();
let mut buf = BytesMut::with_capacity(1024);
// Count the number of lines we've already successfully deserialized.
// We need that to adjust the line number in error messages from serde_json
@@ -248,7 +253,7 @@ impl LinkResolverTrait for LinkResolver {
read_u64_from_env(MAX_IPFS_MAP_FILE_SIZE_VAR).unwrap_or(DEFAULT_MAX_IPFS_MAP_FILE_SIZE);

restrict_file_size(
&self.client,
self.client.clone(),
path,
self.timeout,
Some(max_file_size),
@@ -262,23 +267,19 @@
use super::*;
use serde_json::json;

#[test]
fn max_file_size() {
#[tokio::test]
async fn max_file_size() {
env::set_var(MAX_IPFS_FILE_SIZE_VAR, "200");
let file: &[u8] = &[0u8; 201];
let client = ipfs_api::IpfsClient::default();
let resolver = super::LinkResolver::from(client.clone());

let logger = Logger::root(slog::Discard, o!());

let mut runtime = tokio::runtime::Runtime::new().unwrap();
let link = runtime.block_on(client.add(file)).unwrap().hash;
let err = runtime
.block_on(LinkResolver::cat(
&resolver,
&logger,
&Link { link: link.clone() },
))
let link = client.add(file).await.unwrap().hash;
let err = LinkResolver::cat(&resolver, &logger, &Link { link: link.clone() })
.compat()
.await
.unwrap_err();
env::remove_var(MAX_IPFS_FILE_SIZE_VAR);
assert_eq!(
@@ -290,47 +291,47 @@
);
}

fn json_round_trip(text: &'static str) -> Result<Vec<Value>, failure::Error> {
async fn json_round_trip(text: &'static str) -> Result<Vec<Value>, failure::Error> {
let client = ipfs_api::IpfsClient::default();
let resolver = super::LinkResolver::from(client.clone());

let mut runtime = tokio::runtime::Runtime::new().unwrap();
let link = runtime.block_on(client.add(text.as_bytes())).unwrap().hash;
runtime.block_on(
LinkResolver::json_stream(&resolver, &Link { link: link.clone() })
.and_then(|stream| stream.map(|sv| sv.value).collect()),
)
let link = client.add(text.as_bytes()).await.unwrap().hash;

LinkResolver::json_stream(&resolver, &Link { link: link.clone() })
.and_then(|stream| stream.map(|sv| sv.value).collect())
.compat()
.await
}

#[test]
fn read_json_stream() {
let values = json_round_trip("\"with newline\"\n");
#[tokio::test]
async fn read_json_stream() {
let values = json_round_trip("\"with newline\"\n").await;
assert_eq!(vec![json!("with newline")], values.unwrap());

let values = json_round_trip("\"without newline\"");
let values = json_round_trip("\"without newline\"").await;
assert_eq!(vec![json!("without newline")], values.unwrap());

let values = json_round_trip("\"two\" \n \"things\"");
let values = json_round_trip("\"two\" \n \"things\"").await;
assert_eq!(vec![json!("two"), json!("things")], values.unwrap());

let values = json_round_trip("\"one\"\n \"two\" \n [\"bad\" \n \"split\"]");
let values = json_round_trip("\"one\"\n \"two\" \n [\"bad\" \n \"split\"]").await;
assert_eq!(
"EOF while parsing a list at line 4 column 0: ' [\"bad\" \n'",
values.unwrap_err().to_string()
);
}

#[test]
fn ipfs_map_file_size() {
#[tokio::test]
async fn ipfs_map_file_size() {
let file = "\"small test string that trips the size restriction\"";
env::set_var(MAX_IPFS_MAP_FILE_SIZE_VAR, (file.len() - 1).to_string());

let err = json_round_trip(file).unwrap_err();
let err = json_round_trip(file).await.unwrap_err();
env::remove_var(MAX_IPFS_MAP_FILE_SIZE_VAR);

assert!(err.to_string().contains(" is too large"));

let values = json_round_trip(file);
let values = json_round_trip(file).await;
assert_eq!(
vec!["small test string that trips the size restriction"],
values.unwrap()
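The test migration in this file follows one recipe throughout: `#[test]` plus a hand-built runtime becomes `#[tokio::test] async fn`, and any remaining futures-0.1 values are awaited through `.compat()`. Boiled down to a hypothetical test, under the same crate-rename assumptions:

```rust
use futures::compat::Future01CompatExt;

#[tokio::test]
async fn bridged_legacy_future() {
    // The attribute macro builds the tokio 0.2 runtime; `.compat()` bridges
    // the old future so it can simply be `.await`ed in the test body.
    let legacy = futures01::future::ok::<&str, ()>("ready");
    assert_eq!(legacy.compat().await, Ok("ready"));
}
```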
18 changes: 8 additions & 10 deletions core/src/subgraph/instance_manager.rs
@@ -236,7 +236,8 @@ impl SubgraphInstanceManager {
// Subgraph instance shutdown senders
let instances: SharedInstanceKeepAliveMap = Default::default();

tokio::spawn(receiver.for_each(move |event| {
// Blocking due to store interactions. Won't be blocking after #905.
graph::spawn_blocking(receiver.compat().try_for_each(move |event| {
use self::SubgraphAssignmentProviderEvent::*;

match event {
@@ -303,7 +304,7 @@ impl SubgraphInstanceManager {
}
};

Ok(())
futures03::future::ok(())
}));
}

@@ -418,14 +419,11 @@ impl SubgraphInstanceManager {
// forward; this is easier than updating the existing block stream.
//
// This task has many calls to the store, so mark it as `blocking`.
let subgraph_runner =
graph::util::futures::blocking(loop_fn(ctx, move |ctx| run_subgraph(ctx))).then(
move |res| {
subgraph_metrics_unregister.unregister(registry);
future::result(res)
},
);
tokio::spawn(subgraph_runner);
let subgraph_runner = loop_fn(ctx, move |ctx| run_subgraph(ctx)).then(move |res| {
subgraph_metrics_unregister.unregister(registry);
future::result(res)
});
graph::spawn_blocking(subgraph_runner.compat());

Ok(())
}
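The event loop above is the stream-side analogue of the earlier future bridges: `Stream01CompatExt::compat` turns the futures-0.1 mpsc receiver into a 0.3 stream of `Result`s, and `try_for_each` replaces the old `for_each`, with the closure now returning a future rather than a bare `Result`. A self-contained sketch under the same assumptions (`drain_events` is illustrative):

```rust
use futures::compat::Stream01CompatExt;
use futures::stream::TryStreamExt;

async fn drain_events(receiver: futures01::sync::mpsc::Receiver<u32>) {
    receiver
        .compat() // now a 0.3 `Stream<Item = Result<u32, ()>>`
        .try_for_each(|event| {
            println!("handling event {}", event);
            // The closure must return a future, not a plain `Ok(())`.
            futures::future::ok(())
        })
        .await
        .expect("event stream failed");
}
```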