Skip to content

Commit

Permalink
test: add test for udp::handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
josecelano committed Sep 22, 2022
1 parent a049e29 commit 7abe0f5
Show file tree
Hide file tree
Showing 5 changed files with 1,119 additions and 27 deletions.
6 changes: 5 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::Arc;

use log::info;
use torrust_tracker::tracker::statistics::StatsTracker;
use torrust_tracker::tracker::tracker::TorrentTracker;
use torrust_tracker::{logging, setup, static_time, Configuration};

Expand All @@ -19,8 +20,11 @@ async fn main() {
}
};

// Initialize stats tracker
let stats_tracker = StatsTracker::new_running_instance();

// Initialize Torrust tracker
let tracker = match TorrentTracker::new(config.clone()) {
let tracker = match TorrentTracker::new(config.clone(), Box::new(stats_tracker)) {
Ok(tracker) => Arc::new(tracker),
Err(error) => {
panic!("{}", error)
Expand Down
54 changes: 40 additions & 14 deletions src/tracker/statistics.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use async_trait::async_trait;
use std::sync::Arc;

use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::Sender;
use tokio::sync::{mpsc, RwLock, RwLockReadGuard};

const CHANNEL_BUFFER_SIZE: usize = 65_535;

#[derive(Debug)]
#[derive(Debug, PartialEq)]
pub enum TrackerStatisticsEvent {
Tcp4Announce,
Tcp4Scrape,
Expand Down Expand Up @@ -61,25 +61,19 @@ pub struct StatsTracker {
}

impl StatsTracker {
pub fn new_running_instance() -> Self {
let mut stats_tracker = Self::new();
stats_tracker.run_worker();
stats_tracker
}

pub fn new() -> Self {
Self {
channel_sender: None,
stats: Arc::new(RwLock::new(TrackerStatistics::new())),
}
}

pub async fn get_stats(&self) -> RwLockReadGuard<'_, TrackerStatistics> {
self.stats.read().await
}

pub async fn send_event(&self, event: TrackerStatisticsEvent) -> Option<Result<(), SendError<TrackerStatisticsEvent>>> {
if let Some(tx) = &self.channel_sender {
Some(tx.send(event).await)
} else {
None
}
}

pub fn run_worker(&mut self) {
let (tx, mut rx) = mpsc::channel::<TrackerStatisticsEvent>(CHANNEL_BUFFER_SIZE);

Expand Down Expand Up @@ -134,3 +128,35 @@ impl StatsTracker {
});
}
}

#[async_trait]
pub trait TrackerStatisticsEventSender: Sync + Send {
async fn send_event(&self, event: TrackerStatisticsEvent) -> Option<Result<(), SendError<TrackerStatisticsEvent>>>;
}

#[async_trait]
impl TrackerStatisticsEventSender for StatsTracker {
async fn send_event(&self, event: TrackerStatisticsEvent) -> Option<Result<(), SendError<TrackerStatisticsEvent>>> {
if let Some(tx) = &self.channel_sender {
Some(tx.send(event).await)
} else {
None
}
}
}

#[async_trait]
pub trait TrackerStatisticsRepository: Sync + Send {
async fn get_stats(&self) -> RwLockReadGuard<'_, TrackerStatistics>;
}

#[async_trait]
impl TrackerStatisticsRepository for StatsTracker {
async fn get_stats(&self) -> RwLockReadGuard<'_, TrackerStatistics> {
self.stats.read().await
}
}

pub trait TrackerStatsService: TrackerStatisticsEventSender + TrackerStatisticsRepository {}

impl TrackerStatsService for StatsTracker {}
2 changes: 1 addition & 1 deletion src/tracker/torrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::peer::TorrentPeer;
use crate::protocol::clock::{DefaultClock, TimeNow};
use crate::{PeerId, MAX_SCRAPE_TORRENTS};

#[derive(Serialize, Deserialize, Clone)]
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct TorrentEntry {
#[serde(skip)]
pub peers: std::collections::BTreeMap<PeerId, TorrentPeer>,
Expand Down
36 changes: 25 additions & 11 deletions src/tracker/tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::databases::database::Database;
use crate::mode::TrackerMode;
use crate::peer::TorrentPeer;
use crate::protocol::common::InfoHash;
use crate::statistics::{StatsTracker, TrackerStatistics, TrackerStatisticsEvent};
use crate::statistics::{TrackerStatistics, TrackerStatisticsEvent, TrackerStatsService};
use crate::tracker::key;
use crate::tracker::key::AuthKey;
use crate::tracker::torrent::{TorrentEntry, TorrentError, TorrentStats};
Expand All @@ -24,19 +24,13 @@ pub struct TorrentTracker {
keys: RwLock<std::collections::HashMap<String, AuthKey>>,
whitelist: RwLock<std::collections::HashSet<InfoHash>>,
torrents: RwLock<std::collections::BTreeMap<InfoHash, TorrentEntry>>,
stats_tracker: StatsTracker,
stats_tracker: Box<dyn TrackerStatsService>,
database: Box<dyn Database>,
}

impl TorrentTracker {
pub fn new(config: Arc<Configuration>) -> Result<TorrentTracker, r2d2::Error> {
pub fn new(config: Arc<Configuration>, stats_tracker: Box<dyn TrackerStatsService>) -> Result<TorrentTracker, r2d2::Error> {
let database = database::connect_database(&config.db_driver, &config.db_path)?;
let mut stats_tracker = StatsTracker::new();

// starts a thread for updating tracker stats
if config.tracker_usage_statistics {
stats_tracker.run_worker();
}

Ok(TorrentTracker {
config: config.clone(),
Expand Down Expand Up @@ -96,11 +90,20 @@ impl TorrentTracker {

// Adding torrents is not relevant to public trackers.
pub async fn add_torrent_to_whitelist(&self, info_hash: &InfoHash) -> Result<(), database::Error> {
self.database.add_info_hash_to_whitelist(info_hash.clone()).await?;
self.whitelist.write().await.insert(info_hash.clone());
self.add_torrent_to_database_whitelist(info_hash).await?;
self.add_torrent_to_memory_whitelist(info_hash).await;
Ok(())
}

async fn add_torrent_to_database_whitelist(&self, info_hash: &InfoHash) -> Result<(), database::Error> {
self.database.add_info_hash_to_whitelist(*info_hash).await?;
Ok(())
}

pub async fn add_torrent_to_memory_whitelist(&self, info_hash: &InfoHash) -> bool {
self.whitelist.write().await.insert(*info_hash)
}

// Removing torrents is not relevant to public trackers.
pub async fn remove_torrent_from_whitelist(&self, info_hash: &InfoHash) -> Result<(), database::Error> {
self.database.remove_info_hash_from_whitelist(info_hash.clone()).await?;
Expand Down Expand Up @@ -177,6 +180,7 @@ impl TorrentTracker {
Ok(())
}

/// Get all torrent peers for a given torrent filtering out the peer with the client address
pub async fn get_torrent_peers(&self, info_hash: &InfoHash, client_addr: &SocketAddr) -> Vec<TorrentPeer> {
let read_lock = self.torrents.read().await;

Expand All @@ -186,6 +190,16 @@ impl TorrentTracker {
}
}

/// Get all torrent peers for a given torrent
pub async fn get_all_torrent_peers(&self, info_hash: &InfoHash) -> Vec<TorrentPeer> {
let read_lock = self.torrents.read().await;

match read_lock.get(info_hash) {
None => vec![],
Some(entry) => entry.get_peers(None).into_iter().cloned().collect(),
}
}

pub async fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &TorrentPeer) -> TorrentStats {
let mut torrents = self.torrents.write().await;

Expand Down
Loading

0 comments on commit 7abe0f5

Please sign in to comment.