diff --git a/cSpell.json b/cSpell.json index 090a2b0e..a21e69b9 100644 --- a/cSpell.json +++ b/cSpell.json @@ -5,6 +5,7 @@ "alekitto", "appuser", "Arvid", + "ASMS", "asyn", "autoclean", "AUTOINCREMENT", diff --git a/src/servers/udp/server/launcher.rs b/src/servers/udp/server/launcher.rs index c8bac809..6ae41451 100644 --- a/src/servers/udp/server/launcher.rs +++ b/src/servers/udp/server/launcher.rs @@ -3,10 +3,11 @@ use std::sync::Arc; use std::time::Duration; use bittorrent_tracker_client::udp::client::check; +use bloom::CountingBloomFilter; use derive_more::Constructor; use futures_util::StreamExt; use tokio::select; -use tokio::sync::oneshot; +use tokio::sync::{oneshot, RwLock}; use tracing::instrument; use super::request_buffer::ActiveRequests; @@ -20,6 +21,10 @@ use crate::servers::udp::server::processor::Processor; use crate::servers::udp::server::receiver::Receiver; use crate::servers::udp::UDP_TRACKER_LOG_TARGET; +/// The maximum number of connection id errors per ip. Clients will be banned if +/// they exceed this limit. +const MAX_CONNECTION_ID_ERRORS_PER_IP: u32 = 10; + /// A UDP server instance launcher. #[derive(Constructor)] pub struct Launcher; @@ -114,6 +119,10 @@ impl Launcher { async fn run_udp_server_main(mut receiver: Receiver, tracker: Arc, cookie_lifetime: Duration) { let active_requests = &mut ActiveRequests::default(); + // Create a counting bloom filter that uses 4 bits per element and has a + // false positive rate of 0.01 when 100 items have been inserted + let cbf = Arc::new(RwLock::new(CountingBloomFilter::with_rate(4, 0.01, 100))); + let addr = receiver.bound_socket_address(); let local_addr = format!("udp://{addr}"); @@ -140,6 +149,13 @@ impl Launcher { } }; + let connection_id_errors_from_ip = cbf.read().await.estimate_count(&req.from.ip().to_string()); + + if connection_id_errors_from_ip > MAX_CONNECTION_ID_ERRORS_PER_IP { + tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server::loop continue: (banned ip)"); + continue; + } + // We spawn the new task even if there active requests buffer is // full. This could seem counterintuitive because we are accepting // more request and consuming more memory even if the server is @@ -151,7 +167,8 @@ impl Launcher { // are only adding and removing tasks without given them the // chance to finish. However, the buffer is yielding before // aborting one tasks, giving it the chance to finish. - let abort_handle: tokio::task::AbortHandle = tokio::task::spawn(processor.process_request(req)).abort_handle(); + let abort_handle: tokio::task::AbortHandle = + tokio::task::spawn(processor.process_request(req, cbf.clone())).abort_handle(); if abort_handle.is_finished() { continue; diff --git a/src/servers/udp/server/processor.rs b/src/servers/udp/server/processor.rs index 703367f3..bd83ea24 100644 --- a/src/servers/udp/server/processor.rs +++ b/src/servers/udp/server/processor.rs @@ -3,6 +3,8 @@ use std::net::SocketAddr; use std::sync::Arc; use aquatic_udp_protocol::Response; +use bloom::{CountingBloomFilter, ASMS}; +use tokio::sync::RwLock; use tracing::{instrument, Level}; use super::bound_socket::BoundSocket; @@ -25,8 +27,8 @@ impl Processor { } } - #[instrument(skip(self, request))] - pub async fn process_request(self, request: RawRequest) { + #[instrument(skip(self, request, cbf))] + pub async fn process_request(self, request: RawRequest, cbf: Arc>) { let from = request.from; let response = handlers::handle_packet( request, @@ -35,6 +37,13 @@ impl Processor { CookieTimeValues::new(self.cookie_lifetime), ) .await; + + if let Response::Error(err) = &response { + if err.message.contains("cookie value is expired") || err.message.contains("cookie value is from future") { + cbf.write().await.insert(&from.ip().to_string()); + } + } + self.send_response(from, response).await; }