From d9cfb389010f5a1afd1cc15c6e5be7ccd142e2c8 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Tue, 7 Jan 2025 16:35:30 +0000 Subject: [PATCH] feat: make ban service generic for all trackers All UDP tracker will share the same service. In the future, the HTTP trackers can also use it. The service was not include inside the tracker (easy solution) becuase the Tracker type is too big. It has became the app container. In fact, we want to reduce it in the future by extracting the services outside of the tracker: stats, whitelist, etc. Those services will be instantiate independently in the future in the app bootstrap. --- src/app.rs | 14 +++++++++++--- src/bootstrap/app.rs | 9 +++++++-- src/bootstrap/jobs/udp_tracker.rs | 13 ++++++++++--- src/console/profiling.rs | 4 ++-- src/main.rs | 4 ++-- src/servers/udp/server/banning.rs | 11 +++-------- src/servers/udp/server/launcher.rs | 21 +++++++++++---------- src/servers/udp/server/mod.rs | 13 +++++++++++-- src/servers/udp/server/spawner.rs | 6 ++++-- src/servers/udp/server/states.rs | 11 +++++++++-- tests/servers/udp/environment.rs | 10 +++++++++- 11 files changed, 79 insertions(+), 37 deletions(-) diff --git a/src/app.rs b/src/app.rs index 06fea4d2e..f40072132 100644 --- a/src/app.rs +++ b/src/app.rs @@ -23,12 +23,14 @@ //! - Tracker REST API: the tracker API can be enabled/disabled. use std::sync::Arc; +use tokio::sync::RwLock; use tokio::task::JoinHandle; use torrust_tracker_configuration::Configuration; use tracing::instrument; use crate::bootstrap::jobs::{health_check_api, http_tracker, torrent_cleanup, tracker_apis, udp_tracker}; use crate::servers::registar::Registar; +use crate::servers::udp::server::banning::BanService; use crate::{core, servers}; /// # Panics @@ -37,8 +39,12 @@ use crate::{core, servers}; /// /// - Can't retrieve tracker keys from database. /// - Can't load whitelist from database. -#[instrument(skip(config, tracker))] -pub async fn start(config: &Configuration, tracker: Arc) -> Vec> { +#[instrument(skip(config, tracker, ban_service))] +pub async fn start( + config: &Configuration, + tracker: Arc, + ban_service: Arc>, +) -> Vec> { if config.http_api.is_none() && (config.udp_trackers.is_none() || config.udp_trackers.as_ref().map_or(true, std::vec::Vec::is_empty)) && (config.http_trackers.is_none() || config.http_trackers.as_ref().map_or(true, std::vec::Vec::is_empty)) @@ -75,7 +81,9 @@ pub async fn start(config: &Configuration, tracker: Arc) -> Vec (Configuration, Arc) { +pub fn setup() -> (Configuration, Arc, Arc>) { #[cfg(not(test))] check_seed(); @@ -44,9 +47,11 @@ pub fn setup() -> (Configuration, Arc) { let tracker = initialize_with_configuration(&configuration); + let ban_service = Arc::new(RwLock::new(BanService::new(MAX_CONNECTION_ID_ERRORS_PER_IP))); + tracing::info!("Configuration:\n{}", configuration.clone().mask_secrets().to_json()); - (configuration, tracker) + (configuration, tracker, ban_service) } /// checks if the seed is the instance seed in production. diff --git a/src/bootstrap/jobs/udp_tracker.rs b/src/bootstrap/jobs/udp_tracker.rs index 6aab06d4f..8948811af 100644 --- a/src/bootstrap/jobs/udp_tracker.rs +++ b/src/bootstrap/jobs/udp_tracker.rs @@ -8,12 +8,14 @@ //! > for the configuration options. use std::sync::Arc; +use tokio::sync::RwLock; use tokio::task::JoinHandle; use torrust_tracker_configuration::UdpTracker; use tracing::instrument; use crate::core; use crate::servers::registar::ServiceRegistrationForm; +use crate::servers::udp::server::banning::BanService; use crate::servers::udp::server::spawner::Spawner; use crate::servers::udp::server::Server; use crate::servers::udp::UDP_TRACKER_LOG_TARGET; @@ -29,13 +31,18 @@ use crate::servers::udp::UDP_TRACKER_LOG_TARGET; /// It will panic if the task did not finish successfully. #[must_use] #[allow(clippy::async_yields_async)] -#[instrument(skip(config, tracker, form))] -pub async fn start_job(config: &UdpTracker, tracker: Arc, form: ServiceRegistrationForm) -> JoinHandle<()> { +#[instrument(skip(config, tracker, ban_service, form))] +pub async fn start_job( + config: &UdpTracker, + tracker: Arc, + ban_service: Arc>, + form: ServiceRegistrationForm, +) -> JoinHandle<()> { let bind_to = config.bind_address; let cookie_lifetime = config.cookie_lifetime; let server = Server::new(Spawner::new(bind_to)) - .start(tracker, form, cookie_lifetime) + .start(tracker, ban_service, form, cookie_lifetime) .await .expect("it should be able to start the udp tracker"); diff --git a/src/console/profiling.rs b/src/console/profiling.rs index 5fb507197..1d31af3ce 100644 --- a/src/console/profiling.rs +++ b/src/console/profiling.rs @@ -179,9 +179,9 @@ pub async fn run() { return; }; - let (config, tracker) = bootstrap::app::setup(); + let (config, tracker, ban_service) = bootstrap::app::setup(); - let jobs = app::start(&config, tracker).await; + let jobs = app::start(&config, tracker, ban_service).await; // Run the tracker for a fixed duration let run_duration = sleep(Duration::from_secs(duration_secs)); diff --git a/src/main.rs b/src/main.rs index 0e2bcfbc9..206633f8c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,9 +2,9 @@ use torrust_tracker_lib::{app, bootstrap}; #[tokio::main] async fn main() { - let (config, tracker) = bootstrap::app::setup(); + let (config, tracker, ban_service) = bootstrap::app::setup(); - let jobs = app::start(&config, tracker).await; + let jobs = app::start(&config, tracker, ban_service).await; // handle the signals tokio::select! { diff --git a/src/servers/udp/server/banning.rs b/src/servers/udp/server/banning.rs index df236820c..dada592be 100644 --- a/src/servers/udp/server/banning.rs +++ b/src/servers/udp/server/banning.rs @@ -20,7 +20,6 @@ use std::net::IpAddr; use bloom::{CountingBloomFilter, ASMS}; use tokio::time::Instant; -use url::Url; use crate::servers::udp::UDP_TRACKER_LOG_TARGET; @@ -28,16 +27,14 @@ pub struct BanService { max_connection_id_errors_per_ip: u32, fuzzy_error_counter: CountingBloomFilter, accurate_error_counter: HashMap, - local_addr: Url, last_connection_id_errors_reset: Instant, } impl BanService { #[must_use] - pub fn new(max_connection_id_errors_per_ip: u32, local_addr: Url) -> Self { + pub fn new(max_connection_id_errors_per_ip: u32) -> Self { Self { max_connection_id_errors_per_ip, - local_addr, fuzzy_error_counter: CountingBloomFilter::with_rate(4, 0.01, 100), accurate_error_counter: HashMap::new(), last_connection_id_errors_reset: tokio::time::Instant::now(), @@ -82,8 +79,7 @@ impl BanService { self.last_connection_id_errors_reset = Instant::now(); - let local_addr = self.local_addr.to_string(); - tracing::info!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server::loop (connection id errors filter cleared)"); + tracing::info!(target: UDP_TRACKER_LOG_TARGET, "Udp::run_udp_server::loop (connection id errors filter cleared)"); } } @@ -95,8 +91,7 @@ mod tests { /// Sample service with one day ban duration. fn ban_service(counter_limit: u32) -> BanService { - let udp_tracker_url = "udp://127.0.0.1".parse().unwrap(); - BanService::new(counter_limit, udp_tracker_url) + BanService::new(counter_limit) } #[test] diff --git a/src/servers/udp/server/launcher.rs b/src/servers/udp/server/launcher.rs index 15c7ca017..753dc9915 100644 --- a/src/servers/udp/server/launcher.rs +++ b/src/servers/udp/server/launcher.rs @@ -24,7 +24,7 @@ use crate::servers::udp::UDP_TRACKER_LOG_TARGET; /// The maximum number of connection id errors per ip. Clients will be banned if /// they exceed this limit. -const MAX_CONNECTION_ID_ERRORS_PER_IP: u32 = 10; +pub const MAX_CONNECTION_ID_ERRORS_PER_IP: u32 = 10; const IP_BANS_RESET_INTERVAL_IN_SECS: u64 = 3600; /// A UDP server instance launcher. @@ -40,9 +40,10 @@ impl Launcher { /// It panics if unable to send address of socket. /// It panics if the udp server is loaded when the tracker is private. /// - #[instrument(skip(tracker, bind_to, tx_start, rx_halt))] + #[instrument(skip(tracker, ban_service, bind_to, tx_start, rx_halt))] pub async fn run_with_graceful_shutdown( tracker: Arc, + ban_service: Arc>, bind_to: SocketAddr, cookie_lifetime: Duration, tx_start: oneshot::Sender, @@ -80,7 +81,7 @@ impl Launcher { let local_addr = local_udp_url.clone(); tokio::task::spawn(async move { tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_with_graceful_shutdown::task (listening...)"); - let () = Self::run_udp_server_main(receiver, tracker.clone(), cookie_lifetime).await; + let () = Self::run_udp_server_main(receiver, tracker.clone(), ban_service.clone(), cookie_lifetime).await; }) }; @@ -117,8 +118,13 @@ impl Launcher { ServiceHealthCheckJob::new(binding, info, job) } - #[instrument(skip(receiver, tracker))] - async fn run_udp_server_main(mut receiver: Receiver, tracker: Arc, cookie_lifetime: Duration) { + #[instrument(skip(receiver, tracker, ban_service))] + async fn run_udp_server_main( + mut receiver: Receiver, + tracker: Arc, + ban_service: Arc>, + cookie_lifetime: Duration, + ) { let active_requests = &mut ActiveRequests::default(); let addr = receiver.bound_socket_address(); @@ -127,11 +133,6 @@ impl Launcher { let cookie_lifetime = cookie_lifetime.as_secs_f64(); - let ban_service = Arc::new(RwLock::new(BanService::new( - MAX_CONNECTION_ID_ERRORS_PER_IP, - local_addr.parse().unwrap(), - ))); - let ban_cleaner = ban_service.clone(); tokio::spawn(async move { diff --git a/src/servers/udp/server/mod.rs b/src/servers/udp/server/mod.rs index 9f974ca8c..6eb98a7b1 100644 --- a/src/servers/udp/server/mod.rs +++ b/src/servers/udp/server/mod.rs @@ -58,17 +58,23 @@ mod tests { use std::sync::Arc; use std::time::Duration; + use tokio::sync::RwLock; use torrust_tracker_test_helpers::configuration::ephemeral_public; use super::spawner::Spawner; use super::Server; use crate::bootstrap::app::initialize_with_configuration; use crate::servers::registar::Registar; + use crate::servers::udp::server::banning::BanService; + use crate::servers::udp::server::launcher::MAX_CONNECTION_ID_ERRORS_PER_IP; #[tokio::test] async fn it_should_be_able_to_start_and_stop() { let cfg = Arc::new(ephemeral_public()); + let tracker = initialize_with_configuration(&cfg); + let ban_service = Arc::new(RwLock::new(BanService::new(MAX_CONNECTION_ID_ERRORS_PER_IP))); + let udp_trackers = cfg.udp_trackers.clone().expect("missing UDP trackers configuration"); let config = &udp_trackers[0]; let bind_to = config.bind_address; @@ -77,7 +83,7 @@ mod tests { let stopped = Server::new(Spawner::new(bind_to)); let started = stopped - .start(tracker, register.give_form(), config.cookie_lifetime) + .start(tracker, ban_service, register.give_form(), config.cookie_lifetime) .await .expect("it should start the server"); @@ -91,7 +97,10 @@ mod tests { #[tokio::test] async fn it_should_be_able_to_start_and_stop_with_wait() { let cfg = Arc::new(ephemeral_public()); + let tracker = initialize_with_configuration(&cfg); + let ban_service = Arc::new(RwLock::new(BanService::new(MAX_CONNECTION_ID_ERRORS_PER_IP))); + let config = &cfg.udp_trackers.as_ref().unwrap().first().unwrap(); let bind_to = config.bind_address; let register = &Registar::default(); @@ -99,7 +108,7 @@ mod tests { let stopped = Server::new(Spawner::new(bind_to)); let started = stopped - .start(tracker, register.give_form(), config.cookie_lifetime) + .start(tracker, ban_service, register.give_form(), config.cookie_lifetime) .await .expect("it should start the server"); diff --git a/src/servers/udp/server/spawner.rs b/src/servers/udp/server/spawner.rs index acebdcf75..ce2fe8eae 100644 --- a/src/servers/udp/server/spawner.rs +++ b/src/servers/udp/server/spawner.rs @@ -5,9 +5,10 @@ use std::time::Duration; use derive_more::derive::Display; use derive_more::Constructor; -use tokio::sync::oneshot; +use tokio::sync::{oneshot, RwLock}; use tokio::task::JoinHandle; +use super::banning::BanService; use super::launcher::Launcher; use crate::bootstrap::jobs::Started; use crate::core::Tracker; @@ -28,6 +29,7 @@ impl Spawner { pub fn spawn_launcher( &self, tracker: Arc, + ban_service: Arc>, cookie_lifetime: Duration, tx_start: oneshot::Sender, rx_halt: oneshot::Receiver, @@ -35,7 +37,7 @@ impl Spawner { let spawner = Self::new(self.bind_to); tokio::spawn(async move { - Launcher::run_with_graceful_shutdown(tracker, spawner.bind_to, cookie_lifetime, tx_start, rx_halt).await; + Launcher::run_with_graceful_shutdown(tracker, ban_service, spawner.bind_to, cookie_lifetime, tx_start, rx_halt).await; spawner }) } diff --git a/src/servers/udp/server/states.rs b/src/servers/udp/server/states.rs index 8b87c6efb..39576f864 100644 --- a/src/servers/udp/server/states.rs +++ b/src/servers/udp/server/states.rs @@ -5,9 +5,11 @@ use std::time::Duration; use derive_more::derive::Display; use derive_more::Constructor; +use tokio::sync::RwLock; use tokio::task::JoinHandle; use tracing::{instrument, Level}; +use super::banning::BanService; use super::spawner::Spawner; use super::{Server, UdpError}; use crate::bootstrap::jobs::Started; @@ -62,10 +64,12 @@ impl Server { /// /// It panics if unable to receive the bound socket address from service. /// - #[instrument(skip(self, tracker, form), err, ret(Display, level = Level::INFO))] + #[instrument(skip(self, tracker, ban_service, form), err, ret(Display, level = Level::INFO))] pub async fn start( self, tracker: Arc, + + ban_service: Arc>, form: ServiceRegistrationForm, cookie_lifetime: Duration, ) -> Result, std::io::Error> { @@ -75,7 +79,10 @@ impl Server { assert!(!tx_halt.is_closed(), "Halt channel for UDP tracker should be open"); // May need to wrap in a task to about a tokio bug. - let task = self.state.spawner.spawn_launcher(tracker, cookie_lifetime, tx_start, rx_halt); + let task = self + .state + .spawner + .spawn_launcher(tracker, ban_service, cookie_lifetime, tx_start, rx_halt); let local_addr = rx_start.await.expect("it should be able to start the service").address; diff --git a/tests/servers/udp/environment.rs b/tests/servers/udp/environment.rs index 01639accc..f744809c5 100644 --- a/tests/servers/udp/environment.rs +++ b/tests/servers/udp/environment.rs @@ -2,10 +2,13 @@ use std::net::SocketAddr; use std::sync::Arc; use bittorrent_primitives::info_hash::InfoHash; +use tokio::sync::RwLock; use torrust_tracker_configuration::{Configuration, UdpTracker, DEFAULT_TIMEOUT}; use torrust_tracker_lib::bootstrap::app::initialize_with_configuration; use torrust_tracker_lib::core::Tracker; use torrust_tracker_lib::servers::registar::Registar; +use torrust_tracker_lib::servers::udp::server::banning::BanService; +use torrust_tracker_lib::servers::udp::server::launcher::MAX_CONNECTION_ID_ERRORS_PER_IP; use torrust_tracker_lib::servers::udp::server::spawner::Spawner; use torrust_tracker_lib::servers::udp::server::states::{Running, Stopped}; use torrust_tracker_lib::servers::udp::server::Server; @@ -17,6 +20,7 @@ where { pub config: Arc, pub tracker: Arc, + pub ban_service: Arc>, pub registar: Registar, pub server: Server, } @@ -36,6 +40,7 @@ impl Environment { #[allow(dead_code)] pub fn new(configuration: &Arc) -> Self { let tracker = initialize_with_configuration(configuration); + let ban_service = Arc::new(RwLock::new(BanService::new(MAX_CONNECTION_ID_ERRORS_PER_IP))); let udp_tracker = configuration.udp_trackers.clone().expect("missing UDP tracker configuration"); @@ -48,6 +53,7 @@ impl Environment { Self { config, tracker, + ban_service, registar: Registar::default(), server, } @@ -59,10 +65,11 @@ impl Environment { Environment { config: self.config, tracker: self.tracker.clone(), + ban_service: self.ban_service.clone(), registar: self.registar.clone(), server: self .server - .start(self.tracker, self.registar.give_form(), cookie_lifetime) + .start(self.tracker, self.ban_service, self.registar.give_form(), cookie_lifetime) .await .unwrap(), } @@ -85,6 +92,7 @@ impl Environment { Environment { config: self.config, tracker: self.tracker, + ban_service: self.ban_service, registar: Registar::default(), server: stopped.expect("it stop the udp tracker service"), }