From 6087e4fa1e16e9a00a61aee470897335812b387c Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Sat, 16 Dec 2023 17:21:11 +0000 Subject: [PATCH] feat: added benchmarking binary for torrent repository --- Cargo.lock | 97 ++++++ Cargo.toml | 6 +- .../torrent-repository-benchmarks/Cargo.toml | 21 ++ .../torrent-repository-benchmarks/src/args.rs | 15 + .../src/benches/asyn.rs | 176 ++++++++++ .../src/benches/mod.rs | 3 + .../src/benches/sync.rs | 166 ++++++++++ .../src/benches/utils.rs | 73 +++++ .../torrent-repository-benchmarks/src/lib.rs | 2 + .../torrent-repository-benchmarks/src/main.rs | 139 ++++++++ src/core/mod.rs | 133 ++++---- src/core/services/torrent.rs | 33 +- src/core/{torrent.rs => torrent/mod.rs} | 2 + src/core/torrent/repository.rs | 301 ++++++++++++++++++ tests/wrk_benchmark_announce.lua | 64 ++-- 15 files changed, 1110 insertions(+), 121 deletions(-) create mode 100644 packages/torrent-repository-benchmarks/Cargo.toml create mode 100644 packages/torrent-repository-benchmarks/src/args.rs create mode 100644 packages/torrent-repository-benchmarks/src/benches/asyn.rs create mode 100644 packages/torrent-repository-benchmarks/src/benches/mod.rs create mode 100644 packages/torrent-repository-benchmarks/src/benches/sync.rs create mode 100644 packages/torrent-repository-benchmarks/src/benches/utils.rs create mode 100644 packages/torrent-repository-benchmarks/src/lib.rs create mode 100644 packages/torrent-repository-benchmarks/src/main.rs rename src/core/{torrent.rs => torrent/mod.rs} (99%) create mode 100644 src/core/torrent/repository.rs diff --git a/Cargo.lock b/Cargo.lock index 30a961b1..4e3bb2f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -91,12 +91,54 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" +[[package]] +name = "anstream" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2ab91ebe16eb252986481c5b62f6098f3b698a45e34b5b98200cf20dd2484a44" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "utf8parse", +] + [[package]] name = "anstyle" version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7079075b41f533b8c61d2a4d073c4676e1f8b249ff94a393b0595db304e0dd87" +[[package]] +name = "anstyle-parse" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "317b9a89c1868f5ea6ff1d9539a69f45dffc21ce321ac1fd1160dfa48c8e2140" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ca11d4be1bab0c8bc8734a9aa7bf4ee8316d462a08c6ac5052f888fef5b494b" +dependencies = [ + "windows-sys 0.48.0", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0699d10d2f4d628a98ee7b57b289abbc98ff3bad977cb3152709d4bf2330628" +dependencies = [ + "anstyle", + "windows-sys 0.48.0", +] + [[package]] name = "aquatic_udp_protocol" version = "0.8.0" @@ -160,6 +202,7 @@ checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" dependencies = [ "async-trait", "axum-core", + "axum-macros", "bitflags 1.3.2", "bytes", "futures-util", @@ -212,6 +255,18 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum-macros" +version = "0.3.8" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdca6a10ecad987bda04e95606ef85a5417dcaac1a78455242d72e031e2b6b62" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 2.0.39", +] + [[package]] name = "axum-server" version = "0.5.1" @@ -515,6 +570,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "41fffed7514f420abec6d183b1d3acfd9099c79c3a10a06ade4f8203f1411272" dependencies = [ "clap_builder", + "clap_derive", ] [[package]] @@ -523,8 +579,22 @@ version = "4.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "63361bae7eef3771745f02d8d892bec2fee5f6e34af316ba556e7f97a7069ff1" dependencies = [ + "anstream", "anstyle", "clap_lex", + "strsim", +] + +[[package]] +name = "clap_derive" +version = "4.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf9804afaaf59a91e75b022a30fb7229a7901f60c755489cc61c9b423b836442" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 2.0.39", ] [[package]] @@ -542,6 +612,12 @@ dependencies = [ "cc", ] +[[package]] +name = "colorchoice" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" + [[package]] name = "config" version = "0.13.4" @@ -612,6 +688,7 @@ dependencies = [ "ciborium", "clap", "criterion-plot", + "futures", "is-terminal", "itertools", "num-traits", @@ -624,6 +701,7 @@ dependencies = [ "serde_derive", "serde_json", "tinytemplate", + "tokio", "walkdir", ] @@ -3253,6 +3331,17 @@ dependencies = [ "winnow", ] +[[package]] +name = "torrust-torrent-repository-benchmarks" +version = "3.0.0-alpha.12-develop" +dependencies = [ + "aquatic_udp_protocol", + "clap", + "futures", + "tokio", + "torrust-tracker", +] + [[package]] name = "torrust-tracker" version = "3.0.0-alpha.12-develop" @@ -3265,6 +3354,7 @@ dependencies = [ "binascii", "chrono", "config", + "criterion", "derive_more", "fern", "futures", @@ -3274,6 +3364,7 @@ dependencies = [ "log", "mockall", "multimap", + "once_cell", "openssl", "percent-encoding", "r2d2", @@ -3486,6 +3577,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "utf8parse" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" + [[package]] name = "uuid" version = "1.6.1" diff --git a/Cargo.toml b/Cargo.toml index 5f55c6c5..4f2abd6f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,7 +32,7 @@ version = "3.0.0-alpha.12-develop" [dependencies] aquatic_udp_protocol = "0" async-trait = "0" -axum = "0.6" +axum = { version = "0.6", features = ["macros"] } axum-client-ip = "0.4" axum-server = { version = "0", features = ["tls-rustls"] } binascii = "0" @@ -68,8 +68,10 @@ tower-http = { version = "0.4", features = ["compression-full"] } uuid = { version = "1", features = ["v4"] } [dev-dependencies] +criterion = { version = "0.5.1", features = ["async_tokio"] } local-ip-address = "0" mockall = "0" +once_cell = "1.18.0" reqwest = { version = "0", features = ["json"] } serde_bytes = "0" serde_repr = "0" @@ -77,7 +79,7 @@ serde_urlencoded = "0" torrust-tracker-test-helpers = { version = "3.0.0-alpha.12-develop", path = "packages/test-helpers" } [workspace] -members = ["contrib/bencode", "packages/configuration", "packages/located-error", "packages/primitives", "packages/test-helpers"] +members = ["contrib/bencode", "packages/configuration", 
"packages/located-error", "packages/primitives", "packages/test-helpers", "packages/torrent-repository-benchmarks"] [profile.dev] debug = 1 diff --git a/packages/torrent-repository-benchmarks/Cargo.toml b/packages/torrent-repository-benchmarks/Cargo.toml new file mode 100644 index 00000000..da9aba62 --- /dev/null +++ b/packages/torrent-repository-benchmarks/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "torrust-torrent-repository-benchmarks" +authors.workspace = true +categories.workspace = true +description.workspace = true +documentation.workspace = true +edition.workspace = true +homepage.workspace = true +keywords.workspace = true +license.workspace = true +publish.workspace = true +repository.workspace = true +rust-version.workspace = true +version.workspace = true + +[dependencies] +aquatic_udp_protocol = "0.8.0" +clap = { version = "4.4.8", features = ["derive"] } +futures = "0.3.29" +tokio = { version = "1", features = ["macros", "net", "rt-multi-thread", "signal", "sync"] } +torrust-tracker = { path = "../../" } \ No newline at end of file diff --git a/packages/torrent-repository-benchmarks/src/args.rs b/packages/torrent-repository-benchmarks/src/args.rs new file mode 100644 index 00000000..3a38c55a --- /dev/null +++ b/packages/torrent-repository-benchmarks/src/args.rs @@ -0,0 +1,15 @@ +use clap::Parser; + +#[derive(Parser, Debug)] +#[command(author, version, about, long_about = None)] +pub struct Args { + /// Amount of benchmark worker threads + #[arg(short, long)] + pub threads: usize, + /// Amount of time in ns a thread will sleep to simulate a client response after handling a task + #[arg(short, long)] + pub sleep: Option, + /// Compare with old implementations of the torrent repository + #[arg(short, long)] + pub compare: Option, +} diff --git a/packages/torrent-repository-benchmarks/src/benches/asyn.rs b/packages/torrent-repository-benchmarks/src/benches/asyn.rs new file mode 100644 index 00000000..33f9e85f --- /dev/null +++ b/packages/torrent-repository-benchmarks/src/benches/asyn.rs @@ -0,0 +1,176 @@ +use std::sync::Arc; +use std::time::Duration; + +use clap::Parser; +use futures::stream::FuturesUnordered; +use torrust_tracker::core::torrent::repository::TRepositoryAsync; +use torrust_tracker::shared::bit_torrent::info_hash::InfoHash; + +use crate::args::Args; +use crate::benches::utils::{generate_unique_info_hashes, get_average_and_adjusted_average_from_results, DEFAULT_PEER}; + +pub async fn async_add_one_torrent(samples: usize) -> (Duration, Duration) { + let mut results: Vec = Vec::with_capacity(samples); + + for _ in 0..samples { + let torrent_repository = Arc::new(T::new()); + + let info_hash = InfoHash([0; 20]); + + let start_time = std::time::Instant::now(); + + torrent_repository + .update_torrent_with_peer_and_get_stats(&info_hash, &DEFAULT_PEER) + .await; + + let result = start_time.elapsed(); + + results.push(result); + } + + get_average_and_adjusted_average_from_results(results) +} + +// Add one torrent ten thousand times in parallel (depending on the set worker threads) +pub async fn async_update_one_torrent_in_parallel( + runtime: &tokio::runtime::Runtime, + samples: usize, +) -> (Duration, Duration) { + let args = Args::parse(); + let mut results: Vec = Vec::with_capacity(samples); + + for _ in 0..samples { + let torrent_repository = Arc::new(T::new()); + let info_hash: &'static InfoHash = &InfoHash([0; 20]); + let handles = FuturesUnordered::new(); + + // Add the torrent/peer to the torrent repository + torrent_repository + 
diff --git a/packages/torrent-repository-benchmarks/src/benches/asyn.rs b/packages/torrent-repository-benchmarks/src/benches/asyn.rs
new file mode 100644
index 00000000..33f9e85f
--- /dev/null
+++ b/packages/torrent-repository-benchmarks/src/benches/asyn.rs
@@ -0,0 +1,176 @@
+use std::sync::Arc;
+use std::time::Duration;
+
+use clap::Parser;
+use futures::stream::FuturesUnordered;
+use torrust_tracker::core::torrent::repository::TRepositoryAsync;
+use torrust_tracker::shared::bit_torrent::info_hash::InfoHash;
+
+use crate::args::Args;
+use crate::benches::utils::{generate_unique_info_hashes, get_average_and_adjusted_average_from_results, DEFAULT_PEER};
+
+pub async fn async_add_one_torrent<T: TRepositoryAsync + Send + Sync + 'static>(samples: usize) -> (Duration, Duration) {
+    let mut results: Vec<Duration> = Vec::with_capacity(samples);
+
+    for _ in 0..samples {
+        let torrent_repository = Arc::new(T::new());
+
+        let info_hash = InfoHash([0; 20]);
+
+        let start_time = std::time::Instant::now();
+
+        torrent_repository
+            .update_torrent_with_peer_and_get_stats(&info_hash, &DEFAULT_PEER)
+            .await;
+
+        let result = start_time.elapsed();
+
+        results.push(result);
+    }
+
+    get_average_and_adjusted_average_from_results(results)
+}
+
+// Add one torrent ten thousand times in parallel (depending on the set worker threads)
+pub async fn async_update_one_torrent_in_parallel<T: TRepositoryAsync + Send + Sync + 'static>(
+    runtime: &tokio::runtime::Runtime,
+    samples: usize,
+) -> (Duration, Duration) {
+    let args = Args::parse();
+    let mut results: Vec<Duration> = Vec::with_capacity(samples);
+
+    for _ in 0..samples {
+        let torrent_repository = Arc::new(T::new());
+        let info_hash: &'static InfoHash = &InfoHash([0; 20]);
+        let handles = FuturesUnordered::new();
+
+        // Add the torrent/peer to the torrent repository
+        torrent_repository
+            .update_torrent_with_peer_and_get_stats(info_hash, &DEFAULT_PEER)
+            .await;
+
+        let start_time = std::time::Instant::now();
+
+        for _ in 0..10_000 {
+            let torrent_repository_clone = torrent_repository.clone();
+
+            let handle = runtime.spawn(async move {
+                torrent_repository_clone
+                    .update_torrent_with_peer_and_get_stats(info_hash, &DEFAULT_PEER)
+                    .await;
+
+                if let Some(sleep_time) = args.sleep {
+                    let start_time = std::time::Instant::now();
+
+                    while start_time.elapsed().as_nanos() < u128::from(sleep_time) {}
+                }
+            });
+
+            handles.push(handle);
+        }
+
+        // Await all tasks
+        futures::future::join_all(handles).await;
+
+        let result = start_time.elapsed();
+
+        results.push(result);
+    }
+
+    get_average_and_adjusted_average_from_results(results)
+}
+
+// Add ten thousand torrents in parallel (depending on the set worker threads)
+pub async fn async_add_multiple_torrents_in_parallel<T: TRepositoryAsync + Send + Sync + 'static>(
+    runtime: &tokio::runtime::Runtime,
+    samples: usize,
+) -> (Duration, Duration) {
+    let args = Args::parse();
+    let mut results: Vec<Duration> = Vec::with_capacity(samples);
+
+    for _ in 0..samples {
+        let torrent_repository = Arc::new(T::new());
+        let info_hashes = generate_unique_info_hashes(10_000);
+        let handles = FuturesUnordered::new();
+
+        let start_time = std::time::Instant::now();
+
+        for info_hash in info_hashes {
+            let torrent_repository_clone = torrent_repository.clone();
+
+            let handle = runtime.spawn(async move {
+                torrent_repository_clone
+                    .update_torrent_with_peer_and_get_stats(&info_hash, &DEFAULT_PEER)
+                    .await;
+
+                if let Some(sleep_time) = args.sleep {
+                    let start_time = std::time::Instant::now();
+
+                    while start_time.elapsed().as_nanos() < u128::from(sleep_time) {}
+                }
+            });
+
+            handles.push(handle);
+        }
+
+        // Await all tasks
+        futures::future::join_all(handles).await;
+
+        let result = start_time.elapsed();
+
+        results.push(result);
+    }
+
+    get_average_and_adjusted_average_from_results(results)
+}
+
+// Async update ten thousand torrents in parallel (depending on the set worker threads)
+pub async fn async_update_multiple_torrents_in_parallel<T: TRepositoryAsync + Send + Sync + 'static>(
+    runtime: &tokio::runtime::Runtime,
+    samples: usize,
+) -> (Duration, Duration) {
+    let args = Args::parse();
+    let mut results: Vec<Duration> = Vec::with_capacity(samples);
+
+    for _ in 0..samples {
+        let torrent_repository = Arc::new(T::new());
+        let info_hashes = generate_unique_info_hashes(10_000);
+        let handles = FuturesUnordered::new();
+
+        // Add the torrents/peers to the torrent repository
+        for info_hash in &info_hashes {
+            torrent_repository
+                .update_torrent_with_peer_and_get_stats(info_hash, &DEFAULT_PEER)
+                .await;
+        }
+
+        let start_time = std::time::Instant::now();
+
+        for info_hash in info_hashes {
+            let torrent_repository_clone = torrent_repository.clone();
+
+            let handle = runtime.spawn(async move {
+                torrent_repository_clone
+                    .update_torrent_with_peer_and_get_stats(&info_hash, &DEFAULT_PEER)
+                    .await;
+
+                if let Some(sleep_time) = args.sleep {
+                    let start_time = std::time::Instant::now();
+
+                    while start_time.elapsed().as_nanos() < u128::from(sleep_time) {}
+                }
+            });
+
+            handles.push(handle);
+        }
+
+        // Await all tasks
+        futures::future::join_all(handles).await;
+
+        let result = start_time.elapsed();
+
+        results.push(result);
+    }
+
+    get_average_and_adjusted_average_from_results(results)
+}
diff --git a/packages/torrent-repository-benchmarks/src/benches/mod.rs b/packages/torrent-repository-benchmarks/src/benches/mod.rs
new file mode 100644
index 00000000..1026aa4b
--- /dev/null
+++ b/packages/torrent-repository-benchmarks/src/benches/mod.rs
@@ -0,0 +1,3 @@
+pub mod asyn;
+pub mod sync;
+pub mod utils;
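Two details of these benches are easy to miss. The `sleep` option is implemented as a busy-wait loop rather than `tokio::time::sleep`, so the simulated client work keeps a worker thread occupied instead of yielding back to the scheduler. And the spawned `JoinHandle`s are collected into a `FuturesUnordered` but then awaited with `futures::future::join_all`; the stream could equally be drained directly. A minimal sketch of the latter, assuming the same `futures` and `tokio` crates (the `drain_handles` name is made up for illustration):

    use futures::stream::{FuturesUnordered, StreamExt};

    async fn drain_handles(mut handles: FuturesUnordered<tokio::task::JoinHandle<()>>) {
        // Polls all join handles concurrently, consuming results as tasks finish.
        while let Some(joined) = handles.next().await {
            joined.expect("a benchmark task panicked");
        }
    }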
diff --git a/packages/torrent-repository-benchmarks/src/benches/sync.rs b/packages/torrent-repository-benchmarks/src/benches/sync.rs
new file mode 100644
index 00000000..dac7ab81
--- /dev/null
+++ b/packages/torrent-repository-benchmarks/src/benches/sync.rs
@@ -0,0 +1,166 @@
+use std::sync::Arc;
+use std::time::Duration;
+
+use clap::Parser;
+use futures::stream::FuturesUnordered;
+use torrust_tracker::core::torrent::repository::Repository;
+use torrust_tracker::shared::bit_torrent::info_hash::InfoHash;
+
+use crate::args::Args;
+use crate::benches::utils::{generate_unique_info_hashes, get_average_and_adjusted_average_from_results, DEFAULT_PEER};
+
+// Simply add one torrent
+#[must_use]
+pub fn add_one_torrent<T: Repository>(samples: usize) -> (Duration, Duration) {
+    let mut results: Vec<Duration> = Vec::with_capacity(samples);
+
+    for _ in 0..samples {
+        let torrent_repository = Arc::new(T::new());
+
+        let info_hash = InfoHash([0; 20]);
+
+        let start_time = std::time::Instant::now();
+
+        torrent_repository.update_torrent_with_peer_and_get_stats(&info_hash, &DEFAULT_PEER);
+
+        let result = start_time.elapsed();
+
+        results.push(result);
+    }
+
+    get_average_and_adjusted_average_from_results(results)
+}
+
+// Add one torrent ten thousand times in parallel (depending on the set worker threads)
+pub async fn update_one_torrent_in_parallel<T: Repository + Send + Sync + 'static>(
+    runtime: &tokio::runtime::Runtime,
+    samples: usize,
+) -> (Duration, Duration) {
+    let args = Args::parse();
+    let mut results: Vec<Duration> = Vec::with_capacity(samples);
+
+    for _ in 0..samples {
+        let torrent_repository = Arc::new(T::new());
+        let info_hash: &'static InfoHash = &InfoHash([0; 20]);
+        let handles = FuturesUnordered::new();
+
+        // Add the torrent/peer to the torrent repository
+        torrent_repository.update_torrent_with_peer_and_get_stats(info_hash, &DEFAULT_PEER);
+
+        let start_time = std::time::Instant::now();
+
+        for _ in 0..10_000 {
+            let torrent_repository_clone = torrent_repository.clone();
+
+            let handle = runtime.spawn(async move {
+                torrent_repository_clone.update_torrent_with_peer_and_get_stats(info_hash, &DEFAULT_PEER);
+
+                if let Some(sleep_time) = args.sleep {
+                    let start_time = std::time::Instant::now();
+
+                    while start_time.elapsed().as_nanos() < u128::from(sleep_time) {}
+                }
+            });
+
+            handles.push(handle);
+        }
+
+        // Await all tasks
+        futures::future::join_all(handles).await;
+
+        let result = start_time.elapsed();
+
+        results.push(result);
+    }
+
+    get_average_and_adjusted_average_from_results(results)
+}
+
+// Add ten thousand torrents in parallel (depending on the set worker threads)
+pub async fn add_multiple_torrents_in_parallel<T: Repository + Send + Sync + 'static>(
+    runtime: &tokio::runtime::Runtime,
+    samples: usize,
+) -> (Duration, Duration) {
+    let args = Args::parse();
+    let mut results: Vec<Duration> = Vec::with_capacity(samples);
+
+    for _ in 0..samples {
+        let torrent_repository = Arc::new(T::new());
+        let info_hashes = generate_unique_info_hashes(10_000);
+        let handles = FuturesUnordered::new();
+
+        let start_time = std::time::Instant::now();
+
+        for info_hash in info_hashes {
+            let torrent_repository_clone = torrent_repository.clone();
+
+            let handle = runtime.spawn(async move {
+                torrent_repository_clone.update_torrent_with_peer_and_get_stats(&info_hash, &DEFAULT_PEER);
+
+                if let Some(sleep_time) = args.sleep {
+                    let start_time = std::time::Instant::now();
+
+                    while start_time.elapsed().as_nanos() < u128::from(sleep_time) {}
+                }
+            });
+
+            handles.push(handle);
+        }
+
+        // Await all tasks
+        futures::future::join_all(handles).await;
+
+        let result = start_time.elapsed();
+
+        results.push(result);
+    }
+
+    get_average_and_adjusted_average_from_results(results)
+}
+
+// Update ten thousand torrents in parallel (depending on the set worker threads)
+pub async fn update_multiple_torrents_in_parallel<T: Repository + Send + Sync + 'static>(
+    runtime: &tokio::runtime::Runtime,
+    samples: usize,
+) -> (Duration, Duration) {
+    let args = Args::parse();
+    let mut results: Vec<Duration> = Vec::with_capacity(samples);
+
+    for _ in 0..samples {
+        let torrent_repository = Arc::new(T::new());
+        let info_hashes = generate_unique_info_hashes(10_000);
+        let handles = FuturesUnordered::new();
+
+        // Add the torrents/peers to the torrent repository
+        for info_hash in &info_hashes {
+            torrent_repository.update_torrent_with_peer_and_get_stats(info_hash, &DEFAULT_PEER);
+        }
+
+        let start_time = std::time::Instant::now();
+
+        for info_hash in info_hashes {
+            let torrent_repository_clone = torrent_repository.clone();
+
+            let handle = runtime.spawn(async move {
+                torrent_repository_clone.update_torrent_with_peer_and_get_stats(&info_hash, &DEFAULT_PEER);
+
+                if let Some(sleep_time) = args.sleep {
+                    let start_time = std::time::Instant::now();
+
+                    while start_time.elapsed().as_nanos() < u128::from(sleep_time) {}
+                }
+            });
+
+            handles.push(handle);
+        }
+
+        // Await all tasks
+        futures::future::join_all(handles).await;
+
+        let result = start_time.elapsed();
+
+        results.push(result);
+    }
+
+    get_average_and_adjusted_average_from_results(results)
+}
diff --git a/packages/torrent-repository-benchmarks/src/benches/utils.rs b/packages/torrent-repository-benchmarks/src/benches/utils.rs
new file mode 100644
index 00000000..ef164003
--- /dev/null
+++ b/packages/torrent-repository-benchmarks/src/benches/utils.rs
@@ -0,0 +1,73 @@
+use std::collections::HashSet;
+use std::net::{IpAddr, Ipv4Addr, SocketAddr};
+use std::time::Duration;
+
+use aquatic_udp_protocol::{AnnounceEvent, NumberOfBytes};
+use torrust_tracker::core::peer::{Id, Peer};
+use torrust_tracker::shared::bit_torrent::info_hash::InfoHash;
+use torrust_tracker::shared::clock::DurationSinceUnixEpoch;
+
+pub const DEFAULT_PEER: Peer = Peer {
+    peer_id: Id([0; 20]),
+    peer_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080),
+    updated: DurationSinceUnixEpoch::from_secs(0),
+    uploaded: NumberOfBytes(0),
+    downloaded: NumberOfBytes(0),
+    left: NumberOfBytes(0),
+    event: AnnounceEvent::Started,
+};
+
+#[must_use]
+#[allow(clippy::missing_panics_doc)]
+pub fn generate_unique_info_hashes(size: usize) -> Vec<InfoHash> {
+    let mut result = HashSet::new();
+
+    let mut bytes = [0u8; 20];
+
+    #[allow(clippy::cast_possible_truncation)]
+    for i in 0..size {
+        bytes[0] = (i & 0xFF) as u8;
+        bytes[1] = ((i >> 8) & 0xFF) as u8;
+        bytes[2] = ((i >> 16) & 0xFF) as u8;
+        bytes[3] = ((i >> 24) & 0xFF) as u8;
+
+        let info_hash = InfoHash(bytes);
+        result.insert(info_hash);
+    }
+
+    assert_eq!(result.len(), size);
+
+    result.into_iter().collect()
+}
+
+#[must_use]
+pub fn within_acceptable_range(test: &Duration, norm: &Duration) -> bool {
+    let test_secs = test.as_secs_f64();
+    let norm_secs = norm.as_secs_f64();
+
+    // Calculate the upper and lower limits for the 10% tolerance
+    let tolerance = norm_secs * 0.1;
+
+    let upper_limit = norm_secs + tolerance;
+    let lower_limit = norm_secs - tolerance;
+
+    test_secs < upper_limit && test_secs > lower_limit
+}
+
+#[must_use]
+pub fn get_average_and_adjusted_average_from_results(mut results: Vec<Duration>) -> (Duration, Duration) {
+    #[allow(clippy::cast_possible_truncation)]
+    let average = results.iter().sum::<Duration>() / results.len() as u32;
+
+    results.retain(|result| within_acceptable_range(result, &average));
+
+    let mut adjusted_average = Duration::from_nanos(0);
+
+    #[allow(clippy::cast_possible_truncation)]
+    if results.len() > 1 {
+        adjusted_average = results.iter().sum::<Duration>() / results.len() as u32;
+    }
+
+    (average, adjusted_average)
+}
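To make the trimming behaviour concrete, here is a small sketch using the helper above (hypothetical sample values; note the adjusted average falls back to a zero duration when fewer than two samples survive the ±10% filter):

    use std::time::Duration;

    use torrust_torrent_repository_benchmarks::benches::utils::get_average_and_adjusted_average_from_results;

    fn main() {
        // Four samples near the mean plus one mild outlier.
        let samples = vec![
            Duration::from_millis(100),
            Duration::from_millis(100),
            Duration::from_millis(100),
            Duration::from_millis(100),
            Duration::from_millis(140),
        ];

        let (average, adjusted_average) = get_average_and_adjusted_average_from_results(samples);

        assert_eq!(average, Duration::from_millis(108)); // (4 * 100 + 140) / 5
        assert_eq!(adjusted_average, Duration::from_millis(100)); // 140ms falls outside 108ms ± 10%
    }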
diff --git a/packages/torrent-repository-benchmarks/src/lib.rs b/packages/torrent-repository-benchmarks/src/lib.rs
new file mode 100644
index 00000000..58ebc205
--- /dev/null
+++ b/packages/torrent-repository-benchmarks/src/lib.rs
@@ -0,0 +1,2 @@
+pub mod args;
+pub mod benches;
diff --git a/packages/torrent-repository-benchmarks/src/main.rs b/packages/torrent-repository-benchmarks/src/main.rs
new file mode 100644
index 00000000..0d9db73a
--- /dev/null
+++ b/packages/torrent-repository-benchmarks/src/main.rs
@@ -0,0 +1,139 @@
+use clap::Parser;
+use torrust_torrent_repository_benchmarks::args::Args;
+use torrust_torrent_repository_benchmarks::benches::asyn::{
+    async_add_multiple_torrents_in_parallel, async_add_one_torrent, async_update_multiple_torrents_in_parallel,
+    async_update_one_torrent_in_parallel,
+};
+use torrust_torrent_repository_benchmarks::benches::sync::{
+    add_multiple_torrents_in_parallel, add_one_torrent, update_multiple_torrents_in_parallel, update_one_torrent_in_parallel,
+};
+use torrust_tracker::core::torrent::repository::{AsyncSync, RepositoryAsync, RepositoryAsyncSingle, Sync, SyncSingle};
+
+#[allow(clippy::too_many_lines)]
+#[allow(clippy::print_literal)]
+fn main() {
+    let args = Args::parse();
+
+    // Add 1 to worker_threads since we need a thread that awaits the benchmark
+    let rt = tokio::runtime::Builder::new_multi_thread()
+        .worker_threads(args.threads + 1)
+        .enable_time()
+        .build()
+        .unwrap();
+
+    println!("tokio::sync::RwLock<std::collections::BTreeMap<InfoHash, Entry>>");
+    println!(
+        "{}: Avg/AdjAvg: {:?}",
+        "add_one_torrent",
+        rt.block_on(async_add_one_torrent::<RepositoryAsyncSingle>(1_000_000))
+    );
+    println!(
+        "{}: Avg/AdjAvg: {:?}",
+        "update_one_torrent_in_parallel",
+        rt.block_on(async_update_one_torrent_in_parallel::<RepositoryAsyncSingle>(&rt, 10))
+    );
+    println!(
+        "{}: Avg/AdjAvg: {:?}",
+        "add_multiple_torrents_in_parallel",
+        rt.block_on(async_add_multiple_torrents_in_parallel::<RepositoryAsyncSingle>(&rt, 10))
+    );
+    println!(
+        "{}: Avg/AdjAvg: {:?}",
+        "update_multiple_torrents_in_parallel",
+        rt.block_on(async_update_multiple_torrents_in_parallel::<RepositoryAsyncSingle>(&rt, 10))
+    );
+
+    if let Some(true) = args.compare {
+        println!();
+
+        println!("std::sync::RwLock<std::collections::BTreeMap<InfoHash, Entry>>");
+        println!(
+            "{}: Avg/AdjAvg: {:?}",
+            "add_one_torrent",
+            add_one_torrent::<SyncSingle>(1_000_000)
+        );
+        println!(
+            "{}: Avg/AdjAvg: {:?}",
+            "update_one_torrent_in_parallel",
+            rt.block_on(update_one_torrent_in_parallel::<SyncSingle>(&rt, 10))
+        );
+        println!(
+            "{}: Avg/AdjAvg: {:?}",
+            "add_multiple_torrents_in_parallel",
+            rt.block_on(add_multiple_torrents_in_parallel::<SyncSingle>(&rt, 10))
+        );
+        println!(
+            "{}: Avg/AdjAvg: {:?}",
+            "update_multiple_torrents_in_parallel",
+            rt.block_on(update_multiple_torrents_in_parallel::<SyncSingle>(&rt, 10))
+        );
+
+        println!();
+
+        println!("std::sync::RwLock<std::collections::BTreeMap<InfoHash, Arc<std::sync::Mutex<Entry>>>>");
+        println!("{}: Avg/AdjAvg: {:?}", "add_one_torrent", add_one_torrent::<Sync>(1_000_000));
+        println!(
+            "{}: Avg/AdjAvg: {:?}",
+            "update_one_torrent_in_parallel",
+            rt.block_on(update_one_torrent_in_parallel::<Sync>(&rt, 10))
+        );
+        println!(
+            "{}: Avg/AdjAvg: {:?}",
+            "add_multiple_torrents_in_parallel",
+            rt.block_on(add_multiple_torrents_in_parallel::<Sync>(&rt, 10))
+        );
+        println!(
+            "{}: Avg/AdjAvg: {:?}",
+            "update_multiple_torrents_in_parallel",
+            rt.block_on(update_multiple_torrents_in_parallel::<Sync>(&rt, 10))
+        );
+
+        println!();
+
+        println!("tokio::sync::RwLock<std::collections::BTreeMap<InfoHash, Arc<std::sync::Mutex<Entry>>>>");
+        println!(
+            "{}: Avg/AdjAvg: {:?}",
+            "add_one_torrent",
+            rt.block_on(async_add_one_torrent::<AsyncSync>(1_000_000))
+        );
+        println!(
+            "{}: Avg/AdjAvg: {:?}",
+            "update_one_torrent_in_parallel",
+            rt.block_on(async_update_one_torrent_in_parallel::<AsyncSync>(&rt, 10))
+        );
+        println!(
+            "{}: Avg/AdjAvg: {:?}",
+            "add_multiple_torrents_in_parallel",
+            rt.block_on(async_add_multiple_torrents_in_parallel::<AsyncSync>(&rt, 10))
+        );
+        println!(
+            "{}: Avg/AdjAvg: {:?}",
+            "update_multiple_torrents_in_parallel",
+            rt.block_on(async_update_multiple_torrents_in_parallel::<AsyncSync>(&rt, 10))
+        );
+
+        println!();
+
+        println!("tokio::sync::RwLock<std::collections::BTreeMap<InfoHash, Arc<tokio::sync::Mutex<Entry>>>>");
+        println!(
+            "{}: Avg/AdjAvg: {:?}",
+            "add_one_torrent",
+            rt.block_on(async_add_one_torrent::<RepositoryAsync>(1_000_000))
+        );
+        println!(
+            "{}: Avg/AdjAvg: {:?}",
+            "update_one_torrent_in_parallel",
+            rt.block_on(async_update_one_torrent_in_parallel::<RepositoryAsync>(&rt, 10))
+        );
+        println!(
+            "{}: Avg/AdjAvg: {:?}",
+            "add_multiple_torrents_in_parallel",
+            rt.block_on(async_add_multiple_torrents_in_parallel::<RepositoryAsync>(&rt, 10))
+        );
+        println!(
+            "{}: Avg/AdjAvg: {:?}",
+            "update_multiple_torrents_in_parallel",
+            rt.block_on(async_update_multiple_torrents_in_parallel::<RepositoryAsync>(&rt, 10))
+        );
+    }
+}
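Worth noting: the Cargo.toml changes above also add `criterion` (with the `async_tokio` feature) and `once_cell` to the dev-dependencies, although no Criterion benches ship in this patch. If a Criterion harness were added later, it could presumably look something like this sketch (the bench name and body are placeholders, not part of the patch):

    use criterion::{criterion_group, criterion_main, Criterion};
    use tokio::runtime::Runtime;

    fn add_one_torrent_benchmark(c: &mut Criterion) {
        let rt = Runtime::new().unwrap();

        c.bench_function("add_one_torrent", |b| {
            // `to_async` drives each async iteration on the tokio runtime.
            b.to_async(&rt).iter(|| async {
                // exercise the repository under test here
            });
        });
    }

    criterion_group!(benches, add_one_torrent_benchmark);
    criterion_main!(benches);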
diff --git a/src/core/mod.rs b/src/core/mod.rs
index ed6ba6a8..caac5b1e 100644
--- a/src/core/mod.rs
+++ b/src/core/mod.rs
@@ -12,9 +12,9 @@
 //! ```text
 //! Delivery layer Domain layer
 //!
-//! HTTP tracker | 
+//! HTTP tracker |
 //! UDP tracker |> Core tracker
-//! Tracker REST API | 
+//! Tracker REST API |
 //! ```
 //!
 //! # Table of contents
@@ -439,23 +439,23 @@ pub mod services;
 pub mod statistics;
 pub mod torrent;
 
-use std::collections::btree_map::Entry;
 use std::collections::{BTreeMap, HashMap};
 use std::net::IpAddr;
 use std::panic::Location;
 use std::sync::Arc;
 use std::time::Duration;
 
+use futures::future::join_all;
 use tokio::sync::mpsc::error::SendError;
-use tokio::sync::{RwLock, RwLockReadGuard};
 use torrust_tracker_configuration::Configuration;
 use torrust_tracker_primitives::TrackerMode;
 
 use self::auth::Key;
 use self::error::Error;
 use self::peer::Peer;
-use self::torrent::{SwarmMetadata, SwarmStats};
+use self::torrent::repository::{RepositoryAsyncSingle, TRepositoryAsync};
 use crate::core::databases::Database;
+use crate::core::torrent::{SwarmMetadata, SwarmStats};
 use crate::shared::bit_torrent::info_hash::InfoHash;
 
 /// The domain layer tracker service.
@@ -472,11 +472,11 @@ pub struct Tracker {
     pub config: Arc<Configuration>,
     /// A database driver implementation: [`Sqlite3`](crate::core::databases::sqlite)
     /// or [`MySQL`](crate::core::databases::mysql)
-    pub database: Box<dyn Database>,
+    pub database: Arc<Box<dyn Database>>,
     mode: TrackerMode,
-    keys: RwLock<std::collections::HashMap<Key, auth::ExpiringKey>>,
-    whitelist: RwLock<std::collections::HashSet<InfoHash>>,
-    torrents: RwLock<std::collections::BTreeMap<InfoHash, torrent::Entry>>,
+    keys: tokio::sync::RwLock<std::collections::HashMap<Key, auth::ExpiringKey>>,
+    whitelist: tokio::sync::RwLock<std::collections::HashSet<InfoHash>>,
+    pub torrents: Arc<RepositoryAsyncSingle>,
     stats_event_sender: Option<Box<dyn statistics::EventSender>>,
     stats_repository: statistics::Repo,
 }
@@ -562,16 +562,16 @@ impl Tracker {
         stats_event_sender: Option<Box<dyn statistics::EventSender>>,
         stats_repository: statistics::Repo,
     ) -> Result<Tracker, Error> {
-        let database = databases::driver::build(&config.db_driver, &config.db_path)?;
+        let database = Arc::new(databases::driver::build(&config.db_driver, &config.db_path)?);
 
         let mode = config.mode;
 
         Ok(Tracker {
             config,
             mode,
-            keys: RwLock::new(std::collections::HashMap::new()),
-            whitelist: RwLock::new(std::collections::HashSet::new()),
-            torrents: RwLock::new(std::collections::BTreeMap::new()),
+            keys: tokio::sync::RwLock::new(std::collections::HashMap::new()),
+            whitelist: tokio::sync::RwLock::new(std::collections::HashSet::new()),
+            torrents: Arc::new(RepositoryAsyncSingle::new()),
             stats_event_sender,
             stats_repository,
             database,
@@ -654,7 +654,8 @@ impl Tracker {
 
     /// It returns the data for a `scrape` response.
     async fn get_swarm_metadata(&self, info_hash: &InfoHash) -> SwarmMetadata {
-        let torrents = self.get_torrents().await;
+        let torrents = self.torrents.get_torrents().await;
+
         match torrents.get(info_hash) {
             Some(torrent_entry) => torrent_entry.get_swarm_metadata(),
             None => SwarmMetadata::default(),
@@ -672,7 +673,7 @@ impl Tracker {
     pub async fn load_torrents_from_database(&self) -> Result<(), databases::error::Error> {
         let persistent_torrents = self.database.load_persistent_torrents().await?;
 
-        let mut torrents = self.torrents.write().await;
+        let mut torrents = self.torrents.get_torrents_mut().await;
 
         for (info_hash, completed) in persistent_torrents {
             // Skip if torrent entry already exists
@@ -692,7 +693,7 @@ impl Tracker {
     }
 
     async fn get_peers_for_peer(&self, info_hash: &InfoHash, peer: &Peer) -> Vec<Peer> {
-        let read_lock = self.torrents.read().await;
+        let read_lock = self.torrents.get_torrents().await;
 
         match read_lock.get(info_hash) {
             None => vec![],
@@ -704,7 +705,7 @@ impl Tracker {
     ///
     /// Get all torrent peers for a given torrent
     pub async fn get_all_torrent_peers(&self, info_hash: &InfoHash) -> Vec<Peer> {
-        let read_lock = self.torrents.read().await;
+        let read_lock = self.torrents.get_torrents().await;
 
         match read_lock.get(info_hash) {
             None => vec![],
@@ -721,79 +722,87 @@ impl Tracker {
         // code-review: consider splitting the function in two (command and query segregation).
         // `update_torrent_with_peer` and `get_stats`
 
-        let mut torrents = self.torrents.write().await;
-
-        let torrent_entry = match torrents.entry(*info_hash) {
-            Entry::Vacant(vacant) => vacant.insert(torrent::Entry::new()),
-            Entry::Occupied(entry) => entry.into_mut(),
-        };
-
-        let stats_updated = torrent_entry.update_peer(peer);
+        let (stats, stats_updated) = self.torrents.update_torrent_with_peer_and_get_stats(info_hash, peer).await;
 
-        // todo: move this action to a separate worker
         if self.config.persistent_torrent_completed_stat && stats_updated {
-            drop(
-                self.database
-                    .save_persistent_torrent(info_hash, torrent_entry.completed)
-                    .await,
-            );
-        }
+            let completed = stats.completed;
+            let info_hash = *info_hash;
 
-        let (seeders, completed, leechers) = torrent_entry.get_stats();
-
-        torrent::SwarmStats {
-            completed,
-            seeders,
-            leechers,
+            drop(self.database.save_persistent_torrent(&info_hash, completed).await);
         }
-    }
 
-    pub async fn get_torrents(&self) -> RwLockReadGuard<'_, BTreeMap<InfoHash, torrent::Entry>> {
-        self.torrents.read().await
+        stats
     }
 
     /// It calculates and returns the general `Tracker`
     /// [`TorrentsMetrics`]
     ///
     /// # Context: Tracker
+    ///
+    /// # Panics
+    /// Panics if unable to get the torrent metrics.
     pub async fn get_torrents_metrics(&self) -> TorrentsMetrics {
-        let mut torrents_metrics = TorrentsMetrics {
+        let arc_torrents_metrics = Arc::new(tokio::sync::Mutex::new(TorrentsMetrics {
             seeders: 0,
             completed: 0,
             leechers: 0,
             torrents: 0,
-        };
+        }));
+
+        let db = self.torrents.get_torrents().await.clone();
+
+        let futures = db
+            .values()
+            .map(|torrent_entry| {
+                let torrent_entry = torrent_entry.clone();
+                let torrents_metrics = arc_torrents_metrics.clone();
+
+                async move {
+                    tokio::spawn(async move {
+                        let (seeders, completed, leechers) = torrent_entry.get_stats();
+                        torrents_metrics.lock().await.seeders += u64::from(seeders);
+                        torrents_metrics.lock().await.completed += u64::from(completed);
+                        torrents_metrics.lock().await.leechers += u64::from(leechers);
+                        torrents_metrics.lock().await.torrents += 1;
+                    })
+                    .await
+                    .expect("Error torrent_metrics spawn");
+                }
+            })
+            .collect::<Vec<_>>();
 
-        let db = self.get_torrents().await;
+        join_all(futures).await;
 
-        db.values().for_each(|torrent_entry| {
-            let (seeders, completed, leechers) = torrent_entry.get_stats();
-            torrents_metrics.seeders += u64::from(seeders);
-            torrents_metrics.completed += u64::from(completed);
-            torrents_metrics.leechers += u64::from(leechers);
-            torrents_metrics.torrents += 1;
-        });
+        let torrents_metrics = Arc::try_unwrap(arc_torrents_metrics).expect("Could not unwrap arc_torrents_metrics");
 
-        torrents_metrics
+        torrents_metrics.into_inner()
     }
 
     /// Remove inactive peers and (optionally) peerless torrents
     ///
     /// # Context: Tracker
     pub async fn cleanup_torrents(&self) {
-        let mut torrents_lock = self.torrents.write().await;
+        let mut torrents_lock = self.torrents.get_torrents_mut().await;
 
         // If we don't need to remove torrents we will use the faster iter
        if self.config.remove_peerless_torrents {
-            torrents_lock.retain(|_, torrent_entry| {
+            let mut cleaned_torrents_map: BTreeMap<InfoHash, torrent::Entry> = BTreeMap::new();
+
+            for (info_hash, torrent_entry) in &mut *torrents_lock {
                 torrent_entry.remove_inactive_peers(self.config.max_peer_timeout);
 
-                if self.config.persistent_torrent_completed_stat {
-                    torrent_entry.completed > 0 || !torrent_entry.peers.is_empty()
-                } else {
-                    !torrent_entry.peers.is_empty()
+                // Keep torrents that still have peers and, when completed stats are
+                // persisted, also peerless torrents with a completed count.
+                if torrent_entry.peers.is_empty()
+                    && !(self.config.persistent_torrent_completed_stat && torrent_entry.completed > 0)
+                {
+                    continue;
                 }
-            });
+
+                cleaned_torrents_map.insert(*info_hash, torrent_entry.clone());
+            }
+
+            *torrents_lock = cleaned_torrents_map;
         } else {
             for torrent_entry in (*torrents_lock).values_mut() {
                 torrent_entry.remove_inactive_peers(self.config.max_peer_timeout);
@@ -1074,7 +1083,7 @@ impl Tracker {
     /// It returns the `Tracker` [`statistics::Metrics`].
     ///
     /// # Context: Statistics
-    pub async fn get_stats(&self) -> RwLockReadGuard<'_, statistics::Metrics> {
+    pub async fn get_stats(&self) -> tokio::sync::RwLockReadGuard<'_, statistics::Metrics> {
         self.stats_repository.get_stats().await
     }
 
@@ -1786,11 +1795,11 @@ mod tests {
             assert_eq!(swarm_stats.completed, 1);
 
             // Remove the newly updated torrent from memory
-            tracker.torrents.write().await.remove(&info_hash);
+            tracker.torrents.get_torrents_mut().await.remove(&info_hash);
 
             tracker.load_torrents_from_database().await.unwrap();
 
-            let torrents = tracker.get_torrents().await;
+            let torrents = tracker.torrents.get_torrents().await;
             assert!(torrents.contains_key(&info_hash));
 
             let torrent_entry = torrents.get(&info_hash).unwrap();
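The `code-review` note in the hunk above suggests splitting `update_torrent_with_peer_and_get_stats` into a command and a query. A sketch of what that split could look like on the new repository type (defined in src/core/torrent/repository.rs below; the method names are hypothetical, not part of the patch):

    impl RepositoryAsyncSingle {
        /// Command: upsert the announcing peer into the swarm.
        pub async fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) -> bool {
            let mut torrents = self.torrents.write().await;
            let torrent_entry = torrents.entry(*info_hash).or_insert(Entry::new());
            torrent_entry.update_peer(peer)
        }

        /// Query: read the swarm stats without mutating anything.
        pub async fn get_swarm_stats(&self, info_hash: &InfoHash) -> Option<SwarmStats> {
            let torrents = self.torrents.read().await;
            torrents.get(info_hash).map(|torrent_entry| {
                let (seeders, completed, leechers) = torrent_entry.get_stats();
                SwarmStats { completed, seeders, leechers }
            })
        }
    }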
diff --git a/src/core/services/torrent.rs b/src/core/services/torrent.rs
index 651f40ca..918a80ba 100644
--- a/src/core/services/torrent.rs
+++ b/src/core/services/torrent.rs
@@ -93,7 +93,7 @@ impl Default for Pagination {
 
 /// It returns all the information the tracker has about one torrent in a [Info] struct.
 pub async fn get_torrent_info(tracker: Arc<Tracker>, info_hash: &InfoHash) -> Option<Info> {
-    let db = tracker.get_torrents().await;
+    let db = tracker.torrents.get_torrents().await;
 
     let torrent_entry_option = db.get(info_hash);
 
@@ -118,21 +118,22 @@ pub async fn get_torrent_info(tracker: Arc<Tracker>, info_hash: &InfoHash) -> Option<Info> {
 
 /// It returns all the information the tracker has about multiple torrents in a [`BasicInfo`] struct, excluding the peer list.
 pub async fn get_torrents(tracker: Arc<Tracker>, pagination: &Pagination) -> Vec<BasicInfo> {
-    let db = tracker.get_torrents().await;
-
-    db.iter()
-        .map(|(info_hash, torrent_entry)| {
-            let (seeders, completed, leechers) = torrent_entry.get_stats();
-            BasicInfo {
-                info_hash: *info_hash,
-                seeders: u64::from(seeders),
-                completed: u64::from(completed),
-                leechers: u64::from(leechers),
-            }
-        })
-        .skip(pagination.offset as usize)
-        .take(pagination.limit as usize)
-        .collect()
+    let db = tracker.torrents.get_torrents().await;
+
+    let mut basic_infos: Vec<BasicInfo> = vec![];
+
+    for (info_hash, torrent_entry) in db.iter().skip(pagination.offset as usize).take(pagination.limit as usize) {
+        let (seeders, completed, leechers) = torrent_entry.get_stats();
+
+        basic_infos.push(BasicInfo {
+            info_hash: *info_hash,
+            seeders: u64::from(seeders),
+            completed: u64::from(completed),
+            leechers: u64::from(leechers),
+        });
+    }
+
+    basic_infos
 }
 
 #[cfg(test)]
diff --git a/src/core/torrent.rs b/src/core/torrent/mod.rs
similarity index 99%
rename from src/core/torrent.rs
rename to src/core/torrent/mod.rs
index 8167aa2d..a49e218a 100644
--- a/src/core/torrent.rs
+++ b/src/core/torrent/mod.rs
@@ -28,6 +28,8 @@
 //! Peers that do not have a full copy of the torrent data are called "leechers".
 //!
 //! > **NOTICE**: both [`SwarmMetadata`] and [`SwarmStats`] contain the same information. [`SwarmMetadata`] uses the names from [BEP 48: Tracker Protocol Extension: Scrape](https://www.bittorrent.org/beps/bep_0048.html).
+pub mod repository;
+
 use std::time::Duration;
 
 use aquatic_udp_protocol::AnnounceEvent;
diff --git a/src/core/torrent/repository.rs b/src/core/torrent/repository.rs
new file mode 100644
index 00000000..62df9b51
--- /dev/null
+++ b/src/core/torrent/repository.rs
@@ -0,0 +1,301 @@
+use std::sync::Arc;
+
+use crate::core::peer;
+use crate::core::torrent::{Entry, SwarmStats};
+use crate::shared::bit_torrent::info_hash::InfoHash;
+
+pub trait Repository {
+    fn new() -> Self;
+    fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool);
+}
+
+pub trait TRepositoryAsync {
+    fn new() -> Self;
+    fn update_torrent_with_peer_and_get_stats(
+        &self,
+        info_hash: &InfoHash,
+        peer: &peer::Peer,
+    ) -> impl std::future::Future<Output = (SwarmStats, bool)> + Send;
+}
+
+/// Structure that holds all torrents. Using `std::sync` locks.
+pub struct Sync {
+    torrents: std::sync::RwLock<std::collections::BTreeMap<InfoHash, Arc<std::sync::Mutex<Entry>>>>,
+}
+
+impl Sync {
+    /// Returns a read lock over the torrents of this [`Sync`].
+    ///
+    /// # Panics
+    ///
+    /// Panics if unable to read the torrent list.
+    pub fn get_torrents(
+        &self,
+    ) -> std::sync::RwLockReadGuard<'_, std::collections::BTreeMap<InfoHash, Arc<std::sync::Mutex<Entry>>>> {
+        self.torrents.read().expect("unable to get torrent list")
+    }
+
+    /// Returns a write lock over the torrents of this [`Sync`].
+    ///
+    /// # Panics
+    ///
+    /// Panics if unable to write to the torrents list.
+    pub fn get_torrents_mut(
+        &self,
+    ) -> std::sync::RwLockWriteGuard<'_, std::collections::BTreeMap<InfoHash, Arc<std::sync::Mutex<Entry>>>> {
+        self.torrents.write().expect("unable to get writable torrent list")
+    }
+}
+
+impl Repository for Sync {
+    fn new() -> Self {
+        Self {
+            torrents: std::sync::RwLock::new(std::collections::BTreeMap::new()),
+        }
+    }
+
+    fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool) {
+        let maybe_existing_torrent_entry = self.get_torrents().get(info_hash).cloned();
+
+        let torrent_entry: Arc<std::sync::Mutex<Entry>> = if let Some(existing_torrent_entry) = maybe_existing_torrent_entry {
+            existing_torrent_entry
+        } else {
+            let mut torrents_lock = self.get_torrents_mut();
+            let entry = torrents_lock
+                .entry(*info_hash)
+                .or_insert(Arc::new(std::sync::Mutex::new(Entry::new())));
+            entry.clone()
+        };
+
+        let (stats, stats_updated) = {
+            let mut torrent_entry_lock = torrent_entry.lock().unwrap();
+            let stats_updated = torrent_entry_lock.update_peer(peer);
+            // `get_stats()` returns the tuple (seeders, completed, leechers)
+            let stats = torrent_entry_lock.get_stats();
+
+            (stats, stats_updated)
+        };
+
+        (
+            SwarmStats {
+                completed: stats.1,
+                seeders: stats.0,
+                leechers: stats.2,
+            },
+            stats_updated,
+        )
+    }
+}
+
+/// Structure that holds all torrents. Using `std::sync` locks.
+pub struct SyncSingle {
+    torrents: std::sync::RwLock<std::collections::BTreeMap<InfoHash, Entry>>,
+}
+
+impl SyncSingle {
+    /// Returns a read lock over the torrents of this [`SyncSingle`].
+    ///
+    /// # Panics
+    ///
+    /// Panics if unable to get torrent list.
+    pub fn get_torrents(&self) -> std::sync::RwLockReadGuard<'_, std::collections::BTreeMap<InfoHash, Entry>> {
+        self.torrents.read().expect("unable to get torrent list")
+    }
+
+    /// Returns a write lock over the torrents of this [`SyncSingle`].
+    ///
+    /// # Panics
+    ///
+    /// Panics if unable to get writable torrent list.
+    pub fn get_torrents_mut(&self) -> std::sync::RwLockWriteGuard<'_, std::collections::BTreeMap<InfoHash, Entry>> {
+        self.torrents.write().expect("unable to get writable torrent list")
+    }
+}
+
+impl Repository for SyncSingle {
+    fn new() -> Self {
+        Self {
+            torrents: std::sync::RwLock::new(std::collections::BTreeMap::new()),
+        }
+    }
+
+    fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool) {
+        let mut torrents = self.torrents.write().unwrap();
+
+        let torrent_entry = match torrents.entry(*info_hash) {
+            std::collections::btree_map::Entry::Vacant(vacant) => vacant.insert(Entry::new()),
+            std::collections::btree_map::Entry::Occupied(entry) => entry.into_mut(),
+        };
+
+        let stats_updated = torrent_entry.update_peer(peer);
+        let stats = torrent_entry.get_stats();
+
+        (
+            SwarmStats {
+                completed: stats.1,
+                seeders: stats.0,
+                leechers: stats.2,
+            },
+            stats_updated,
+        )
+    }
+}
+
+/// Structure that holds all torrents. Using `tokio::sync` locks.
+#[allow(clippy::module_name_repetitions)]
+pub struct RepositoryAsync {
+    torrents: tokio::sync::RwLock<std::collections::BTreeMap<InfoHash, Arc<tokio::sync::Mutex<Entry>>>>,
+}
+
+impl TRepositoryAsync for RepositoryAsync {
+    fn new() -> Self {
+        Self {
+            torrents: tokio::sync::RwLock::new(std::collections::BTreeMap::new()),
+        }
+    }
+
+    async fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool) {
+        let maybe_existing_torrent_entry = self.get_torrents().await.get(info_hash).cloned();
+
+        let torrent_entry: Arc<tokio::sync::Mutex<Entry>> = if let Some(existing_torrent_entry) = maybe_existing_torrent_entry {
+            existing_torrent_entry
+        } else {
+            let mut torrents_lock = self.get_torrents_mut().await;
+            let entry = torrents_lock
+                .entry(*info_hash)
+                .or_insert(Arc::new(tokio::sync::Mutex::new(Entry::new())));
+            entry.clone()
+        };
+
+        let (stats, stats_updated) = {
+            let mut torrent_entry_lock = torrent_entry.lock().await;
+            let stats_updated = torrent_entry_lock.update_peer(peer);
+            let stats = torrent_entry_lock.get_stats();
+
+            (stats, stats_updated)
+        };
+
+        (
+            SwarmStats {
+                completed: stats.1,
+                seeders: stats.0,
+                leechers: stats.2,
+            },
+            stats_updated,
+        )
+    }
+}
+
+impl RepositoryAsync {
+    pub async fn get_torrents(
+        &self,
+    ) -> tokio::sync::RwLockReadGuard<'_, std::collections::BTreeMap<InfoHash, Arc<tokio::sync::Mutex<Entry>>>> {
+        self.torrents.read().await
+    }
+
+    pub async fn get_torrents_mut(
+        &self,
+    ) -> tokio::sync::RwLockWriteGuard<'_, std::collections::BTreeMap<InfoHash, Arc<tokio::sync::Mutex<Entry>>>> {
+        self.torrents.write().await
+    }
+}
+
+/// Structure that holds all torrents. Using a `tokio::sync` lock for the torrents map and a `std::sync` lock for the inner torrent entry.
+pub struct AsyncSync {
+    torrents: tokio::sync::RwLock<std::collections::BTreeMap<InfoHash, Arc<std::sync::Mutex<Entry>>>>,
+}
+
+impl TRepositoryAsync for AsyncSync {
+    fn new() -> Self {
+        Self {
+            torrents: tokio::sync::RwLock::new(std::collections::BTreeMap::new()),
+        }
+    }
+
+    async fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool) {
+        let maybe_existing_torrent_entry = self.get_torrents().await.get(info_hash).cloned();
+
+        let torrent_entry: Arc<std::sync::Mutex<Entry>> = if let Some(existing_torrent_entry) = maybe_existing_torrent_entry {
+            existing_torrent_entry
+        } else {
+            let mut torrents_lock = self.get_torrents_mut().await;
+            let entry = torrents_lock
+                .entry(*info_hash)
+                .or_insert(Arc::new(std::sync::Mutex::new(Entry::new())));
+            entry.clone()
+        };
+
+        let (stats, stats_updated) = {
+            let mut torrent_entry_lock = torrent_entry.lock().unwrap();
+            let stats_updated = torrent_entry_lock.update_peer(peer);
+            let stats = torrent_entry_lock.get_stats();
+
+            (stats, stats_updated)
+        };
+
+        (
+            SwarmStats {
+                completed: stats.1,
+                seeders: stats.0,
+                leechers: stats.2,
+            },
+            stats_updated,
+        )
+    }
+}
+
+impl AsyncSync {
+    pub async fn get_torrents(
+        &self,
+    ) -> tokio::sync::RwLockReadGuard<'_, std::collections::BTreeMap<InfoHash, Arc<std::sync::Mutex<Entry>>>> {
+        self.torrents.read().await
+    }
+
+    pub async fn get_torrents_mut(
+        &self,
+    ) -> tokio::sync::RwLockWriteGuard<'_, std::collections::BTreeMap<InfoHash, Arc<std::sync::Mutex<Entry>>>> {
+        self.torrents.write().await
+    }
+}
+
+#[allow(clippy::module_name_repetitions)]
+pub struct RepositoryAsyncSingle {
+    torrents: tokio::sync::RwLock<std::collections::BTreeMap<InfoHash, Entry>>,
+}
+
+impl TRepositoryAsync for RepositoryAsyncSingle {
+    fn new() -> Self {
+        Self {
+            torrents: tokio::sync::RwLock::new(std::collections::BTreeMap::new()),
+        }
+    }
+
+    async fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool) {
+        let (stats, stats_updated) = {
+            let mut torrents_lock = self.torrents.write().await;
+            let torrent_entry = torrents_lock.entry(*info_hash).or_insert(Entry::new());
+            let stats_updated = torrent_entry.update_peer(peer);
+            let stats = torrent_entry.get_stats();
+
+            (stats, stats_updated)
+        };
+
+        (
+            SwarmStats {
+                completed: stats.1,
+                seeders: stats.0,
+                leechers: stats.2,
+            },
+            stats_updated,
+        )
+    }
+}
+
+impl RepositoryAsyncSingle {
+    pub async fn get_torrents(&self) -> tokio::sync::RwLockReadGuard<'_, std::collections::BTreeMap<InfoHash, Entry>> {
+        self.torrents.read().await
+    }
+
+    pub async fn get_torrents_mut(&self) -> tokio::sync::RwLockWriteGuard<'_, std::collections::BTreeMap<InfoHash, Entry>> {
+        self.torrents.write().await
+    }
+}
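A quick usage sketch for the new default repository, reusing the benchmark crate's DEFAULT_PEER; this is illustrative only, not part of the patch:

    use std::sync::Arc;

    use torrust_torrent_repository_benchmarks::benches::utils::DEFAULT_PEER;
    use torrust_tracker::core::torrent::repository::{RepositoryAsyncSingle, TRepositoryAsync};
    use torrust_tracker::shared::bit_torrent::info_hash::InfoHash;

    #[tokio::main]
    async fn main() {
        let torrents = Arc::new(RepositoryAsyncSingle::new());
        let info_hash = InfoHash([0; 20]);

        // The first announce inserts the torrent entry and registers the peer.
        let (stats, stats_updated) = torrents
            .update_torrent_with_peer_and_get_stats(&info_hash, &DEFAULT_PEER)
            .await;

        println!("seeders: {}, completed: {}, leechers: {}", stats.seeders, stats.completed, stats.leechers);
        println!("stats updated: {stats_updated}");
    }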
diff --git a/tests/wrk_benchmark_announce.lua b/tests/wrk_benchmark_announce.lua
index c182f8e6..620ba268 100644
--- a/tests/wrk_benchmark_announce.lua
+++ b/tests/wrk_benchmark_announce.lua
@@ -1,53 +1,42 @@
--- else the randomness would be the same every run
-math.randomseed(os.time())
-
-local charset = "0123456789ABCDEF"
-
-function hexToChar(hex)
-    local n = tonumber(hex, 16)
-    local f = string.char(n)
-    return f
-end
-
-function hexStringToCharString(hex)
-    local ret = {}
-    local r
-    for i = 0, 19 do
-        local x = i * 2
-        r = hex:sub(x+1, x+2)
-        local f = hexToChar(r)
-        table.insert(ret, f)
-    end
-    return table.concat(ret)
-end
-
-function urlEncode(str)
-    str = string.gsub (str, "([^0-9a-zA-Z !'()*._~-])", -- locale independent
-        function (c) return string.format ("%%%02X", string.byte(c)) end)
-    str = string.gsub (str, " ", "+")
-    return str
-end
-
-function genHexString(length)
-    local ret = {}
-    local r
-    for i = 1, length do
-        r = math.random(1, #charset)
-        table.insert(ret, charset:sub(r, r))
-    end
-    return table.concat(ret)
-end
-
-function randomInfoHash()
-    local hexString = genHexString(40)
-    local str = hexStringToCharString(hexString)
-    return urlEncode(str)
-end
-
--- the request function that will run at each request
-request = function()
-    path = "/announce?info_hash=" .. randomInfoHash() .. "&peer_id=-lt0D80-a%D4%10%19%99%A6yh%9A%E1%CD%96&port=54434&uploaded=885&downloaded=0&left=0&corrupt=0&key=A78381BD&numwant=200&compact=1&no_peer_id=1&supportcrypto=1&redundant=0"
-    headers = {}
-    headers["X-Forwarded-For"] = "1.1.1.1"
-    return wrk.format("GET", path, headers)
-end
+-- wrk embeds LuaJIT, so use the `bit` module instead of Lua 5.3 bitwise operators
+local bit = require("bit")
+
+function generate_unique_info_hashes(size)
+    local result = {}
+
+    for i = 0, size - 1 do
+        local bytes = {}
+        bytes[1] = bit.band(i, 0xFF)
+        bytes[2] = bit.band(bit.rshift(i, 8), 0xFF)
+        bytes[3] = bit.band(bit.rshift(i, 16), 0xFF)
+        bytes[4] = bit.band(bit.rshift(i, 24), 0xFF)
+
+        -- pad to the full 20 bytes and percent-encode each byte, so the
+        -- value can be concatenated straight into the announce URL
+        local encoded = {}
+        for j = 1, 20 do
+            encoded[j] = string.format("%%%02X", bytes[j] or 0)
+        end
+
+        result[i + 1] = table.concat(encoded)
+    end
+
+    return result
+end
+
+info_hashes = generate_unique_info_hashes(10000000)
+
+index = 1
+
+-- the request function that will run at each request
+request = function()
+    path = "/announce?info_hash=" .. info_hashes[index] .. "&peer_id=-lt0D80-a%D4%10%19%99%A6yh%9A%E1%CD%96&port=54434&uploaded=885&downloaded=0&left=0&corrupt=0&key=A78381BD&numwant=200&compact=1&no_peer_id=1&supportcrypto=1&redundant=0"
+    -- wrap around instead of indexing past the generated hashes
+    index = index + 1
+    if index > #info_hashes then
+        index = 1
+    end
+    headers = {}
+    headers["X-Forwarded-For"] = "1.1.1.1"
+    return wrk.format("GET", path, headers)
+end
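A note on running the rewritten script: pre-generating ten million percent-encoded info hashes takes a while and a fair amount of memory per wrk thread, since each thread runs its own Lua state. An invocation along these lines should work; the tracker address and load parameters here are assumptions, not part of the patch:

    wrk -t8 -c128 -d60s -s tests/wrk_benchmark_announce.lua http://127.0.0.1:7070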