From 4479c3e56b66055aaf010ca2364b88d6008ddde3 Mon Sep 17 00:00:00 2001 From: zeroqn Date: Wed, 5 Jun 2019 05:43:34 +0000 Subject: [PATCH] refactor: Taste runtime crate. (#268) * chore(core-network): switch to runtime crate * chore(main): wrap main in async runtime * fix(main): clippy false-positives on needless lifetime * chore(core-pubsub): replace std::thread::spawn with runtime spawn * doc(core-pubsub): use runtime::spawn * test(core-pubsub): use runtime::spawn * chore(comp-tx-pool): use runtime::spawn * fix(comp-tx-pool): some tests timeout Switch from native runtime to tokio runtime * chore(core-consensus): use runtime::spawn * test(comp-tx-pool): speed up broadcast_txs test * test(core-network): use runtime::test, remove block_on * style(main): consistent struct init * fix(comp-tx-pool): block on cache_broadcast_receiver Use await instead * style(comp-tx-pool): use ```for_each``` instead of loop in cache_broadcast_txs --- Cargo.lock | 227 ++++++++++++++++++++++++- Cargo.toml | 5 +- components/transaction-pool/Cargo.toml | 2 + components/transaction-pool/src/lib.rs | 131 ++++++++------ core/consensus/src/synchronizer.rs | 32 ++-- core/network/Cargo.toml | 5 +- core/network/src/inbound.rs | 59 ++++--- core/network/src/inbound/consensus.rs | 43 +++-- core/network/src/inbound/sync.rs | 176 ++++++++++--------- core/network/src/inbound/tx_pool.rs | 114 +++++++------ core/network/src/outbound.rs | 38 +++-- core/network/src/outbound/sync.rs | 70 ++++---- core/network/src/outbound/tx_pool.rs | 32 ++-- core/network/src/peer_manager.rs | 12 +- core/network/src/service.rs | 26 +-- core/pubsub/Cargo.toml | 1 + core/pubsub/README.md | 27 ++- core/pubsub/examples/pubsub_show.rs | 20 ++- core/pubsub/src/broadcast.rs | 50 +++--- core/pubsub/src/lib.rs | 22 +-- core/pubsub/src/pubsub.rs | 4 +- core/pubsub/src/worker.rs | 15 +- src/main.rs | 66 ++++--- 23 files changed, 702 insertions(+), 475 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3ec89d8d1..5a0ff9d85 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -321,6 +321,16 @@ dependencies = [ "nodrop 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "async-datagram" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "async-ready" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "atty" version = "0.2.11" @@ -507,6 +517,11 @@ name = "build_const" version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "bumpalo" +version = "2.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "byte-tools" version = "0.2.0" @@ -795,6 +810,8 @@ dependencies = [ "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "rayon 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)", + "runtime 0.3.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)", + "runtime-tokio 0.3.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)", "uuid 0.7.4 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -909,15 +926,15 @@ dependencies = [ "core-serialization 0.1.0", "core-types 0.1.0", "futures-preview 0.3.0-alpha.16 (registry+https://github.com/rust-lang/crates.io-index)", - "futures-timer 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-timer 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", + "runtime 0.3.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.91 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.91 (registry+https://github.com/rust-lang/crates.io-index)", "tentacle 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "tentacle-discovery 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio 0.1.19 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -936,6 +953,7 @@ version = "0.1.0" dependencies = [ "futures-preview 0.3.0-alpha.16 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", + "runtime 0.3.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)", "uuid 0.7.4 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -1011,6 +1029,21 @@ dependencies = [ "cfg-if 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "crossbeam" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cfg-if 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam-channel 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam-deque 0.6.3 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam-epoch 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam-utils 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "num_cpus 1.10.0 (registry+https://github.com/rust-lang/crates.io-index)", + "parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "crossbeam" version = "0.7.1" @@ -1042,6 +1075,15 @@ dependencies = [ "crossbeam-utils 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "crossbeam-deque" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "crossbeam-epoch 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam-utils 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "crossbeam-deque" version = "0.7.1" @@ -1547,7 +1589,7 @@ dependencies = [ [[package]] name = "futures-timer" -version = "0.2.0" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "futures-preview 0.3.0-alpha.16 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1572,6 +1614,7 @@ dependencies = [ "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", "rand_core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-io 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -1818,11 +1861,30 @@ name = "itoa" version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "js-sys" +version = "0.3.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "wasm-bindgen 0.2.45 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "json" version = "0.11.13" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "juliex" +version = "0.3.0-alpha.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "crossbeam 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-preview 0.3.0-alpha.16 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "num_cpus 1.10.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "keccak" version = "0.1.0" @@ -2067,10 +2129,11 @@ dependencies = [ "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "num_cpus 1.10.0 (registry+https://github.com/rust-lang/crates.io-index)", "rayon 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)", + "runtime 0.3.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)", + "runtime-tokio 0.3.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.91 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.91 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.39 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio 0.1.19 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -2795,6 +2858,83 @@ dependencies = [ "librocksdb-sys 5.18.3 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "romio" +version = "0.3.0-alpha.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "async-datagram 2.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "async-ready 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam-utils 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-preview 0.3.0-alpha.16 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.55 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.6.17 (registry+https://github.com/rust-lang/crates.io-index)", + "mio-uds 0.6.7 (registry+https://github.com/rust-lang/crates.io-index)", + "num_cpus 1.10.0 (registry+https://github.com/rust-lang/crates.io-index)", + "parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", + "slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "runtime" +version = "0.3.0-alpha.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures-preview 0.3.0-alpha.16 (registry+https://github.com/rust-lang/crates.io-index)", + "runtime-attributes 0.3.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)", + "runtime-native 0.3.0-alpha.3 (registry+https://github.com/rust-lang/crates.io-index)", + "runtime-raw 0.3.0-alpha.3 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "runtime-attributes" +version = "0.3.0-alpha.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "proc-macro2 0.4.30 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 0.6.12 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 0.15.34 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "runtime-native" +version = "0.3.0-alpha.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "async-datagram 2.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.27 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-preview 0.3.0-alpha.16 (registry+https://github.com/rust-lang/crates.io-index)", + "juliex 0.3.0-alpha.6 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "romio 0.3.0-alpha.8 (registry+https://github.com/rust-lang/crates.io-index)", + "runtime-raw 0.3.0-alpha.3 (registry+https://github.com/rust-lang/crates.io-index)", + "wasm-bindgen 0.2.45 (registry+https://github.com/rust-lang/crates.io-index)", + "wasm-bindgen-futures 0.3.22 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "runtime-raw" +version = "0.3.0-alpha.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures-preview 0.3.0-alpha.16 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "runtime-tokio" +version = "0.3.0-alpha.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures 0.1.27 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-preview 0.3.0-alpha.16 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.6.17 (registry+https://github.com/rust-lang/crates.io-index)", + "runtime-raw 0.3.0-alpha.3 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.1.19 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "rustc-demangle" version = "0.1.14" @@ -3639,6 +3779,64 @@ dependencies = [ "try-lock 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "wasm-bindgen" +version = "0.2.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "wasm-bindgen-macro 0.2.45 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bumpalo 2.4.3 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro2 0.4.30 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 0.6.12 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 0.15.34 (registry+https://github.com/rust-lang/crates.io-index)", + "wasm-bindgen-shared 0.2.45 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "wasm-bindgen-futures" +version = "0.3.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures 0.1.27 (registry+https://github.com/rust-lang/crates.io-index)", + "js-sys 0.3.22 (registry+https://github.com/rust-lang/crates.io-index)", + "wasm-bindgen 0.2.45 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "quote 0.6.12 (registry+https://github.com/rust-lang/crates.io-index)", + "wasm-bindgen-macro-support 0.2.45 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "proc-macro2 0.4.30 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 0.6.12 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 0.15.34 (registry+https://github.com/rust-lang/crates.io-index)", + "wasm-bindgen-backend 0.2.45 (registry+https://github.com/rust-lang/crates.io-index)", + "wasm-bindgen-shared 0.2.45 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.45" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "which" version = "2.0.1" @@ -3748,6 +3946,8 @@ dependencies = [ "checksum arc-swap 0.3.11 (registry+https://github.com/rust-lang/crates.io-index)" = "bc4662175ead9cd84451d5c35070517777949a2ed84551764129cedb88384841" "checksum arrayref 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "0d382e583f07208808f6b1249e60848879ba3543f57c32277bf52d69c2f0f0ee" "checksum arrayvec 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)" = "92c7fb76bc8826a8b33b4ee5bb07a247a81e76764ab4d55e8f73e3a4d8808c71" +"checksum async-datagram 2.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "43d8594c35b0f8c540f322ac40fcf32536fa9fe41872ccf60b445a06a87fd1ae" +"checksum async-ready 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "bb690f88f674eed5c82591f5bc74e97f0fe29393e39e3c3b18b451c92ab0e753" "checksum atty 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "9a7d5b8723950951411ee34d271d99dddcc2035a16ab25310ea2c8cfd4369652" "checksum autocfg 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "a6d640bee2da49f60a4068a7fae53acde8982514ab7bae8b8cea9e88cbcfd799" "checksum awc 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5f4644ced3f67333ac73fd92db3206e0cef66b8880d0f6d1fbacab68f7c451ff" @@ -3766,6 +3966,7 @@ dependencies = [ "checksum brotli2 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "0cb036c3eade309815c15ddbacec5b22c4d1f3983a774ab2eac2e3e9ea85568e" "checksum bs58 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "0de79cfb98e7aa9988188784d8664b4b5dad6eaaa0863b91d9a4ed871d4f7a42" "checksum build_const 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "39092a32794787acd8525ee150305ff051b0aa6cc2abaf193924f5ab05425f39" +"checksum bumpalo 2.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "84dca3afd8e01b9526818b7963e5b4916063b3cdf9f10cf6b73ef0bd0ec37aa5" "checksum byte-tools 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "560c32574a12a89ecd91f5e742165893f86e3ab98d21f8ea548658eb9eef5f40" "checksum byte-tools 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e3b5ca7a04898ad4bcd41c90c5285445ff5b791899bb1b0abdd2a2aa791211d7" "checksum byteorder 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "a019b10a2a7cdeb292db131fc8113e57ea2a908f6e7894b0c3c671893b65dbeb" @@ -3790,9 +3991,11 @@ dependencies = [ "checksum core-foundation-sys 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e7ca8a5221364ef15ce201e8ed2f609fc312682a8f4e0e3d4aa5879764e0fa3b" "checksum crc 1.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d663548de7f5cca343f1e0a48d14dcfb0e9eb4e079ec58883b7251539fa10aeb" "checksum crc32fast 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ba125de2af0df55319f41944744ad91c71113bf74a4646efff39afe1f6842db1" +"checksum crossbeam 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ad4c7ea749d9fb09e23c5cb17e3b70650860553a0e2744e38446b1803bf7db94" "checksum crossbeam 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b14492071ca110999a20bf90e3833406d5d66bfd93b4e52ec9539025ff43fe0d" "checksum crossbeam-channel 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)" = "0f0ed1a4de2235cabda8558ff5840bffb97fcb64c97827f354a451307df5f72b" "checksum crossbeam-deque 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f739f8c5363aca78cfb059edf753d8f0d36908c348f3d8d1503f03d8b75d9cf3" +"checksum crossbeam-deque 0.6.3 (registry+https://github.com/rust-lang/crates.io-index)" = "05e44b8cf3e1a625844d1750e1f7820da46044ff6d28f4d43e455ba3e5bb2c13" "checksum crossbeam-deque 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b18cd2e169ad86297e6bc0ad9aa679aee9daa4f19e8163860faf7c164e4f5a71" "checksum crossbeam-epoch 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "927121f5407de9956180ff5e936fe3cf4324279280001cd56b669d28ee7e9150" "checksum crossbeam-epoch 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "04c9e3102cc2d69cd681412141b390abd55a362afc1540965dad0ad4d34280b4" @@ -3852,7 +4055,7 @@ dependencies = [ "checksum futures-select-macro-preview 0.3.0-alpha.16 (registry+https://github.com/rust-lang/crates.io-index)" = "afee2644abc7c8a6529530bb20e044ace4b7dfc4df1c114614d1b458fc29f0b0" "checksum futures-sink-preview 0.3.0-alpha.16 (registry+https://github.com/rust-lang/crates.io-index)" = "49dcfdacd6b5974ca0b9b78bc38ffd1071da0206179735c3df82e279f5b784e4" "checksum futures-timer 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "a5cedfe9b6dc756220782cc1ba5bcb1fa091cdcba155e40d3556159c3db58043" -"checksum futures-timer 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e58e133cfe51af48fb24eb9db5c38bc9a970f47264569297bb83c55ea11cd118" +"checksum futures-timer 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "eb4a32e84935678650944c6ebd0d912db46405d37bf94f1a058435c5080abcb1" "checksum futures-util-preview 0.3.0-alpha.16 (registry+https://github.com/rust-lang/crates.io-index)" = "f7a0451b9c5047c2b9ab93425ffd0793165511e93c04b977cd45fbd41c6e34b2" "checksum generic-array 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3c0f28c2f5bfb5960175af447a2da7c18900693738343dc896ffbcabd9839592" "checksum generic-array 0.8.3 (registry+https://github.com/rust-lang/crates.io-index)" = "fceb69994e330afed50c93524be68c42fa898c2d9fd4ee8da03bd7363acd26f2" @@ -3880,7 +4083,9 @@ dependencies = [ "checksum ipconfig 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)" = "08f7eadeaf4b52700de180d147c4805f199854600b36faa963d91114827b2ffc" "checksum itertools 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5b8467d9c1cebe26feb08c640139247fac215782d35371ade9a2136ed6085358" "checksum itoa 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "501266b7edd0174f8530248f87f99c88fbe60ca4ef3dd486835b8d8d53136f7f" +"checksum js-sys 0.3.22 (registry+https://github.com/rust-lang/crates.io-index)" = "9987e7c13a91d9cf0efe59cca48a3a7a70e2b11695d5a4640f85ae71e28f5e73" "checksum json 0.11.13 (registry+https://github.com/rust-lang/crates.io-index)" = "9ad0485404155f45cce53a40d4b2d6ac356418300daed05273d9e26f91c390be" +"checksum juliex 0.3.0-alpha.6 (registry+https://github.com/rust-lang/crates.io-index)" = "7ba5bc8e8d05cc0980ea90e146d2ffc0ad9634281ebb48e4edee411251184663" "checksum keccak 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "67c21572b4949434e4fc1e1978b99c5f77064153c59d998bf13ecd96fb5ecba7" "checksum keccak-hash 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "253bbe643c32c816bf58fa5a88248fafedeebb139705ad17a62add3517854a86" "checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" @@ -3982,6 +4187,12 @@ dependencies = [ "checksum ripemd160 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ad5112e0dbbb87577bfbc56c42450235e3012ce336e29c5befd7807bd626da4a" "checksum rlp 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "16d1effe9845d54f90e7be8420ee49e5c94623140b97ee4bc6fb5bfddb745720" "checksum rocksdb 0.12.2 (registry+https://github.com/rust-lang/crates.io-index)" = "d29e12aab379a49bfbca337132440be73d1de6f328d5635641c2b28ac9dfe514" +"checksum romio 0.3.0-alpha.8 (registry+https://github.com/rust-lang/crates.io-index)" = "e45fc890683557502157efb9479163ad7af2ea4bce850cbd1218926d89c13018" +"checksum runtime 0.3.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)" = "0a5bc0b95e096ced67025933b8cf3cffc0b6283d5c5591b5e8f2bcbdd2d69186" +"checksum runtime-attributes 0.3.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)" = "6da2562eb0f314b0494a81fcb4a741357683ba538b9f769167925a3953df2c2b" +"checksum runtime-native 0.3.0-alpha.3 (registry+https://github.com/rust-lang/crates.io-index)" = "acf5520e4f3d366498a968d746c276eee06d9c5fd6c553bd858437e052454976" +"checksum runtime-raw 0.3.0-alpha.3 (registry+https://github.com/rust-lang/crates.io-index)" = "134702e4a37a0b4386c07f97ca309cdefa213b4dfb3eeeee51aa28d3dbfa718d" +"checksum runtime-tokio 0.3.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)" = "defc20478ee986be244a5b71a2796d0e392becffc6441c22297d32808de62428" "checksum rustc-demangle 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)" = "ccc78bfd5acd7bf3e89cffcf899e5cb1a52d6fafa8dec2739ad70c9577a57288" "checksum rustc-hex 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "403bb3a286107a04825a5f82e1270acc1e14028d3d554d7a1e08914549575ab8" "checksum rustc_version 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a" @@ -4074,6 +4285,12 @@ dependencies = [ "checksum vec_map 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "05c78687fb1a80548ae3250346c3db86a80a7cdd77bda190189f2d0a0987c81a" "checksum version_check 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "914b1a6776c4c929a602fafd8bc742e06365d4bcbe48c30f9cca5824f70dc9dd" "checksum want 0.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "797464475f30ddb8830cc529aaaae648d581f99e2036a928877dfde027ddf6b3" +"checksum wasm-bindgen 0.2.45 (registry+https://github.com/rust-lang/crates.io-index)" = "b7ccc7b93cfd13e26700a9e2e41e6305f1951b87e166599069f77d10358100e6" +"checksum wasm-bindgen-backend 0.2.45 (registry+https://github.com/rust-lang/crates.io-index)" = "1953f91b1608eb1522513623c7739f047bb0fed4128ce51a93f08e12cc314645" +"checksum wasm-bindgen-futures 0.3.22 (registry+https://github.com/rust-lang/crates.io-index)" = "fa1af11c73eca3dc8c51c76ea475a4416e912da6402064a49fc6c0214701866d" +"checksum wasm-bindgen-macro 0.2.45 (registry+https://github.com/rust-lang/crates.io-index)" = "0f69da5696545d7ca6607a2e4b1a0edf5a6b36b2c49dbb0f1df6ad1d92884047" +"checksum wasm-bindgen-macro-support 0.2.45 (registry+https://github.com/rust-lang/crates.io-index)" = "2d4246f3bc73223bbb846f4f2430a60725826a96c9389adf715ed1d5af46dec6" +"checksum wasm-bindgen-shared 0.2.45 (registry+https://github.com/rust-lang/crates.io-index)" = "c08381e07e7a79e5e229ad7c60d15833d19033542cc5dd91d085df59d235f4a6" "checksum which 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b57acb10231b9493c8472b20cb57317d0679a49e0bdbee44b3b803a6473af164" "checksum widestring 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7157704c2e12e3d2189c507b7482c52820a16dfa4465ba91add92f266667cadb" "checksum winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a" diff --git a/Cargo.toml b/Cargo.toml index 28555dd88..a96b53ea7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,14 +25,15 @@ core-network = { path = "core/network" } rayon = "1.0" hex = "0.3" -tokio = "0.1" clap = "2.32" log = "0.4" num_cpus = "1.0" serde = "1.0" serde_derive = "1.0" serde_json = "1.0" -futures-preview = { version = "0.3.0-alpha.16", features = [ "compat" ] } +futures-preview = "0.3.0-alpha.16" +runtime = "0.3.0-alpha.4" +runtime-tokio = "0.3.0-alpha.4" [workspace] members = [ diff --git a/components/transaction-pool/Cargo.toml b/components/transaction-pool/Cargo.toml index 109b43eea..ec9d62550 100644 --- a/components/transaction-pool/Cargo.toml +++ b/components/transaction-pool/Cargo.toml @@ -19,9 +19,11 @@ parking_lot = "0.8" log = "0.4" futures-preview = "0.3.0-alpha.16" rayon = "1.0" +runtime = "0.3.0-alpha.4" [dev-dependencies] components-database = { path = "../database" } hex = "0.3" uuid = { version = "0.7", features = ["serde", "v4"] } chashmap = "2.2" +runtime-tokio = "0.3.0-alpha.4" diff --git a/components/transaction-pool/src/lib.rs b/components/transaction-pool/src/lib.rs index 55ea61751..1e0ec755e 100644 --- a/components/transaction-pool/src/lib.rs +++ b/components/transaction-pool/src/lib.rs @@ -7,6 +7,8 @@ use std::string::ToString; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; +use futures::{future::ready, prelude::StreamExt}; + use common_channel::{unbounded, Receiver, Sender}; use core_context::{Context, ORIGIN}; use core_crypto::Crypto; @@ -52,7 +54,7 @@ where let (cache_broadcast_sender, cache_broadcast_receiver) = unbounded(); let network2 = network.clone(); - std::thread::spawn(move || cache_broadcast_txs(network2, cache_broadcast_receiver)); + runtime::spawn(cache_broadcast_txs(network2, cache_broadcast_receiver)); HashTransactionPool { pool_size, @@ -290,10 +292,12 @@ fn internal_error(e: impl ToString) -> TransactionPoolError { // TODO: If the number of transactions does not satisfy "CACHE_BROOADCAST_LEN", // does it need to set up a timed broadcast? -fn cache_broadcast_txs(network: N, receiver: Receiver) { +async fn cache_broadcast_txs(network: N, receiver: Receiver) { let mut buffer_txs: Vec = Vec::with_capacity(CACHE_BROOADCAST_LEN); - loop { + let push_may_broadcast = move |tx: SignedTransaction| { + buffer_txs.push(tx); + if buffer_txs.len() >= CACHE_BROOADCAST_LEN { let mut temp = Vec::with_capacity(CACHE_BROOADCAST_LEN); mem::swap(&mut buffer_txs, &mut temp); @@ -301,13 +305,11 @@ fn cache_broadcast_txs(network: N, receiver: Receiver { - buffer_txs.push(tx); - } - Err(e) => log::error!("cache broadcast receiver {:?}", e), - } - } + ready(()) + }; + + receiver.for_each(push_may_broadcast).await; + log::error!("component: [tx_pool]: cache broadcast channel disconnected"); } #[cfg(test)] @@ -358,8 +360,8 @@ mod tests { } } - #[test] - fn test_insert_transaction() { + #[runtime::test] + async fn test_insert_transaction() { let ctx = Context::new(); let pool_size = 1000; let until_block_limit = 100; @@ -377,7 +379,7 @@ mod tests { // test normal let untx = mock_transaction(100, height + until_block_limit, "test_normal".to_owned()); - let signed_tx = block_on(tx_pool.insert(ctx.clone(), untx.clone())).unwrap(); + let signed_tx = tx_pool.insert(ctx.clone(), untx.clone()).await.unwrap(); assert_eq!( signed_tx.hash, Into::::into(untx) @@ -388,7 +390,7 @@ mod tests { // test lt valid_until_block let untx = mock_transaction(100, height, "test_lt_quota_limit".to_owned()); - let result = block_on(tx_pool.insert(ctx.clone(), untx)); + let result = tx_pool.insert(ctx.clone(), untx).await; assert_eq!(result, Err(TransactionPoolError::InvalidUntilBlock)); // test gt valid_until_block @@ -397,7 +399,7 @@ mod tests { height + until_block_limit * 2, "test_gt_valid_until_block".to_owned(), ); - let result = block_on(tx_pool.insert(ctx.clone(), untx)); + let result = tx_pool.insert(ctx.clone(), untx).await; assert_eq!(result, Err(TransactionPoolError::InvalidUntilBlock)); // test gt quota limit @@ -406,19 +408,19 @@ mod tests { height + until_block_limit, "test_gt_quota_limit".to_owned(), ); - let result = block_on(tx_pool.insert(ctx.clone(), untx)); + let result = tx_pool.insert(ctx.clone(), untx).await; assert_eq!(result, Err(TransactionPoolError::QuotaNotEnough)); // test cache dup let untx = mock_transaction(100, height + until_block_limit, "test_dup".to_owned()); let untx2 = untx.clone(); - block_on(tx_pool.insert(ctx.clone(), untx)).unwrap(); - let result = block_on(tx_pool.insert(ctx.clone(), untx2)); + tx_pool.insert(ctx.clone(), untx).await.unwrap(); + let result = tx_pool.insert(ctx.clone(), untx2).await; assert_eq!(result, Err(TransactionPoolError::Dup)); } - #[test] - fn test_histories_dup() { + #[runtime::test] + async fn test_histories_dup() { let ctx = Context::new(); let pool_size = 1000; let until_block_limit = 100; @@ -436,9 +438,12 @@ mod tests { let mut block = Block::default(); block.header.height = height; - block_on(storage.insert_transactions(ctx.clone(), vec![signed_tx.clone()])).unwrap(); + storage + .insert_transactions(ctx.clone(), vec![signed_tx.clone()]) + .await + .unwrap(); - block_on(storage.insert_block(ctx.clone(), block)).unwrap(); + storage.insert_block(ctx.clone(), block).await.unwrap(); let tx_pool = new_test_pool( ctx.clone(), @@ -449,12 +454,13 @@ mod tests { height, ); - let result = block_on(tx_pool.insert(ctx.clone(), signed_tx.untx)); + let result = tx_pool.insert(ctx.clone(), signed_tx.untx).await; assert_eq!(result, Err(TransactionPoolError::Dup)); } - #[test] - fn test_pool_size() { + // NOTE: only tokio can pass this test + #[runtime::test(runtime_tokio::Tokio)] + async fn test_pool_size() { let ctx = Context::new(); let pool_size = 1; let until_block_limit = 100; @@ -471,7 +477,7 @@ mod tests { ); let untx = mock_transaction(100, height + until_block_limit, "test1".to_owned()); - let signed_tx = block_on(tx_pool.insert(ctx.clone(), untx.clone())).unwrap(); + let signed_tx = tx_pool.insert(ctx.clone(), untx.clone()).await.unwrap(); assert_eq!( signed_tx.hash, Into::::into(untx) @@ -481,12 +487,13 @@ mod tests { ); let untx = mock_transaction(100, height + until_block_limit, "test2".to_owned()); - let result = block_on(tx_pool.insert(ctx.clone(), untx)); + let result = tx_pool.insert(ctx.clone(), untx).await; assert_eq!(result, Err(TransactionPoolError::ReachLimit)); } - #[test] - fn test_package_transaction_count() { + // NOTE: only tokio can pass this test + #[runtime::test(runtime_tokio::Tokio)] + async fn test_package_transaction_count() { let ctx = Context::new(); let pool_size = 100; let until_block_limit = 100; @@ -510,11 +517,13 @@ mod tests { .unwrap() .hash(); tx_hashes.push(tx_hash.clone()); - block_on(tx_pool.insert(ctx.clone(), untx.clone())).unwrap(); + tx_pool.insert(ctx.clone(), untx.clone()).await.unwrap(); } - let pachage_tx_hashes = - block_on(tx_pool.package(ctx, tx_hashes.len() as u64, quota_limit)).unwrap(); + let pachage_tx_hashes = tx_pool + .package(ctx, tx_hashes.len() as u64, quota_limit) + .await + .unwrap(); assert_eq!(tx_hashes.len(), pachage_tx_hashes.len()); assert_eq!( tx_hashes @@ -524,8 +533,8 @@ mod tests { ); } - #[test] - fn test_flush() { + #[runtime::test] + async fn test_flush() { let ctx = Context::new(); let pool_size = 1000; let until_block_limit = 100; @@ -567,7 +576,10 @@ mod tests { .iter() .map(|stx| stx.hash.clone()) .collect::>(); - let stxs = block_on(tx_pool.get_batch(ctx.clone(), test_hashes.as_slice())).unwrap(); + let stxs = tx_pool + .get_batch(ctx.clone(), test_hashes.as_slice()) + .await + .unwrap(); assert_eq!(stxs.len(), test_hashes.len()); assert_eq!(tx_pool.callback_cache.len(), test_hashes.len()); @@ -575,13 +587,17 @@ mod tests { .iter() .map(|stx| stx.hash.clone()) .collect::>(); - block_on(tx_pool.flush(ctx.clone(), test_hashes.as_slice())).unwrap(); + tx_pool + .flush(ctx.clone(), test_hashes.as_slice()) + .await + .unwrap(); assert_eq!(tx_pool.callback_cache.len(), 0); assert_eq!(tx_pool.tx_cache.len(), 0); } - #[test] - fn test_package_transaction_quota_limit() { + // NOTE: only tokio can pass this test + #[runtime::test(runtime_tokio::Tokio)] + async fn test_package_transaction_quota_limit() { let ctx = Context::new(); let pool_size = 100; let until_block_limit = 100; @@ -605,16 +621,18 @@ mod tests { .unwrap() .hash(); tx_hashes.push(tx_hash.clone()); - block_on(tx_pool.insert(ctx.clone(), untx)).unwrap(); + tx_pool.insert(ctx.clone(), untx).await.unwrap(); } - let pachage_tx_hashes = - block_on(tx_pool.package(ctx, tx_hashes.len() as u64, quota_limit)).unwrap(); + let pachage_tx_hashes = tx_pool + .package(ctx, tx_hashes.len() as u64, quota_limit) + .await + .unwrap(); assert_eq!(8, pachage_tx_hashes.len()); } - #[test] - fn test_ensure_partial_unknown_hashes() { + #[runtime::test] + async fn test_ensure_partial_unknown_hashes() { let ctx = Context::new(); let pool_size = 1000; let until_block_limit = 100; @@ -642,10 +660,13 @@ mod tests { untxs.push(untx); } - block_on(tx_pool.insert(ctx.clone(), untxs[0].clone())).unwrap(); + tx_pool.insert(ctx.clone(), untxs[0].clone()).await.unwrap(); assert_eq!(tx_pool.tx_cache.len(), 1); - block_on(tx_pool.ensure(ctx.clone(), tx_hashes.as_slice())).unwrap(); + tx_pool + .ensure(ctx.clone(), tx_hashes.as_slice()) + .await + .unwrap(); let callback_cache = tx_pool.callback_cache; dbg!(callback_cache.len()); @@ -656,8 +677,8 @@ mod tests { } } - #[test] - fn test_ensure_full_known_hashes() { + #[runtime::test] + async fn test_ensure_full_known_hashes() { let ctx = Context::new(); let pool_size = 1000; let until_block_limit = 100; @@ -683,16 +704,20 @@ mod tests { .unwrap() .hash(), ); - block_on(tx_pool.insert(ctx.clone(), untx)).unwrap(); + tx_pool.insert(ctx.clone(), untx).await.unwrap(); } assert_eq!(tx_pool.tx_cache.len(), 5); - block_on(tx_pool.ensure(ctx.clone(), tx_hashes.as_slice())).unwrap(); + tx_pool + .ensure(ctx.clone(), tx_hashes.as_slice()) + .await + .unwrap(); assert_eq!(tx_pool.callback_cache.len(), 0); } - #[test] - fn test_broadcast_txs() { + // NOTE: use tokio runtime to speed up test + #[runtime::test(runtime_tokio::Tokio)] + async fn test_broadcast_txs() { let ctx = Context::new().with_value(ORIGIN, TransactionOrigin::Jsonrpc); let pool_size = 1000; let until_block_limit = 100; @@ -718,9 +743,9 @@ mod tests { let untx = mock_transaction(100, height + until_block_limit, format!("test{}", i)); if i == CACHE_BROOADCAST_LEN { - block_on(tx_pool.insert(Context::new(), untx)).unwrap(); + tx_pool.insert(Context::new(), untx).await.unwrap(); } else { - block_on(tx_pool.insert(ctx.clone(), untx)).unwrap(); + tx_pool.insert(ctx.clone(), untx).await.unwrap(); } } diff --git a/core/consensus/src/synchronizer.rs b/core/consensus/src/synchronizer.rs index fc3e47525..b4e8c7cc2 100644 --- a/core/consensus/src/synchronizer.rs +++ b/core/consensus/src/synchronizer.rs @@ -3,8 +3,7 @@ use std::time::Duration; use futures::compat::Stream01CompatExt; use futures::prelude::{StreamExt, TryFutureExt, TryStreamExt}; -use futures::{executor::block_on, future::ready, stream::select}; -// TODO: tokio timer doens't work on block_on +use futures::{future::ready, stream::select}; use futures_timer::Interval; use log; @@ -32,7 +31,7 @@ where } } - pub fn start(&self, mut sub_block: Receiver) { + pub async fn start(self, mut sub_block: Receiver) { let synchronizer = Arc::clone(&self.synchronizer); let storage = Arc::clone(&self.storage); @@ -47,20 +46,17 @@ where }) .map(std::result::Result::ok); - std::thread::spawn(move || { - block_on( - select(sub_block.boxed(), interval_broadcaster.boxed()) - .filter_map(ready) - .for_each(move |block| { - let status = SyncStatus { - hash: block.hash, - height: block.header.height, - }; - log::debug!("broadcast status: {:?}", &status); - synchronizer.broadcast_status(status); - ready(()) - }), - ); - }); + select(sub_block.boxed(), interval_broadcaster.boxed()) + .filter_map(ready) + .for_each(move |block| { + let status = SyncStatus { + hash: block.hash, + height: block.header.height, + }; + log::debug!("broadcast status: {:?}", &status); + synchronizer.broadcast_status(status); + ready(()) + }) + .await; } } diff --git a/core/network/Cargo.toml b/core/network/Cargo.toml index e6cc68524..8f5016e41 100644 --- a/core/network/Cargo.toml +++ b/core/network/Cargo.toml @@ -21,6 +21,5 @@ serde_derive = "1.0" serde = "1.0" rand = "0.6" parking_lot = "0.8" -# FIXME: wait 0.2? https://github.com/tokio-rs/tokio/pull/1097 -tokio = "0.1" -futures-timer = "0.2.0" +runtime = "0.3.0-alpha.4" +futures-timer = "0.2" diff --git a/core/network/src/inbound.rs b/core/network/src/inbound.rs index 0704306c5..9b0f3cc8b 100644 --- a/core/network/src/inbound.rs +++ b/core/network/src/inbound.rs @@ -13,7 +13,7 @@ use std::pin::Pin; use std::sync::Arc; use futures::future::{ready, BoxFuture}; -use futures::prelude::{FutureExt, Stream, TryFutureExt}; +use futures::prelude::{FutureExt, Stream}; use futures::task::{Context as TaskContext, Poll}; use log::{error, info}; @@ -142,7 +142,7 @@ impl Stream for InboundHandle { } Poll::Ready(Some(session_msg)) => { let job = self.handle_msg(session_msg); - tokio::spawn(job.unit_error().boxed().compat()); + runtime::spawn(job); Poll::Ready(Some(())) } @@ -174,7 +174,6 @@ mod tests { use std::io::ErrorKind; use std::sync::Arc; - use futures::executor::block_on; use futures::prelude::StreamExt; use common_channel::{bounded, Sender}; @@ -312,38 +311,38 @@ mod tests { } } - #[test] - fn test_handle_body() { + #[runtime::test] + async fn test_handle_body() { let reactors = mock_reactors(); let bytes = encode_bytes(&b"cici hunter".to_vec(), Method::Vote); let ctx = Context::new(); - let maybe_ok = block_on(InboundHandle::handle_body(reactors, ctx, bytes)); + let maybe_ok = InboundHandle::handle_body(reactors, ctx, bytes).await; assert_eq!(maybe_ok.unwrap(), ()); } - #[test] - fn test_handle_body_with_bad_data() { + #[runtime::test] + async fn test_handle_body_with_bad_data() { let reactors = mock_reactors(); let bytes = Bytes::from(b"bad bytes".to_vec()); let ctx = Context::new(); - match block_on(InboundHandle::handle_body(reactors, ctx, bytes)) { + match InboundHandle::handle_body(reactors, ctx, bytes).await { Err(Error::MsgCodecError(NetMessageError::DecodeError(_))) => (), // pass _ => panic!("should return Error::MsgCodecError"), } } - #[test] - fn test_handle_body_with_wrong_size_data() { + #[runtime::test] + async fn test_handle_body_with_wrong_size_data() { let reactors = mock_reactors(); let bytes = mock_wrong_size_data(); let ctx = Context::new(); - match block_on(InboundHandle::handle_body(reactors, ctx, bytes)) { + match InboundHandle::handle_body(reactors, ctx, bytes).await { Err(Error::IoError(err)) => { assert_eq!(err.kind(), ErrorKind::UnexpectedEof); assert!(format!("{:?}", err.get_ref()).contains("data corruption")); @@ -352,71 +351,71 @@ mod tests { } } - #[test] - fn test_handle_body_with_wrong_method_data() { + #[runtime::test] + async fn test_handle_body_with_wrong_method_data() { let reactors = mock_reactors(); let bytes = mock_wrong_method_data(); let ctx = Context::new(); - match block_on(InboundHandle::handle_body(reactors, ctx, bytes)) { + match InboundHandle::handle_body(reactors, ctx, bytes).await { Err(Error::UnknownMethod(m)) => assert_eq!(m, 9_999_999), _ => panic!("should return Error::UnknownMethod"), } } - #[test] - fn test_handle_body_with_unknown_method() { + #[runtime::test] + async fn test_handle_body_with_unknown_method() { let reactors = mock_reactors(); let bytes = encode_bytes(&b"hameihameiha".to_vec(), Method::SyncPullTxs); let ctx = Context::new(); - match block_on(InboundHandle::handle_body(reactors, ctx, bytes)) { + match InboundHandle::handle_body(reactors, ctx, bytes).await { Err(Error::UnknownMethod(m)) => assert_eq!(m, Method::SyncPullTxs.to_u32()), _ => panic!("should return Error::UnknownMethod"), } } - #[test] - fn test_handle_body_with_reactor_error() { + #[runtime::test] + async fn test_handle_body_with_reactor_error() { let reactors = mock_reactors(); let bytes = encode_bytes(&b"13".to_vec(), Method::PullTxs); let ctx = Context::new(); - match block_on(InboundHandle::handle_body(reactors, ctx, bytes)) { + match InboundHandle::handle_body(reactors, ctx, bytes).await { Err(Error::UnknownMethod(m)) => assert_eq!(m, 9_999_999), _ => panic!("should return Error::UnknownMethod"), } } - #[test] - fn test_handle_msg() { + #[runtime::test] + async fn test_handle_msg() { let (inbound, _) = mock_inbound(); let bytes = encode_bytes(&b"cici hunter".to_vec(), Method::Vote); let sess_msg = mock_sess_msg(bytes); - assert_eq!(block_on(inbound.handle_msg(sess_msg)), ()); + assert_eq!(inbound.handle_msg(sess_msg).await, ()); } // TODO: test logger - #[test] - fn test_handle_msg_with_bad_data() { + #[runtime::test] + async fn test_handle_msg_with_bad_data() { let (inbound, _) = mock_inbound(); let bytes = Bytes::from(b"bad bytes".to_vec()); let sess_msg = mock_sess_msg(bytes); - assert_eq!(block_on(inbound.handle_msg(sess_msg)), ()); + assert_eq!(inbound.handle_msg(sess_msg).await, ()); } - #[test] - fn test_inbound_stop() { + #[runtime::test] + async fn test_inbound_stop() { let (mut inbound, tx) = mock_inbound(); drop(tx); - assert_eq!(block_on(inbound.next()), None); + assert_eq!(inbound.next().await, None); } } diff --git a/core/network/src/inbound/consensus.rs b/core/network/src/inbound/consensus.rs index f9e4ae161..5f30db9eb 100644 --- a/core/network/src/inbound/consensus.rs +++ b/core/network/src/inbound/consensus.rs @@ -96,7 +96,6 @@ mod tests { use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; - use futures::executor::block_on; use futures::future::{err, ok}; use core_context::Context; @@ -160,21 +159,21 @@ mod tests { ConsensusReactor::new(cons) } - #[test] - fn test_react_with_unknown_method() { + #[runtime::test] + async fn test_react_with_unknown_method() { let reactor = new_cons_reactor(); let ctx = Context::new(); let method = Method::SyncPullTxs; let data = b"software from".to_vec(); - match block_on(reactor.react(ctx, method, data)) { + match reactor.react(ctx, method, data).await { Err(Error::UnknownMethod(m)) => assert_eq!(m, method.to_u32()), _ => panic!("should return Error::UnknownMethod"), } } - #[test] - fn test_react_proposal() { + #[runtime::test] + async fn test_react_proposal() { let reactor = new_cons_reactor(); let proposal = Proposal::from(b"fish man?".to_vec()); @@ -182,26 +181,26 @@ mod tests { let ctx = Context::new(); let method = Method::Proposal; - let maybe_ok = block_on(reactor.react(ctx, method, data)); + let maybe_ok = reactor.react(ctx, method, data).await; assert_eq!(maybe_ok.unwrap(), ()); assert_eq!(reactor.consensus.count(), 1); } - #[test] - fn test_react_proposal_with_bad_data() { + #[runtime::test] + async fn test_react_proposal_with_bad_data() { let reactor = new_cons_reactor(); let ctx = Context::new(); let method = Method::Proposal; - match block_on(reactor.react(ctx, method, vec![1, 2, 3])) { + match reactor.react(ctx, method, vec![1, 2, 3]).await { Err(Error::MsgCodecError(_)) => (), _ => panic!("should return Error::MsgCodecError"), } } - #[test] - fn test_react_proposal_with_consensus_faiure() { + #[runtime::test] + async fn test_react_proposal_with_consensus_faiure() { let reactor = new_cons_reactor(); let ctx = Context::new(); let method = Method::Proposal; @@ -210,7 +209,7 @@ mod tests { let data = ::encode(&proposal).unwrap().to_vec(); reactor.consensus.reply_err(true); - match block_on(reactor.react(ctx, method, data)) { + match reactor.react(ctx, method, data).await { Err(Error::ConsensusError(ConsensusError::Internal(str))) => { assert!(str.contains("mock error")) } @@ -218,8 +217,8 @@ mod tests { } } - #[test] - fn test_react_vote() { + #[runtime::test] + async fn test_react_vote() { let reactor = new_cons_reactor(); let vote = Vote::from(b"7ff ch 2".to_vec()); @@ -227,26 +226,26 @@ mod tests { let ctx = Context::new(); let method = Method::Vote; - let maybe_ok = block_on(reactor.react(ctx, method, data)); + let maybe_ok = reactor.react(ctx, method, data).await; assert_eq!(maybe_ok.unwrap(), ()); assert_eq!(reactor.consensus.count(), 1); } - #[test] - fn test_react_vote_with_bad_data() { + #[runtime::test] + async fn test_react_vote_with_bad_data() { let reactor = new_cons_reactor(); let ctx = Context::new(); let method = Method::Vote; - match block_on(reactor.react(ctx, method, vec![1, 2, 3])) { + match reactor.react(ctx, method, vec![1, 2, 3]).await { Err(Error::MsgCodecError(_)) => (), _ => panic!("should return Error::MsgCodecError"), } } - #[test] - fn test_react_vote_with_consensus_faiure() { + #[runtime::test] + async fn test_react_vote_with_consensus_faiure() { let reactor = new_cons_reactor(); let ctx = Context::new(); let method = Method::Vote; @@ -255,7 +254,7 @@ mod tests { let data = ::encode(&vote).unwrap().to_vec(); reactor.consensus.reply_err(true); - match block_on(reactor.react(ctx, method, data)) { + match reactor.react(ctx, method, data).await { Err(Error::ConsensusError(ConsensusError::Internal(str))) => { assert!(str.contains("mock error")) } diff --git a/core/network/src/inbound/sync.rs b/core/network/src/inbound/sync.rs index b4f439b9e..01a7ac42d 100644 --- a/core/network/src/inbound/sync.rs +++ b/core/network/src/inbound/sync.rs @@ -197,8 +197,6 @@ mod tests { use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; - use futures::executor::block_on; - use core_context::{Context, P2P_SESSION_ID}; use core_network_message::common::{PullTxs, PushTxs}; use core_network_message::sync::{BroadcastStatus, PullBlocks, PushBlocks}; @@ -309,45 +307,45 @@ mod tests { SyncReactor::new(synchronizer, callback, outbound) } - #[test] - fn test_react_with_unknown_method() { + #[runtime::test] + async fn test_react_with_unknown_method() { let reactor = new_sync_reactor(); let ctx = Context::new(); let method = Method::Vote; - match block_on(reactor.react(ctx, method, vec![1, 2, 3])) { + match reactor.react(ctx, method, vec![1, 2, 3]).await { Err(Error::UnknownMethod(m)) => assert_eq!(m, method.to_u32()), _ => panic!("should return Error::UnknownMethod"), } } - #[test] - fn test_react_broadcast_status() { + #[runtime::test] + async fn test_react_broadcast_status() { let reactor = new_sync_reactor(); let status = BroadcastStatus::from(Hash::default(), 20); let data = ::encode(&status).unwrap(); let ctx = Context::new(); let method = Method::SyncBroadcastStatus; - let maybe_ok = block_on(reactor.react(ctx, method, data.to_vec())); + let maybe_ok = reactor.react(ctx, method, data.to_vec()).await; assert_eq!(maybe_ok.unwrap(), ()) } - #[test] - fn test_react_broadcast_status_with_bad_data() { + #[runtime::test] + async fn test_react_broadcast_status_with_bad_data() { let reactor = new_sync_reactor(); let ctx = Context::new(); let method = Method::SyncBroadcastStatus; - match block_on(reactor.react(ctx, method, vec![1, 2, 3])) { + match reactor.react(ctx, method, vec![1, 2, 3]).await { Err(Error::MsgCodecError(_)) => (), _ => panic!("should return Error::MsgCodecError"), } } - #[test] - fn test_react_broadcast_status_with_sync_failure() { + #[runtime::test] + async fn test_react_broadcast_status_with_sync_failure() { let reactor = new_sync_reactor(); let status = BroadcastStatus::from(Hash::default(), 20); let data = ::encode(&status).unwrap(); @@ -356,7 +354,7 @@ mod tests { let method = Method::SyncBroadcastStatus; reactor.sync.reply_err(true); - match block_on(reactor.react(ctx, method, data.to_vec())) { + match reactor.react(ctx, method, data.to_vec()).await { Err(Error::SynchronizerError(SynchronizerError::Internal(str))) => { assert!(str.contains("mock error")) } @@ -364,8 +362,8 @@ mod tests { } } - #[test] - fn test_react_pull_blocks() { + #[runtime::test] + async fn test_react_pull_blocks() { let reactor = new_sync_reactor(); let pull_blocks = PullBlocks::from(1, vec![1, 2]); let data = ::encode(&pull_blocks).unwrap(); @@ -378,14 +376,14 @@ mod tests { ::encode(&push_blocks, Method::SyncPushBlocks).unwrap(); let scope = Scope::Single(SessionId::new(1)); - let maybe_ok = block_on(reactor.react(ctx, method, data.to_vec())); + let maybe_ok = reactor.react(ctx, method, data.to_vec()).await; assert_eq!(maybe_ok.unwrap(), ()); assert_eq!(reactor.outbound.broadcasted_data(), Some((bytes, scope))); } - #[test] - fn test_react_pull_blocks_without_session_id() { + #[runtime::test] + async fn test_react_pull_blocks_without_session_id() { let reactor = new_sync_reactor(); let pull_blocks = PullBlocks::from(1, vec![1, 2]); let data = ::encode(&pull_blocks).unwrap(); @@ -393,26 +391,26 @@ mod tests { let ctx = Context::new(); let method = Method::SyncPullBlocks; - match block_on(reactor.react(ctx, method, data.to_vec())) { + match reactor.react(ctx, method, data.to_vec()).await { Err(Error::SessionIdNotFound) => (), _ => panic!("should return Error::SessionIdNotFound"), } } - #[test] - fn test_react_pull_blocks_with_bad_data() { + #[runtime::test] + async fn test_react_pull_blocks_with_bad_data() { let reactor = new_sync_reactor(); let ctx = Context::new().with_value(P2P_SESSION_ID, 1); let method = Method::SyncPullBlocks; - match block_on(reactor.react(ctx, method, vec![1, 2, 3])) { + match reactor.react(ctx, method, vec![1, 2, 3]).await { Err(Error::MsgCodecError(_)) => (), _ => panic!("should return Error::MsgCodecError"), } } - #[test] - fn test_react_pull_blocks_with_sync_failure() { + #[runtime::test] + async fn test_react_pull_blocks_with_sync_failure() { let reactor = new_sync_reactor(); let pull_blocks = PullBlocks::from(1, vec![1, 2]); let data = ::encode(&pull_blocks).unwrap(); @@ -421,7 +419,7 @@ mod tests { let method = Method::SyncPullBlocks; reactor.sync.reply_err(true); - match block_on(reactor.react(ctx, method, data.to_vec())) { + match reactor.react(ctx, method, data.to_vec()).await { Err(Error::SynchronizerError(SynchronizerError::Internal(str))) => { assert!(str.contains("mock error")) } @@ -429,8 +427,8 @@ mod tests { } } - #[test] - fn test_react_pull_blocks_with_broadcast_failure() { + #[runtime::test] + async fn test_react_pull_blocks_with_broadcast_failure() { let reactor = new_sync_reactor(); let pull_blocks = PullBlocks::from(1, vec![1, 2]); let data = ::encode(&pull_blocks).unwrap(); @@ -439,13 +437,13 @@ mod tests { let method = Method::SyncPullBlocks; reactor.outbound.reply_err(true); - let maybe_ok = block_on(reactor.react(ctx, method, data.to_vec())); + let maybe_ok = reactor.react(ctx, method, data.to_vec()).await; assert_eq!(maybe_ok.unwrap(), ()); assert_eq!(reactor.outbound.broadcasted_data(), None); } - #[test] - fn test_react_push_blocks() { + #[runtime::test] + async fn test_react_push_blocks() { let reactor = new_sync_reactor(); let push_blocks = PushBlocks::from(1, vec![Block::default()]); let data = ::encode(&push_blocks).unwrap(); @@ -454,15 +452,15 @@ mod tests { let method = Method::SyncPushBlocks; let rx = reactor.callback.insert::>(1, 1); - let maybe_ok = block_on(reactor.react(ctx, method, data.to_vec())); + let maybe_ok = reactor.react(ctx, method, data.to_vec()).await; assert_eq!(maybe_ok.unwrap(), ()); let blocks = rx.try_recv().unwrap(); assert_eq!(blocks.len(), 1); } - #[test] - fn test_react_push_blocks_without_session_id() { + #[runtime::test] + async fn test_react_push_blocks_without_session_id() { let reactor = new_sync_reactor(); let push_blocks = PushBlocks::from(1, vec![Block::default()]); let data = ::encode(&push_blocks).unwrap(); @@ -470,26 +468,26 @@ mod tests { let ctx = Context::new(); let method = Method::SyncPushBlocks; - match block_on(reactor.react(ctx, method, data.to_vec())) { + match reactor.react(ctx, method, data.to_vec()).await { Err(Error::SessionIdNotFound) => (), _ => panic!("should return Error::SessionIdNotFound"), } } - #[test] - fn test_react_push_blocks_with_bad_data() { + #[runtime::test] + async fn test_react_push_blocks_with_bad_data() { let reactor = new_sync_reactor(); let ctx = Context::new().with_value(P2P_SESSION_ID, 1); let method = Method::SyncPushBlocks; - match block_on(reactor.react(ctx, method, vec![1, 2, 3])) { + match reactor.react(ctx, method, vec![1, 2, 3]).await { Err(Error::MsgCodecError(_)) => (), _ => panic!("should return Error::MsgCodecError"), } } - #[test] - fn test_react_push_blocks_without_cb_tx() { + #[runtime::test] + async fn test_react_push_blocks_without_cb_tx() { let reactor = new_sync_reactor(); let push_blocks = PushBlocks::from(1, vec![Block::default()]); let data = ::encode(&push_blocks).unwrap(); @@ -497,14 +495,14 @@ mod tests { let ctx = Context::new().with_value(P2P_SESSION_ID, 1); let method = Method::SyncPushBlocks; - match block_on(reactor.react(ctx, method, data.to_vec())) { + match reactor.react(ctx, method, data.to_vec()).await { Err(Error::CallbackItemNotFound(id)) => assert_eq!(id, 1), _ => panic!("should return Error::CallbackItemNotFound"), } } - #[test] - fn test_react_push_blocks_with_wrong_cb_tx() { + #[runtime::test] + async fn test_react_push_blocks_with_wrong_cb_tx() { let reactor = new_sync_reactor(); let push_blocks = PushBlocks::from(1, vec![Block::default()]); let data = ::encode(&push_blocks).unwrap(); @@ -513,14 +511,14 @@ mod tests { let method = Method::SyncPushBlocks; let _rx = reactor.callback.insert::>(1, 1); - match block_on(reactor.react(ctx, method, data.to_vec())) { + match reactor.react(ctx, method, data.to_vec()).await { Err(Error::CallbackItemWrongType(id)) => assert_eq!(id, 1), _ => panic!("should return Error::CallbackItemWrongType"), } } - #[test] - fn test_react_push_blocks_with_bad_ser_block() { + #[runtime::test] + async fn test_react_push_blocks_with_bad_ser_block() { let reactor = new_sync_reactor(); let mut ser_block = SerBlock::default(); ser_block.header = None; @@ -535,14 +533,14 @@ mod tests { let method = Method::SyncPushBlocks; let _rx = reactor.callback.insert::>(1, 1); - match block_on(reactor.react(ctx, method, data.to_vec())) { + match reactor.react(ctx, method, data.to_vec()).await { Err(Error::SerCodecError(_)) => (), _ => panic!("should return Error::SerCodecError"), } } - #[test] - fn test_react_push_blocks_with_tx_failure() { + #[runtime::test] + async fn test_react_push_blocks_with_tx_failure() { let reactor = new_sync_reactor(); let push_blocks = PushBlocks::from(1, vec![Block::default()]); let data = ::encode(&push_blocks).unwrap(); @@ -552,14 +550,14 @@ mod tests { let rx = reactor.callback.insert::>(1, 1); drop(rx); - match block_on(reactor.react(ctx, method, data.to_vec())) { + match reactor.react(ctx, method, data.to_vec()).await { Err(Error::ChannelTrySendError(_)) => (), _ => panic!("should return Error::ChannelTrySendError"), } } - #[test] - fn test_react_pull_txs() { + #[runtime::test] + async fn test_react_pull_txs() { let reactor = new_sync_reactor(); let pull_txs = PullTxs::from(1, vec![Hash::default()]); let data = ::encode(&pull_txs).unwrap(); @@ -571,37 +569,37 @@ mod tests { let ctx = Context::new().with_value(P2P_SESSION_ID, 1); let method = Method::SyncPullTxs; - let maybe_ok = block_on(reactor.react(ctx, method, data.to_vec())); + let maybe_ok = reactor.react(ctx, method, data.to_vec()).await; assert_eq!(maybe_ok.unwrap(), ()); assert_eq!(reactor.outbound.broadcasted_data(), Some((bytes, scope))); } - #[test] - fn test_pull_txs_without_session_id() { + #[runtime::test] + async fn test_pull_txs_without_session_id() { let reactor = new_sync_reactor(); let ctx = Context::new(); - match block_on(reactor.react(ctx, Method::SyncPullTxs, vec![1, 2, 3])) { + match reactor.react(ctx, Method::SyncPullTxs, vec![1, 2, 3]).await { Err(Error::SessionIdNotFound) => (), _ => panic!("should return Error::SessionIdNotFound"), } } - #[test] - fn test_pull_txs_with_bad_data() { + #[runtime::test] + async fn test_pull_txs_with_bad_data() { let reactor = new_sync_reactor(); let ctx = Context::new().with_value(P2P_SESSION_ID, 1); - match block_on(reactor.react(ctx, Method::SyncPullTxs, vec![1, 2, 3])) { + match reactor.react(ctx, Method::SyncPullTxs, vec![1, 2, 3]).await { Err(Error::MsgCodecError(_)) => (), _ => panic!("should return Error::MsgCodecError"), } } - #[test] - fn test_pull_txs_with_bad_ser_hash() { + #[runtime::test] + async fn test_pull_txs_with_bad_ser_hash() { let reactor = new_sync_reactor(); let pull_txs = PullTxs { uid: 1, @@ -612,14 +610,14 @@ mod tests { let ctx = Context::new().with_value(P2P_SESSION_ID, 1); let method = Method::SyncPullTxs; - match block_on(reactor.react(ctx, method, data.to_vec())) { + match reactor.react(ctx, method, data.to_vec()).await { Err(Error::SerCodecError(_)) => (), _ => panic!("should return Error::SerCodecError"), } } - #[test] - fn test_pull_txs_with_sync_get_txs_failure() { + #[runtime::test] + async fn test_pull_txs_with_sync_get_txs_failure() { let reactor = new_sync_reactor(); let pull_txs = PullTxs::from(1, vec![Hash::default()]); let data = ::encode(&pull_txs).unwrap(); @@ -628,7 +626,7 @@ mod tests { let method = Method::SyncPullTxs; reactor.sync.reply_err(true); - match block_on(reactor.react(ctx, method, data.to_vec())) { + match reactor.react(ctx, method, data.to_vec()).await { Err(Error::SynchronizerError(SynchronizerError::Internal(str))) => { assert!(str.contains("mock error")) } @@ -636,8 +634,8 @@ mod tests { } } - #[test] - fn test_pull_txs_with_outbound_failure() { + #[runtime::test] + async fn test_pull_txs_with_outbound_failure() { let reactor = new_sync_reactor(); let pull_txs = PullTxs::from(1, vec![Hash::default()]); let data = ::encode(&pull_txs).unwrap(); @@ -646,14 +644,14 @@ mod tests { let method = Method::SyncPullTxs; reactor.outbound.reply_err(true); - let maybe_ok = block_on(reactor.react(ctx, method, data.to_vec())); + let maybe_ok = reactor.react(ctx, method, data.to_vec()).await; assert_eq!(maybe_ok.unwrap(), ()); assert_eq!(reactor.outbound.broadcasted_data(), None); } - #[test] - fn test_react_push_txs() { + #[runtime::test] + async fn test_react_push_txs() { let reactor = new_sync_reactor(); let push_txs = PushTxs::from(1, vec![SignedTransaction::default()]); let data = ::encode(&push_txs).unwrap(); @@ -662,35 +660,35 @@ mod tests { let method = Method::SyncPushTxs; let rx = reactor.callback.insert::>(1, 2); - let maybe_ok = block_on(reactor.react(ctx, method, data.to_vec())); + let maybe_ok = reactor.react(ctx, method, data.to_vec()).await; assert_eq!(maybe_ok.unwrap(), ()); assert_eq!(rx.try_recv().unwrap(), vec![SignedTransaction::default()]); } - #[test] - fn test_react_push_txs_without_session_id() { + #[runtime::test] + async fn test_react_push_txs_without_session_id() { let reactor = new_sync_reactor(); let ctx = Context::new(); - match block_on(reactor.react(ctx, Method::SyncPushTxs, vec![1, 2])) { + match reactor.react(ctx, Method::SyncPushTxs, vec![1, 2]).await { Err(Error::SessionIdNotFound) => (), _ => panic!("should return Error::SessionIdNotFound"), } } - #[test] - fn test_react_push_txs_with_bad_data() { + #[runtime::test] + async fn test_react_push_txs_with_bad_data() { let reactor = new_sync_reactor(); let ctx = Context::new().with_value(P2P_SESSION_ID, 1); - match block_on(reactor.react(ctx, Method::SyncPushTxs, vec![1, 2])) { + match reactor.react(ctx, Method::SyncPushTxs, vec![1, 2]).await { Err(Error::MsgCodecError(_)) => (), _ => panic!("should return Error::MsgCodecError"), } } - #[test] - fn test_react_push_txs_without_cb_tx() { + #[runtime::test] + async fn test_react_push_txs_without_cb_tx() { let reactor = new_sync_reactor(); let push_txs = PushTxs::from(1, vec![SignedTransaction::default()]); let data = ::encode(&push_txs).unwrap(); @@ -698,14 +696,14 @@ mod tests { let ctx = Context::new().with_value(P2P_SESSION_ID, 1); let method = Method::SyncPushTxs; - match block_on(reactor.react(ctx, method, data.to_vec())) { + match reactor.react(ctx, method, data.to_vec()).await { Err(Error::CallbackItemNotFound(id)) => assert_eq!(id, 1), _ => panic!("should return Error::CallbackItemNotFound"), } } - #[test] - fn test_react_push_txs_with_wrong_cb_type() { + #[runtime::test] + async fn test_react_push_txs_with_wrong_cb_type() { let reactor = new_sync_reactor(); let push_txs = PushTxs::from(1, vec![SignedTransaction::default()]); let data = ::encode(&push_txs).unwrap(); @@ -714,14 +712,14 @@ mod tests { let method = Method::SyncPushTxs; let _rx = reactor.callback.insert::>(1, 1); - match block_on(reactor.react(ctx, method, data.to_vec())) { + match reactor.react(ctx, method, data.to_vec()).await { Err(Error::CallbackItemWrongType(id)) => assert_eq!(id, 1), _ => panic!("should return Error::CallbackItemWrongType"), } } - #[test] - fn test_react_push_txs_with_bad_ser_stxs() { + #[runtime::test] + async fn test_react_push_txs_with_bad_ser_stxs() { let reactor = new_sync_reactor(); let mut ser_stx = SerSignedTransaction::default(); ser_stx.untx = None; @@ -736,14 +734,14 @@ mod tests { let method = Method::SyncPushTxs; let _rx = reactor.callback.insert::>(1, 1); - match block_on(reactor.react(ctx, method, data.to_vec())) { + match reactor.react(ctx, method, data.to_vec()).await { Err(Error::SerCodecError(_)) => (), _ => panic!("should return Error::SerCodecError"), } } - #[test] - fn test_react_push_txs_with_cb_tx_failure() { + #[runtime::test] + async fn test_react_push_txs_with_cb_tx_failure() { let reactor = new_sync_reactor(); let push_txs = PushTxs::from(1, vec![SignedTransaction::default()]); let data = ::encode(&push_txs).unwrap(); @@ -753,7 +751,7 @@ mod tests { let rx = reactor.callback.insert::>(1, 1); drop(rx); - match block_on(reactor.react(ctx, method, data.to_vec())) { + match reactor.react(ctx, method, data.to_vec()).await { Err(Error::ChannelTrySendError(_)) => (), _ => panic!("should return Error::ChannelTrySendError"), } diff --git a/core/network/src/inbound/tx_pool.rs b/core/network/src/inbound/tx_pool.rs index ed4414ff4..56f6822f6 100644 --- a/core/network/src/inbound/tx_pool.rs +++ b/core/network/src/inbound/tx_pool.rs @@ -170,7 +170,6 @@ mod tests { use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; - use futures::executor::block_on; use futures::future::{err, ok}; use core_context::{Context, P2P_SESSION_ID}; @@ -300,8 +299,8 @@ mod tests { (reactor, cb) } - #[test] - fn test_react_with_unknown_method() { + #[runtime::test] + async fn test_react_with_unknown_method() { let (reactor, _) = new_tx_pool_reactor(); let stxs = vec![SignedTransaction::default(), SignedTransaction::default()]; @@ -311,14 +310,14 @@ mod tests { let ctx = Context::new(); let method = Method::SyncPullTxs; - match block_on(reactor.react(ctx, method, data.to_vec())) { + match reactor.react(ctx, method, data.to_vec()).await { Err(Error::UnknownMethod(m)) => assert_eq!(m, method.to_u32()), _ => panic!("should return Error::UnknownMethod"), } } - #[test] - fn test_react_broadcast_txs() { + #[runtime::test] + async fn test_react_broadcast_txs() { let (reactor, _) = new_tx_pool_reactor(); let stxs = vec![SignedTransaction::default(), SignedTransaction::default()]; let broadcast_txs = BroadcastTxs::from(stxs); @@ -326,27 +325,27 @@ mod tests { let ctx = Context::new(); let method = Method::BroadcastTxs; - let maybe_ok = block_on(reactor.react(ctx, method, data.to_vec())); + let maybe_ok = reactor.react(ctx, method, data.to_vec()).await; assert_eq!(maybe_ok.unwrap(), ()); assert_eq!(reactor.tx_pool.count(), 2); } - #[test] - fn test_react_broadcast_txs_with_bad_data() { + #[runtime::test] + async fn test_react_broadcast_txs_with_bad_data() { let (reactor, _) = new_tx_pool_reactor(); let ctx = Context::new(); let method = Method::BroadcastTxs; - match block_on(reactor.react(ctx, method, vec![1, 2, 3])) { + match reactor.react(ctx, method, vec![1, 2, 3]).await { Err(Error::MsgCodecError(_)) => (), _ => panic!("should return Error::MsgCodecError"), } } - #[test] - fn test_react_broadcast_txs_with_des_failure() { + #[runtime::test] + async fn test_react_broadcast_txs_with_des_failure() { let (reactor, _) = new_tx_pool_reactor(); let mut ser_stx = SerSignedTransaction::default(); ser_stx.untx = None; @@ -357,14 +356,14 @@ mod tests { let ctx = Context::new(); let method = Method::BroadcastTxs; - match block_on(reactor.react(ctx, method, data.to_vec())) { + match reactor.react(ctx, method, data.to_vec()).await { Err(Error::SerCodecError(_)) => (), _ => panic!("should return Error::SerCodecError"), } } - #[test] - fn test_react_broadcast_txs_with_insertion_failure() { + #[runtime::test] + async fn test_react_broadcast_txs_with_insertion_failure() { let (reactor, _) = new_tx_pool_reactor(); let broadcast_txs = BroadcastTxs::from(vec![SignedTransaction::default()]); let data = ::encode(&broadcast_txs).unwrap(); @@ -373,19 +372,22 @@ mod tests { let method = Method::BroadcastTxs; reactor.tx_pool.reply_err(true); - let maybe_ok = block_on(reactor.react(ctx, method, data.to_vec())); + let maybe_ok = reactor.react(ctx, method, data.to_vec()).await; assert_eq!(maybe_ok.unwrap(), ()); assert_eq!(reactor.tx_pool.count(), 0); } - #[test] - fn test_react_pull_txs() { + #[runtime::test] + async fn test_react_pull_txs() { let (reactor, _) = new_tx_pool_reactor(); let pull_txs = PullTxs::from(1, vec![Hash::default(), Hash::default()]); let data = ::encode(&pull_txs).unwrap().to_vec(); - let stxs = block_on(reactor.tx_pool.get_batch(Context::new(), &[])).unwrap(); + let stxs = { + let maybe_stxs = reactor.tx_pool.get_batch(Context::new(), &[]).await; + maybe_stxs.unwrap() + }; let push_txs = PushTxs::from(1, stxs); let bytes = ::encode(&push_txs, Method::PushTxs).unwrap(); @@ -393,15 +395,15 @@ mod tests { let scope = Scope::Single(SessionId::new(1)); let method = Method::PullTxs; - let maybe_ok = block_on(reactor.react(ctx, method, data)); + let maybe_ok = reactor.react(ctx, method, data).await; assert_eq!(maybe_ok.unwrap(), ()); assert_eq!(reactor.tx_pool.count(), 2); assert_eq!(reactor.outbound.broadcasted_data(), Some((bytes, scope))); } - #[test] - fn test_react_pull_txs_without_session_id() { + #[runtime::test] + async fn test_react_pull_txs_without_session_id() { let (reactor, _) = new_tx_pool_reactor(); let pull_txs = PullTxs::from(1, vec![Hash::default(), Hash::default()]); let data = ::encode(&pull_txs).unwrap().to_vec(); @@ -409,26 +411,26 @@ mod tests { let ctx = Context::new(); let method = Method::PullTxs; - match block_on(reactor.react(ctx, method, data)) { + match reactor.react(ctx, method, data).await { Err(Error::SessionIdNotFound) => (), _ => panic!("should return Error::SessionIdNotFound"), } } - #[test] - fn test_react_pull_txs_with_bad_data() { + #[runtime::test] + async fn test_react_pull_txs_with_bad_data() { let (reactor, _) = new_tx_pool_reactor(); let ctx = Context::new().with_value(P2P_SESSION_ID, 1); let method = Method::PullTxs; - match block_on(reactor.react(ctx, method, vec![1, 2, 3, 4])) { + match reactor.react(ctx, method, vec![1, 2, 3, 4]).await { Err(Error::MsgCodecError(_)) => (), _ => panic!("should return Error::MsgCodecError"), } } - #[test] - fn test_react_pull_txs_with_bad_hash() { + #[runtime::test] + async fn test_react_pull_txs_with_bad_hash() { let (reactor, _) = new_tx_pool_reactor(); let pull_txs = PullTxs { uid: 1, @@ -439,14 +441,14 @@ mod tests { let ctx = Context::new().with_value(P2P_SESSION_ID, 1); let method = Method::PullTxs; - match block_on(reactor.react(ctx, method, data)) { + match reactor.react(ctx, method, data).await { Err(Error::SerCodecError(_)) => (), _ => panic!("should return Error::SerCodecError"), } } - #[test] - fn test_react_pull_txs_with_get_batch_failure() { + #[runtime::test] + async fn test_react_pull_txs_with_get_batch_failure() { let (reactor, _) = new_tx_pool_reactor(); let pull_txs = PullTxs::from(1, vec![Hash::default(), Hash::default()]); let data = ::encode(&pull_txs).unwrap().to_vec(); @@ -455,14 +457,14 @@ mod tests { let method = Method::PullTxs; reactor.tx_pool.reply_err(true); - match block_on(reactor.react(ctx, method, data)) { + match reactor.react(ctx, method, data).await { Err(Error::TransactionPoolError(TransactionPoolError::TransactionNotFound)) => (), _ => panic!("should return Error::TransactionPoolError"), } } - #[test] - fn test_react_pull_txs_with_broadcast_failure() { + #[runtime::test] + async fn test_react_pull_txs_with_broadcast_failure() { let (reactor, _) = new_tx_pool_reactor(); let pull_txs = PullTxs::from(1, vec![Hash::default(), Hash::default()]); let data = ::encode(&pull_txs).unwrap().to_vec(); @@ -471,13 +473,13 @@ mod tests { let method = Method::PullTxs; reactor.outbound.reply_err(true); - let maybe_ok = block_on(reactor.react(ctx, method, data)); + let maybe_ok = reactor.react(ctx, method, data).await; assert_eq!(maybe_ok.unwrap(), ()); assert_eq!(reactor.outbound.broadcasted_data(), None); } - #[test] - fn test_react_push_txs() { + #[runtime::test] + async fn test_react_push_txs() { let (reactor, cb) = new_tx_pool_reactor(); let push_txs = PushTxs::from(1, vec![SignedTransaction::default()]); let data = ::encode(&push_txs).unwrap().to_vec(); @@ -486,14 +488,14 @@ mod tests { let rx = cb.insert::>(1, 1); let method = Method::PushTxs; - let maybe_ok = block_on(reactor.react(ctx, method, data)); + let maybe_ok = reactor.react(ctx, method, data).await; assert_eq!(maybe_ok.unwrap(), ()); assert_eq!(rx.try_recv().unwrap(), vec![SignedTransaction::default()]); } - #[test] - fn test_react_push_txs_without_session_id() { + #[runtime::test] + async fn test_react_push_txs_without_session_id() { let (reactor, _) = new_tx_pool_reactor(); let push_txs = PushTxs::from(1, vec![SignedTransaction::default()]); let data = ::encode(&push_txs).unwrap().to_vec(); @@ -501,27 +503,27 @@ mod tests { let ctx = Context::new(); let method = Method::PushTxs; - match block_on(reactor.react(ctx, method, data)) { + match reactor.react(ctx, method, data).await { Err(Error::SessionIdNotFound) => (), _ => panic!("should return Error::SessionIdNotFound"), } } - #[test] - fn test_react_push_txs_with_bad_data() { + #[runtime::test] + async fn test_react_push_txs_with_bad_data() { let (reactor, _) = new_tx_pool_reactor(); let ctx = Context::new().with_value(P2P_SESSION_ID, 1); let method = Method::PushTxs; - match block_on(reactor.react(ctx, method, vec![1, 2, 3])) { + match reactor.react(ctx, method, vec![1, 2, 3]).await { Err(Error::MsgCodecError(_)) => (), _ => panic!("should return Error::MsgCodecError"), } } - #[test] - fn test_react_push_txs_with_cb_item_not_found() { + #[runtime::test] + async fn test_react_push_txs_with_cb_item_not_found() { let (reactor, _) = new_tx_pool_reactor(); let push_txs = PushTxs::from(1, vec![SignedTransaction::default()]); let data = ::encode(&push_txs).unwrap().to_vec(); @@ -529,14 +531,14 @@ mod tests { let ctx = Context::new().with_value(P2P_SESSION_ID, 1); let method = Method::PushTxs; - match block_on(reactor.react(ctx, method, data)) { + match reactor.react(ctx, method, data).await { Err(Error::CallbackItemNotFound(id)) => assert_eq!(id, 1), _ => panic!("should return Error::CallbackItemNotFound"), } } - #[test] - fn test_react_push_txs_with_cb_item_wrong_type() { + #[runtime::test] + async fn test_react_push_txs_with_cb_item_wrong_type() { let (reactor, cb) = new_tx_pool_reactor(); let push_txs = PushTxs::from(1, vec![SignedTransaction::default()]); let data = ::encode(&push_txs).unwrap().to_vec(); @@ -545,14 +547,14 @@ mod tests { let method = Method::PushTxs; let _rx = cb.insert::>(1, 1); - match block_on(reactor.react(ctx, method, data)) { + match reactor.react(ctx, method, data).await { Err(Error::CallbackItemWrongType(id)) => assert_eq!(id, 1), _ => panic!("should return Error::CallbackItemWrongType"), } } - #[test] - fn test_react_push_txs_with_des_failure() { + #[runtime::test] + async fn test_react_push_txs_with_des_failure() { let (reactor, cb) = new_tx_pool_reactor(); let mut ser_stx = SerSignedTransaction::default(); ser_stx.untx = None; @@ -567,14 +569,14 @@ mod tests { let method = Method::PushTxs; let _rx = cb.insert::>(1, 1); - match block_on(reactor.react(ctx, method, data)) { + match reactor.react(ctx, method, data).await { Err(Error::SerCodecError(_)) => (), _ => panic!("should return Error::SerCodecError"), } } - #[test] - fn test_react_push_txs_with_try_send_failure() { + #[runtime::test] + async fn test_react_push_txs_with_try_send_failure() { let (reactor, cb) = new_tx_pool_reactor(); let push_txs = PushTxs::from(1, vec![SignedTransaction::default()]); let data = ::encode(&push_txs).unwrap().to_vec(); @@ -584,7 +586,7 @@ mod tests { let rx = cb.insert::>(1, 1); drop(rx); - match block_on(reactor.react(ctx, method, data)) { + match reactor.react(ctx, method, data).await { Err(Error::ChannelTrySendError(_)) => (), _ => panic!("should return Error::ChannelTrySendError"), } diff --git a/core/network/src/outbound.rs b/core/network/src/outbound.rs index e972e9765..851938347 100644 --- a/core/network/src/outbound.rs +++ b/core/network/src/outbound.rs @@ -447,8 +447,8 @@ mod tests { assert_eq!(outbound.broadcaster.broadcasted_bytes(), None); } - #[test] - fn test_rpc() { + #[runtime::test] + async fn test_rpc() { let data = b"son of sun".to_vec(); let method = Method::Proposal; let msg_bytes = encode_bytes(&data, method); @@ -458,10 +458,11 @@ mod tests { .with_value(P2P_SESSION_ID, 1) .with_value::(CALL_ID_KEY, CallId::new(1)); let (outbound, done_tx) = new_outbound::>(); + let method = Method::Proposal; let expect_resp = b"you".to_vec(); done_tx.try_send(expect_resp.clone()).unwrap(); - let resp: Vec = block_on(outbound.clone().rpc(ctx, Method::Proposal, data)).unwrap(); + let resp: Vec = outbound.clone().rpc(ctx, method, data).await.unwrap(); assert_eq!(resp, expect_resp); assert_eq!( @@ -476,65 +477,70 @@ mod tests { let data = b"fish same".to_vec(); let ctx = Context::new(); let (outbound, _) = new_outbound::<()>(); + let method = Method::Proposal; - block_on(outbound.rpc::, Vec>(ctx, Method::Proposal, data)).unwrap(); + block_on(outbound.rpc::, Vec>(ctx, method, data)).unwrap(); } - #[test] - fn test_rpc_without_ctx_session_id() { + #[runtime::test] + async fn test_rpc_without_ctx_session_id() { let data = b"shell out of ghost".to_vec(); let ctx = Context::new().with_value::(CALL_ID_KEY, CallId::new(1)); + let method = Method::Proposal; let (outbound, _) = new_outbound::<()>(); - match block_on(outbound.rpc::, Vec>(ctx, Method::Proposal, data)) { + match outbound.rpc::, Vec>(ctx, method, data).await { Err(str) => assert!(str.contains("session id not found")), _ => panic!("should return error string contains 'session id not found'"), } } - #[test] - fn test_rpc_with_broadcast_failure() { + #[runtime::test] + async fn test_rpc_with_broadcast_failure() { let data = b"taqikema".to_vec(); let ctx = Context::new() .with_value(P2P_SESSION_ID, 1) .with_value::(CALL_ID_KEY, CallId::new(1)); + let method = Method::Vote; let (outbound, _) = new_outbound::<()>(); outbound.broadcaster.reply_err(true); - match block_on(outbound.rpc::, Vec>(ctx, Method::Vote, data)) { + match outbound.rpc::, Vec>(ctx, method, data).await { Err(str) => assert!(str.contains("mock err")), _ => panic!("should return error string contains 'mock err'"), } } - #[test] - fn test_rpc_with_disconnected_done_channel() { + #[runtime::test] + async fn test_rpc_with_disconnected_done_channel() { let data = b"taqikema".to_vec(); let ctx = Context::new() .with_value(P2P_SESSION_ID, 1) .with_value::(CALL_ID_KEY, CallId::new(1)); + let method = Method::Vote; let (outbound, done_tx) = new_outbound::>(); drop(done_tx); - match block_on(outbound.rpc::, Vec>(ctx, Method::Vote, data)) { + match outbound.rpc::, Vec>(ctx, method, data).await { Err(str) => assert!(str.contains("done_rx return None")), _ => panic!("should return error string contains 'done_rx return None'"), } } - #[test] - fn test_rpc_timeout() { + #[runtime::test] + async fn test_rpc_timeout() { let data = b"death strand".to_vec(); let ctx = Context::new() .with_value(P2P_SESSION_ID, 1) .with_value::(CALL_ID_KEY, CallId::new(1)); + let method = Method::Vote; // hold _done_tx, but do not send anything let (outbound, _done_tx) = new_outbound::>(); - match block_on(outbound.rpc::, Vec>(ctx, Method::Vote, data)) { + match outbound.rpc::, Vec>(ctx, method, data).await { Err(str) => assert!(str.contains("timeout")), _ => panic!("should return error string contains 'timeout'"), } diff --git a/core/network/src/outbound/sync.rs b/core/network/src/outbound/sync.rs index eaf2c51cd..b9fc8da33 100644 --- a/core/network/src/outbound/sync.rs +++ b/core/network/src/outbound/sync.rs @@ -48,8 +48,6 @@ where #[cfg(test)] mod tests { - use futures::executor::block_on; - use core_context::{Context, P2P_SESSION_ID}; use core_network_message::sync::{BroadcastStatus, PullBlocks}; use core_network_message::{common::PullTxs, Method}; @@ -60,8 +58,8 @@ mod tests { use crate::outbound::Mode; use crate::p2p::{Scope, SessionId}; - #[test] - fn test_broadcast_status() { + #[runtime::test] + async fn test_broadcast_status() { let status = SyncStatus { hash: Hash::default(), height: 2020, @@ -80,8 +78,8 @@ mod tests { ); } - #[test] - fn test_broadcast_status_but_fail() { + #[runtime::test] + async fn test_broadcast_status_but_fail() { let status = SyncStatus { hash: Hash::default(), height: 2020, @@ -94,8 +92,8 @@ mod tests { assert_eq!(outbound.broadcaster.broadcasted_bytes(), None); } - #[test] - fn test_pull_blocks() { + #[runtime::test] + async fn test_pull_blocks() { let heights = vec![2020, 2021]; let ctx = Context::new().with_value(P2P_SESSION_ID, 1usize); let bytes = encode_bytes( @@ -112,7 +110,7 @@ mod tests { let (outbound, done_tx) = new_outbound::>(); done_tx.try_send(expect_resp.clone()).unwrap(); - let resp = block_on(outbound.pull_blocks(ctx, heights)).unwrap(); + let resp = outbound.pull_blocks(ctx, heights).await.unwrap(); assert_eq!(resp.first().unwrap().header.height, 2020); assert_eq!( outbound.broadcaster.broadcasted_bytes(), @@ -120,48 +118,48 @@ mod tests { ); } - #[test] - fn test_pull_blocks_without_session_id() { + #[runtime::test] + async fn test_pull_blocks_without_session_id() { let heights = vec![2099]; let ctx = Context::new(); let (outbound, _) = new_outbound::<()>(); - match block_on(outbound.pull_blocks(ctx, heights)) { + match outbound.pull_blocks(ctx, heights).await { Err(SynchronizerError::Internal(str)) => assert!(str.contains("session id not found")), _ => panic!("should return SynchronizerError::Internal"), } } - #[test] - fn test_pull_blocks_but_broadcast_fail() { + #[runtime::test] + async fn test_pull_blocks_but_broadcast_fail() { let heights = vec![2099]; let ctx = Context::new().with_value(P2P_SESSION_ID, 1usize); let (outbound, _) = new_outbound::<()>(); outbound.broadcaster.reply_err(true); - match block_on(outbound.pull_blocks(ctx, heights)) { + match outbound.pull_blocks(ctx, heights).await { Err(SynchronizerError::Internal(_)) => (), _ => panic!("should return SynchronizerError::Internal"), } } - #[test] - fn test_pull_blocks_with_disconnected_done_tx() { + #[runtime::test] + async fn test_pull_blocks_with_disconnected_done_tx() { let heights = vec![2099]; let ctx = Context::new().with_value(P2P_SESSION_ID, 1usize); let (outbound, done_tx) = new_outbound::>(); drop(done_tx); - match block_on(outbound.pull_blocks(ctx, heights)) { + match outbound.pull_blocks(ctx, heights).await { Err(SynchronizerError::Internal(str)) => assert!(str.contains("done_rx return None")), _ => panic!("should return SynchronizerError::Internal"), } } - #[test] - fn test_pull_blocks_timeout() { + #[runtime::test] + async fn test_pull_blocks_timeout() { let heights = vec![2077]; let ctx = Context::new().with_value(P2P_SESSION_ID, 1usize); let bytes = encode_bytes( @@ -172,7 +170,7 @@ mod tests { let (outbound, _done_tx) = new_outbound::>(); - match block_on(outbound.pull_blocks(ctx, heights)) { + match outbound.pull_blocks(ctx, heights).await { Err(SynchronizerError::Internal(str)) => assert!(str.contains("timeout")), _ => panic!("should return SynchronizerError::Internal indicates timeout"), } @@ -182,8 +180,8 @@ mod tests { ); } - #[test] - fn test_pull_txs() { + #[runtime::test] + async fn test_pull_txs() { let hashes = vec![Hash::default(), Hash::default()]; let ctx = Context::new().with_value(P2P_SESSION_ID, 1usize); let bytes = encode_bytes(&PullTxs::from(1, hashes.clone()), Method::SyncPullTxs); @@ -193,7 +191,7 @@ mod tests { let (outbound, done_tx) = new_outbound::>(); done_tx.try_send(expect_resp.clone()).unwrap(); - let resp = block_on(outbound.pull_txs(ctx, hashes.as_slice())).unwrap(); + let resp = outbound.pull_txs(ctx, hashes.as_slice()).await.unwrap(); assert_eq!(resp, expect_resp); assert_eq!( outbound.broadcaster.broadcasted_bytes(), @@ -201,48 +199,48 @@ mod tests { ); } - #[test] - fn test_pull_txs_without_session_id() { + #[runtime::test] + async fn test_pull_txs_without_session_id() { let hashes = vec![Hash::default()]; let ctx = Context::new(); let (outbound, _) = new_outbound::<()>(); - match block_on(outbound.pull_txs(ctx, hashes.as_slice())) { + match outbound.pull_txs(ctx, hashes.as_slice()).await { Err(SynchronizerError::Internal(str)) => assert!(str.contains("session id not found")), _ => panic!("should return SynchronizerError::Internal"), } } - #[test] - fn test_pull_txs_but_broadcast_fail() { + #[runtime::test] + async fn test_pull_txs_but_broadcast_fail() { let hashes = vec![Hash::default()]; let ctx = Context::new().with_value(P2P_SESSION_ID, 1usize); let (outbound, _) = new_outbound::<()>(); outbound.broadcaster.reply_err(true); - match block_on(outbound.pull_txs(ctx, hashes.as_slice())) { + match outbound.pull_txs(ctx, hashes.as_slice()).await { Err(SynchronizerError::Internal(_)) => (), _ => panic!("should return SynchronizerError::Internal"), } } - #[test] - fn test_pull_txs_with_disconnected_done_tx() { + #[runtime::test] + async fn test_pull_txs_with_disconnected_done_tx() { let hashes = vec![Hash::default()]; let ctx = Context::new().with_value(P2P_SESSION_ID, 1usize); let (outbound, done_tx) = new_outbound::>(); drop(done_tx); - match block_on(outbound.pull_txs(ctx, hashes.as_slice())) { + match outbound.pull_txs(ctx, hashes.as_slice()).await { Err(SynchronizerError::Internal(str)) => assert!(str.contains("done_rx return None")), _ => panic!("should return SynchronizerError::Internal"), } } - #[test] - fn test_pull_txs_timeout() { + #[runtime::test] + async fn test_pull_txs_timeout() { let hashes = vec![Hash::default()]; let ctx = Context::new().with_value(P2P_SESSION_ID, 1usize); let bytes = encode_bytes(&PullTxs::from(1, hashes.clone()), Method::SyncPullTxs); @@ -250,7 +248,7 @@ mod tests { let (outbound, _done_tx) = new_outbound::>(); - match block_on(outbound.pull_txs(ctx, hashes.as_slice())) { + match outbound.pull_txs(ctx, hashes.as_slice()).await { Err(SynchronizerError::Internal(str)) => assert!(str.contains("timeout")), _ => panic!("should return SynchronizerError::Internal indicates timeout"), } diff --git a/core/network/src/outbound/tx_pool.rs b/core/network/src/outbound/tx_pool.rs index 6788288ea..169f5168f 100644 --- a/core/network/src/outbound/tx_pool.rs +++ b/core/network/src/outbound/tx_pool.rs @@ -36,8 +36,6 @@ where #[cfg(test)] mod tests { - use futures::executor::block_on; - use core_context::{Context, P2P_SESSION_ID}; use core_network_message::{common::PullTxs, tx_pool::BroadcastTxs, Method}; use core_runtime::{network::TransactionPool, TransactionPoolError}; @@ -72,8 +70,8 @@ mod tests { assert_eq!(outbound.broadcaster.broadcasted_bytes(), None); } - #[test] - fn test_pull_txs() { + #[runtime::test] + async fn test_pull_txs() { let hashes = vec![Hash::default(), Hash::default()]; let ctx = Context::new().with_value(P2P_SESSION_ID, 1usize); let bytes = encode_bytes(&PullTxs::from(1, hashes.clone()), Method::PullTxs); @@ -83,7 +81,7 @@ mod tests { let (outbound, done_tx) = new_outbound::>(); done_tx.try_send(expect_resp.clone()).unwrap(); - let resp = block_on(outbound.pull_txs(ctx, hashes)).unwrap(); + let resp = outbound.pull_txs(ctx, hashes).await.unwrap(); assert_eq!(resp, expect_resp); assert_eq!( outbound.broadcaster.broadcasted_bytes(), @@ -91,13 +89,13 @@ mod tests { ); } - #[test] - fn test_pull_txs_without_session_id() { + #[runtime::test] + async fn test_pull_txs_without_session_id() { let hashes = vec![Hash::default()]; let ctx = Context::new(); let (outbound, _) = new_outbound::<()>(); - match block_on(outbound.pull_txs(ctx, hashes)) { + match outbound.pull_txs(ctx, hashes).await { Err(TransactionPoolError::Internal(str)) => { assert!(str.contains("session id not found")) } @@ -105,29 +103,29 @@ mod tests { } } - #[test] - fn test_pull_txs_but_broadcast_fail() { + #[runtime::test] + async fn test_pull_txs_but_broadcast_fail() { let hashes = vec![Hash::default()]; let ctx = Context::new().with_value(P2P_SESSION_ID, 1usize); let (outbound, _) = new_outbound::<()>(); outbound.broadcaster.reply_err(true); - match block_on(outbound.pull_txs(ctx, hashes)) { + match outbound.pull_txs(ctx, hashes).await { Err(TransactionPoolError::Internal(_)) => (), _ => panic!("should return TransactionPoolError::Internal"), } } - #[test] - fn test_pull_txs_with_disconnected_done_tx() { + #[runtime::test] + async fn test_pull_txs_with_disconnected_done_tx() { let hashes = vec![Hash::default()]; let ctx = Context::new().with_value(P2P_SESSION_ID, 1usize); let (outbound, done_tx) = new_outbound::>(); drop(done_tx); - match block_on(outbound.pull_txs(ctx, hashes)) { + match outbound.pull_txs(ctx, hashes).await { Err(TransactionPoolError::Internal(str)) => { assert!(str.contains("done_rx return None")) } @@ -135,8 +133,8 @@ mod tests { } } - #[test] - fn test_pull_txs_timeout() { + #[runtime::test] + async fn test_pull_txs_timeout() { let hashes = vec![Hash::default()]; let ctx = Context::new().with_value(P2P_SESSION_ID, 1usize); let bytes = encode_bytes(&PullTxs::from(1, hashes.clone()), Method::PullTxs); @@ -144,7 +142,7 @@ mod tests { let (outbound, _done_tx) = new_outbound::>(); - match block_on(outbound.pull_txs(ctx, hashes)) { + match outbound.pull_txs(ctx, hashes).await { Err(TransactionPoolError::Internal(str)) => assert!(str.contains("timeout")), _ => panic!("should return TransactionPoolError::Internal indicates timeout"), } diff --git a/core/network/src/peer_manager.rs b/core/network/src/peer_manager.rs index 31f58b823..8d18ca344 100644 --- a/core/network/src/peer_manager.rs +++ b/core/network/src/peer_manager.rs @@ -3,15 +3,14 @@ use std::pin::Pin; use std::sync::Arc; use std::time::Duration; -use futures::compat::Stream01CompatExt; use futures::future::{ready, FutureObj}; -use futures::prelude::{FutureExt, Stream, StreamExt, TryFutureExt}; +use futures::prelude::{Stream, StreamExt}; use futures::task::{AtomicWaker, Context, Poll}; +use futures_timer::Interval; use log::{debug, error}; use parking_lot::RwLock; use rand::seq::SliceRandom; use std::collections::HashSet; -use tokio::timer::Interval; use crate::p2p::{multiaddr::Multiaddr, DialProtocol, Dialer}; @@ -80,15 +79,14 @@ impl DefaultPeerManager { self.dialer = Some(dialer); let job = async move { - let routine_job = Interval::new_interval(Duration::from_secs(routine_interval)) - .compat() - .for_each(move |_| { + let routine_job = + Interval::new(Duration::from_secs(routine_interval)).for_each(move |_| { waker.wake(); ready(()) }); - tokio::spawn(routine_job.unit_error().boxed().compat()); + runtime::spawn(routine_job); self.for_each(async move |_| ()).await; }; diff --git a/core/network/src/service.rs b/core/network/src/service.rs index 012d920e3..cc23c02c1 100644 --- a/core/network/src/service.rs +++ b/core/network/src/service.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use std::{marker::Unpin, pin::Pin}; -use futures::prelude::{FutureExt, Stream, StreamExt, TryFutureExt}; +use futures::prelude::{Stream, StreamExt}; use futures::task::{Context as FutTaskContext, Poll}; use log::error; @@ -153,31 +153,13 @@ where pub async fn run(mut self) { // TODO: remove unwrap let conn_pool = self.conn_pool.take().unwrap(); - tokio::spawn( - conn_pool - .for_each(async move |_| ()) - .unit_error() - .boxed() - .compat(), - ); + runtime::spawn(conn_pool.for_each(async move |_| ())); let inbound = self.inbound.take().unwrap(); - tokio::spawn( - inbound - .for_each(async move |_| ()) - .unit_error() - .boxed() - .compat(), - ); + runtime::spawn(inbound.for_each(async move |_| ())); let peer_mgr = self.peer_mgr.clone(); - tokio::spawn( - peer_mgr - .run(self.dialer.clone(), PEER_MANAGER_ROUTINE_INTERVAL) - .unit_error() - .boxed() - .compat(), - ); + runtime::spawn(peer_mgr.run(self.dialer.clone(), PEER_MANAGER_ROUTINE_INTERVAL)); self.for_each(async move |_| ()).await } diff --git a/core/pubsub/Cargo.toml b/core/pubsub/Cargo.toml index ab5d6cb17..896d025b1 100644 --- a/core/pubsub/Cargo.toml +++ b/core/pubsub/Cargo.toml @@ -8,3 +8,4 @@ edition = "2018" futures-preview = { version = "0.3.0-alpha.16", features = [ "async-await", "nightly" ] } uuid = { version = "0.7", features = [ "v4" ] } log = "0.4" +runtime = "0.3.0-alpha.4" diff --git a/core/pubsub/README.md b/core/pubsub/README.md index 11e18a371..4e4de9885 100644 --- a/core/pubsub/README.md +++ b/core/pubsub/README.md @@ -20,7 +20,7 @@ broadcast::Receiver => broadcast::Sender => pubsub::Receiver => Generic Message ### Usage -To use this crate, you need to start with a Rust 2018 edition crate, with rustc 1.33.0-nightly or later. +To use this crate, you need to start with a Rust 2018 edition crate, with rustc 1.35.0-nightly or later. Add this to your `Cargo.toml`: @@ -38,9 +38,6 @@ Then, get started. In your application, add: // The nightly features that are commonly needed with async/await #![feature(async_await)] -use std::thread::spawn; - -use futures::executor::block_on; use futures::future::ready; use futures::prelude::StreamExt; @@ -49,10 +46,11 @@ use core_pubsub::PubSub; #[derive(Clone, Debug)] struct Message { header: String, - body: String, + body: String, } -pub fn main() -> Result<(), ()> { +#[runtime::main] +pub async fn main() -> Result<(), ()> { let mut pubsub = PubSub::builder().build().start(); let mut sub = pubsub.subscribe::("test".to_owned())?; @@ -63,11 +61,11 @@ pub fn main() -> Result<(), ()> { let mut register = pubsub.register(); let mut pubb = register.publish::("test".to_owned())?; - let _test_pubb = spawn(move || { + let _test_pubb = runtime::spawn(async move { let mut count = 1; let msg = Message { header: "dummy".to_owned(), - body: "hello world".to_owned(), + body: "hello world".to_owned(), }; for _ in 0..15 { @@ -79,15 +77,16 @@ pub fn main() -> Result<(), ()> { } }); - block_on(sub.take(5).for_each(|e| { - println!("{:?}", e); - ready(()) - })); + sub.take(5) + .for_each(|e| { + println!("{:?}", e); + ready(()) + }) + .await; - if let Err(err) = pubsub.shutdown() { + if let Err(err) = pubsub.shutdown().await { eprintln!("shutdown failure: {:?}", err); } Ok(()) -} ``` diff --git a/core/pubsub/examples/pubsub_show.rs b/core/pubsub/examples/pubsub_show.rs index 7fe4c568f..b3674993b 100644 --- a/core/pubsub/examples/pubsub_show.rs +++ b/core/pubsub/examples/pubsub_show.rs @@ -1,6 +1,5 @@ -use std::thread::spawn; +#![feature(async_await)] -use futures::executor::block_on; use futures::future::ready; use futures::prelude::StreamExt; @@ -12,7 +11,8 @@ struct Message { body: String, } -pub fn main() -> Result<(), ()> { +#[runtime::main] +pub async fn main() -> Result<(), ()> { let mut pubsub = PubSub::builder().build().start(); let mut sub = pubsub.subscribe::("test".to_owned())?; @@ -23,7 +23,7 @@ pub fn main() -> Result<(), ()> { let mut register = pubsub.register(); let mut pubb = register.publish::("test".to_owned())?; - let _test_pubb = spawn(move || { + let _test_pubb = runtime::spawn(async move { let mut count = 1; let msg = Message { header: "dummy".to_owned(), @@ -39,12 +39,14 @@ pub fn main() -> Result<(), ()> { } }); - block_on(sub.take(5).for_each(|e| { - println!("{:?}", e); - ready(()) - })); + sub.take(5) + .for_each(|e| { + println!("{:?}", e); + ready(()) + }) + .await; - if let Err(err) = pubsub.shutdown() { + if let Err(err) = pubsub.shutdown().await { eprintln!("shutdown failure: {:?}", err); } diff --git a/core/pubsub/src/broadcast.rs b/core/pubsub/src/broadcast.rs index 84d83b676..7aa7ac503 100644 --- a/core/pubsub/src/broadcast.rs +++ b/core/pubsub/src/broadcast.rs @@ -134,11 +134,11 @@ impl Broadcast { #[cfg(test)] mod tests { use std::sync::Arc; - use std::thread::{spawn, JoinHandle}; use futures::channel::{mpsc, oneshot}; use futures::executor::block_on; use futures::prelude::StreamExt; + use runtime::task::JoinHandle; use uuid::Uuid; use crate::broadcast::mock_hash_map::{State, StateRx}; @@ -183,9 +183,9 @@ mod tests { } impl Control { - pub fn shutdown(self) { + pub async fn shutdown(self) { self.shutdown_tx.send(()).unwrap(); - self.handle.join().unwrap(); + self.handle.await; } pub async fn get_next_state(&mut self) -> State { @@ -240,7 +240,7 @@ mod tests { let (shutdown_tx, shutdown_rx) = oneshot::channel(); let broadcast = Broadcast::broadcast(pubs, subs, pending_acts, act_rx, shutdown_rx); - let handle = spawn(move || block_on(broadcast)); + let handle = runtime::spawn(broadcast); Control { act_tx, @@ -251,8 +251,8 @@ mod tests { } } - #[test] - fn test_new_pub() { + #[runtime::test] + async fn test_new_pub() { let mut ctrl = new_broadcast(); let topic = "test".to_owned(); @@ -265,11 +265,11 @@ mod tests { assert_eq!(topic, "test"); assert_eq!(len, 1); - ctrl.shutdown(); + ctrl.shutdown().await; } - #[test] - fn test_remove_pub() { + #[runtime::test] + async fn test_remove_pub() { let mut ctrl = new_broadcast(); let topic = "test"; @@ -288,11 +288,11 @@ mod tests { let (_, len) = block_on(ctrl.get_next_state()); assert_eq!(len, 0); - ctrl.shutdown(); + ctrl.shutdown().await; } - #[test] - fn test_new_sub() { + #[runtime::test] + async fn test_new_sub() { let mut ctrl = new_broadcast(); let topic = "test".to_owned(); @@ -309,11 +309,11 @@ mod tests { assert_eq!(sub_uuid, uuid.to_string()); assert_eq!(len, 1); - ctrl.shutdown(); + ctrl.shutdown().await; } - #[test] - fn test_remove_sub() { + #[runtime::test] + async fn test_remove_sub() { let mut ctrl = new_broadcast(); let topic = "test"; @@ -329,11 +329,11 @@ mod tests { let (_, len) = block_on(ctrl.get_next_state()); assert_eq!(len, 0); - ctrl.shutdown(); + ctrl.shutdown().await; } - #[test] - fn test_broadcast() { + #[runtime::test] + async fn test_broadcast() { let mut ctrl = new_broadcast(); let topic = "test"; let msg = "coaerl & tsuaedy".to_owned(); @@ -348,11 +348,11 @@ mod tests { assert_eq!(recv_msg, msg); - ctrl.shutdown(); + ctrl.shutdown().await; } - #[test] - fn test_broadcast_use_wrong_recv_msg_type() { + #[runtime::test] + async fn test_broadcast_use_wrong_recv_msg_type() { let mut ctrl = new_broadcast(); let topic = "test"; let msg = "coalre & tdusaey".to_owned(); @@ -367,15 +367,15 @@ mod tests { assert!(msg.is_none()); - ctrl.shutdown(); + ctrl.shutdown().await; } - #[test] - fn test_broadcast_shutdown() { + #[runtime::test] + async fn test_broadcast_shutdown() { let ctrl = new_broadcast(); let act_tx = ctrl.act_tx.clone(); - ctrl.shutdown(); + ctrl.shutdown().await; assert!(act_tx.is_closed()); } diff --git a/core/pubsub/src/lib.rs b/core/pubsub/src/lib.rs index 5af3ba2ee..2e8a8edd4 100644 --- a/core/pubsub/src/lib.rs +++ b/core/pubsub/src/lib.rs @@ -21,7 +21,7 @@ //! ### Usage //! //! To use this crate, you need to start with a Rust 2018 edition crate, with -//! rustc 1.33.0-nightly or later. +//! rustc 1.35.0-nightly or later. //! //! Add this to your `Cargo.toml`: //! @@ -39,9 +39,6 @@ //! // The nightly features that are commonly needed with async/await //! #![feature(async_await)] //! -//! use std::thread::spawn; -//! -//! use futures::executor::block_on; //! use futures::future::ready; //! use futures::prelude::StreamExt; //! @@ -53,7 +50,8 @@ //! body: String, //! } //! -//! pub fn main() -> Result<(), ()> { +//! #[runtime::main] +//! pub async fn main() -> Result<(), ()> { //! let mut pubsub = PubSub::builder().build().start(); //! //! let mut sub = pubsub.subscribe::("test".to_owned())?; @@ -64,7 +62,7 @@ //! let mut register = pubsub.register(); //! //! let mut pubb = register.publish::("test".to_owned())?; -//! let _test_pubb = spawn(move || { +//! let _test_pubb = runtime::spawn(async move { //! let mut count = 1; //! let msg = Message { //! header: "dummy".to_owned(), @@ -80,12 +78,14 @@ //! } //! }); //! -//! block_on(sub.take(5).for_each(|e| { -//! println!("{:?}", e); -//! ready(()) -//! })); +//! sub.take(5) +//! .for_each(|e| { +//! println!("{:?}", e); +//! ready(()) +//! }) +//! .await; //! -//! if let Err(err) = pubsub.shutdown() { +//! if let Err(err) = pubsub.shutdown().await { //! eprintln!("shutdown failure: {:?}", err); //! } //! diff --git a/core/pubsub/src/pubsub.rs b/core/pubsub/src/pubsub.rs index 11ef90720..9c8fa6d6b 100644 --- a/core/pubsub/src/pubsub.rs +++ b/core/pubsub/src/pubsub.rs @@ -77,7 +77,7 @@ impl PubSub { } /// Shutdown this pubsub instance - pub fn shutdown(self) -> Result<(), ()> { - self.broadcast_worker.shutdown() + pub async fn shutdown(self) -> Result<(), ()> { + self.broadcast_worker.shutdown().await } } diff --git a/core/pubsub/src/worker.rs b/core/pubsub/src/worker.rs index d87d0ddc2..5e78ca4d1 100644 --- a/core/pubsub/src/worker.rs +++ b/core/pubsub/src/worker.rs @@ -1,10 +1,8 @@ use std::future::Future; -use std::thread::{self as thread, JoinHandle}; use futures::channel::oneshot::Sender; -use futures::executor::block_on; use futures::future::FutureObj; -use log::error; +use runtime::task::JoinHandle; type TaskFut = FutureObj<'static, ()>; @@ -44,15 +42,12 @@ impl Worker { } /// Start task in single thread - /// - /// FIXME: Use `tokio` to spawn task, `tokio::run` is unreliable right now, - /// may panic, complain that 'not yet implemented'. pub fn start_loop(self) -> Self { let shutdown_tx = self.shutdown_tx; let task = { if let Task::Idle(task) = self.task { - Task::Running(thread::spawn(move || block_on(task))) + Task::Running(runtime::spawn(task)) } else { self.task } @@ -77,12 +72,10 @@ impl Worker { (fut, ctrl) } - pub fn shutdown(self) -> Result<(), ()> { + pub async fn shutdown(self) -> Result<(), ()> { if let Task::Running(thread_handle) = self.task { self.shutdown_tx.send(())?; - thread_handle.join().map_err(|err| { - error!("Pubsub: worker thread join error: {:?}", err); - })?; + thread_handle.await; } Ok(()) diff --git a/src/main.rs b/src/main.rs index 17262ad0f..7fd42c826 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,9 +6,6 @@ use std::fs::File; use std::path::Path; use std::sync::Arc; -use futures::executor::block_on; -use futures::prelude::{FutureExt, TryFutureExt}; - use components_database::rocks::{Config as RocksDBConfig, RocksDB}; use components_executor::evm::{EVMBlockDataProvider, EVMExecutor}; use components_executor::TrieDB; @@ -31,7 +28,8 @@ use core_types::{Address, Block, BlockHeader, Genesis, Hash, Proof}; mod config; use config::Config; -fn main() { +#[runtime::main(runtime_tokio::Tokio)] +async fn main() { common_logger::init(common_logger::Flag::Main); let matches = clap::App::new("Muta") .version("0.1") @@ -58,13 +56,15 @@ fn main() { if let Some(matches) = matches.subcommand_matches("init") { let genesis_path = matches.value_of("genesis.json").unwrap(); log::info!("Genesis path: {}", genesis_path); - handle_init(&cfg, genesis_path).unwrap(); + handle_init(&cfg, genesis_path).await.unwrap(); } - start(&cfg); + start(&cfg).await; } -fn start(cfg: &Config) { +// clippy bug +#[allow(clippy::needless_lifetimes)] +async fn start(cfg: &Config) { // new context let ctx = Context::new(); @@ -83,7 +83,7 @@ fn start(cfg: &Config) { let trie_db = Arc::new(TrieDB::new(Arc::clone(&state_db))); // new executor - let block = block_on(storage.get_latest_block(ctx.clone())).unwrap(); + let block = storage.get_latest_block(ctx.clone()).await.unwrap(); let executor = Arc::new( EVMExecutor::from_existing( trie_db, @@ -130,7 +130,7 @@ fn start(cfg: &Config) { verifier_list.push(Address::from_hex(address).unwrap()); } - let proof = block_on(storage.get_latest_proof(ctx.clone())).unwrap(); + let proof = storage.get_latest_proof(ctx.clone()).await.unwrap(); let status = ConsensusStatus { height: block.header.height, timestamp: block.header.timestamp, @@ -174,7 +174,7 @@ fn start(cfg: &Config) { Arc::clone(&consensus), Arc::clone(&storage), ); - std::thread::spawn(move || tokio::run(network.run().unit_error().boxed().compat())); + runtime::spawn(network.run()); // start synchronizer let sub_block2 = pubsub @@ -185,7 +185,7 @@ fn start(cfg: &Config) { Arc::clone(&storage), cfg.synchronzer.broadcast_status_interval, ); - synchronizer_manager.start(sub_block2); + runtime::spawn(synchronizer_manager.start(sub_block2)); // start jsonrpc let sub_block = pubsub @@ -208,12 +208,19 @@ fn start(cfg: &Config) { Arc::clone(&secp), Arc::clone(&peer_count), ); - if let Err(e) = components_jsonrpc::listen(jrpc_config, jrpc_state, sub_block) { - log::error!("Failed to start jrpc server: {}", e); - }; + + let handle = std::thread::spawn(move || { + if let Err(e) = components_jsonrpc::listen(jrpc_config, jrpc_state, sub_block) { + log::error!("Failed to start jrpc server: {}", e); + }; + }); + + handle.join().unwrap(); } -fn handle_init(cfg: &Config, genesis_path: impl AsRef) -> Result<(), Box> { +// clippy bug +#[allow(clippy::needless_lifetimes)] +async fn handle_init(cfg: &Config, genesis_path: impl AsRef) -> Result<(), Box> { let mut r = File::open(genesis_path)?; let genesis: Genesis = serde_json::from_reader(&mut r)?; log::info!("Genesis data: {:?}", genesis); @@ -227,7 +234,7 @@ fn handle_init(cfg: &Config, genesis_path: impl AsRef) -> Result<(), Box) -> Result<(), Box