diff --git a/src/console/cronjobs/tracker_statistics_importer.rs b/src/console/cronjobs/tracker_statistics_importer.rs
index 0d32ba34..52ab20bd 100644
--- a/src/console/cronjobs/tracker_statistics_importer.rs
+++ b/src/console/cronjobs/tracker_statistics_importer.rs
@@ -17,12 +17,14 @@ use axum::extract::State;
 use axum::routing::{get, post};
 use axum::{Json, Router};
 use chrono::{DateTime, Utc};
-use log::{error, info};
+use log::{debug, error, info};
 use serde_json::{json, Value};
+use text_colorizer::Colorize;
 use tokio::net::TcpListener;
 use tokio::task::JoinHandle;
 
 use crate::tracker::statistics_importer::StatisticsImporter;
+use crate::utils::clock::seconds_ago_utc;
 
 const IMPORTER_API_IP: &str = "127.0.0.1";
 
@@ -41,7 +43,7 @@ struct ImporterState {
 #[must_use]
 pub fn start(
     importer_port: u16,
-    torrent_info_update_interval: u64,
+    torrent_stats_update_interval: u64,
     tracker_statistics_importer: &Arc<StatisticsImporter>,
 ) -> JoinHandle<()> {
     let weak_tracker_statistics_importer = Arc::downgrade(tracker_statistics_importer);
@@ -54,7 +56,7 @@ pub fn start(
     let _importer_api_handle = tokio::spawn(async move {
         let import_state = Arc::new(ImporterState {
             last_heartbeat: Arc::new(Mutex::new(Utc::now())),
-            torrent_info_update_interval,
+            torrent_info_update_interval: torrent_stats_update_interval,
         });
 
         let app = Router::new()
@@ -81,25 +83,47 @@ pub fn start(
 
         info!("Tracker statistics importer cronjob starting ...");
 
-        let interval = std::time::Duration::from_secs(torrent_info_update_interval);
-        let mut interval = tokio::time::interval(interval);
+        let execution_interval_in_milliseconds = 100;
+        let execution_interval_duration = std::time::Duration::from_millis(execution_interval_in_milliseconds);
+        let mut execution_interval = tokio::time::interval(execution_interval_duration);
 
-        interval.tick().await; // first tick is immediate...
+        execution_interval.tick().await; // first tick is immediate...
 
-        loop {
-            interval.tick().await;
-
-            info!("Running tracker statistics importer ...");
+        info!("Running tracker statistics importer every {execution_interval_in_milliseconds} milliseconds ...");
 
+        loop {
             if let Err(e) = send_heartbeat(importer_port).await {
                 error!("Failed to send heartbeat from importer cronjob: {}", e);
             }
 
-            if let Some(tracker) = weak_tracker_statistics_importer.upgrade() {
-                drop(tracker.import_all_torrents_statistics().await);
+            if let Some(statistics_importer) = weak_tracker_statistics_importer.upgrade() {
+                let one_interval_ago = seconds_ago_utc(
+                    torrent_stats_update_interval
+                        .try_into()
+                        .expect("update interval should be a positive integer"),
+                );
+                let limit = 50;
+
+                debug!(
+                    "Importing torrents statistics not updated since {} limited to a maximum of {} torrents ...",
+                    one_interval_ago.to_string().yellow(),
+                    limit.to_string().yellow()
+                );
+
+                match statistics_importer
+                    .import_torrents_statistics_not_updated_since(one_interval_ago, limit)
+                    .await
+                {
+                    Ok(()) => {}
+                    Err(e) => error!("Failed to import statistics: {:?}", e),
+                }
+
+                drop(statistics_importer);
             } else {
                 break;
             }
+
+            execution_interval.tick().await;
         }
     })
 }
diff --git a/src/databases/database.rs b/src/databases/database.rs
index 2d56e22f..a19970be 100644
--- a/src/databases/database.rs
+++ b/src/databases/database.rs
@@ -1,5 +1,5 @@
 use async_trait::async_trait;
-use chrono::NaiveDateTime;
+use chrono::{DateTime, NaiveDateTime, Utc};
 use serde::{Deserialize, Serialize};
 
 use crate::databases::mysql::Mysql;
@@ -292,6 +292,13 @@ pub trait Database: Sync + Send {
     /// Get all torrents as `Vec<TorrentCompact>`.
     async fn get_all_torrents_compact(&self) -> Result<Vec<TorrentCompact>, Error>;
 
+    /// Get torrents whose stats have not been imported from the tracker at least since a given datetime.
+    async fn get_torrents_with_stats_not_updated_since(
+        &self,
+        datetime: DateTime<Utc>,
+        limit: i64,
+    ) -> Result<Vec<TorrentCompact>, Error>;
+
     /// Update a torrent's title with `torrent_id` and `title`.
     async fn update_torrent_title(&self, torrent_id: i64, title: &str) -> Result<(), Error>;
 
diff --git a/src/databases/mysql.rs b/src/databases/mysql.rs
index 5ee723cd..d28f1ae7 100644
--- a/src/databases/mysql.rs
+++ b/src/databases/mysql.rs
@@ -2,7 +2,7 @@ use std::str::FromStr;
 use std::time::Duration;
 
 use async_trait::async_trait;
-use chrono::NaiveDateTime;
+use chrono::{DateTime, NaiveDateTime, Utc};
 use sqlx::mysql::{MySqlConnectOptions, MySqlPoolOptions};
 use sqlx::{query, query_as, Acquire, ConnectOptions, MySqlPool};
 
@@ -20,7 +20,7 @@ use crate::models::torrent_tag::{TagId, TorrentTag};
 use crate::models::tracker_key::TrackerKey;
 use crate::models::user::{User, UserAuthentication, UserCompact, UserId, UserProfile};
 use crate::services::torrent::{CanonicalInfoHashGroup, DbTorrentInfoHash};
-use crate::utils::clock::{self, datetime_now};
+use crate::utils::clock::{self, datetime_now, DATETIME_FORMAT};
 use crate::utils::hex::from_bytes;
 
 pub struct Mysql {
@@ -884,6 +884,27 @@ impl Database for Mysql {
             .map_err(|_| database::Error::Error)
     }
 
+    async fn get_torrents_with_stats_not_updated_since(
+        &self,
+        datetime: DateTime<Utc>,
+        limit: i64,
+    ) -> Result<Vec<TorrentCompact>, database::Error> {
+        query_as::<_, TorrentCompact>(
+            "SELECT tt.torrent_id, tt.info_hash
+            FROM torrust_torrents tt
+            LEFT JOIN torrust_torrent_tracker_stats tts ON tt.torrent_id = tts.torrent_id
+            WHERE tts.updated_at < ? OR tts.updated_at IS NULL
+            ORDER BY tts.updated_at ASC
+            LIMIT ?
+            ",
+        )
+        .bind(datetime.format(DATETIME_FORMAT).to_string())
+        .bind(limit)
+        .fetch_all(&self.pool)
+        .await
+        .map_err(|_| database::Error::Error)
+    }
+
     async fn update_torrent_title(&self, torrent_id: i64, title: &str) -> Result<(), database::Error> {
         query("UPDATE torrust_torrent_info SET title = ? WHERE torrent_id = ?")
             .bind(title)
diff --git a/src/databases/sqlite.rs b/src/databases/sqlite.rs
index 26d90eea..421292d6 100644
--- a/src/databases/sqlite.rs
+++ b/src/databases/sqlite.rs
@@ -2,7 +2,7 @@ use std::str::FromStr;
 use std::time::Duration;
 
 use async_trait::async_trait;
-use chrono::NaiveDateTime;
+use chrono::{DateTime, NaiveDateTime, Utc};
 use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
 use sqlx::{query, query_as, Acquire, ConnectOptions, SqlitePool};
 
@@ -20,7 +20,7 @@ use crate::models::torrent_tag::{TagId, TorrentTag};
 use crate::models::tracker_key::TrackerKey;
 use crate::models::user::{User, UserAuthentication, UserCompact, UserId, UserProfile};
 use crate::services::torrent::{CanonicalInfoHashGroup, DbTorrentInfoHash};
-use crate::utils::clock::{self, datetime_now};
+use crate::utils::clock::{self, datetime_now, DATETIME_FORMAT};
 use crate::utils::hex::from_bytes;
 
 pub struct Sqlite {
@@ -876,6 +876,27 @@ impl Database for Sqlite {
             .map_err(|_| database::Error::Error)
     }
 
+    async fn get_torrents_with_stats_not_updated_since(
+        &self,
+        datetime: DateTime<Utc>,
+        limit: i64,
+    ) -> Result<Vec<TorrentCompact>, database::Error> {
+        query_as::<_, TorrentCompact>(
+            "SELECT tt.torrent_id, tt.info_hash
+            FROM torrust_torrents tt
+            LEFT JOIN torrust_torrent_tracker_stats tts ON tt.torrent_id = tts.torrent_id
+            WHERE tts.updated_at < ? OR tts.updated_at IS NULL
+            ORDER BY tts.updated_at ASC
+            LIMIT ?
+            ",
+        )
+        .bind(datetime.format(DATETIME_FORMAT).to_string())
+        .bind(limit)
+        .fetch_all(&self.pool)
+        .await
+        .map_err(|_| database::Error::Error)
+    }
+
     async fn update_torrent_title(&self, torrent_id: i64, title: &str) -> Result<(), database::Error> {
         query("UPDATE torrust_torrent_info SET title = $1 WHERE torrent_id = $2")
             .bind(title)
diff --git a/src/tracker/statistics_importer.rs b/src/tracker/statistics_importer.rs
index 996008f3..d9030e39 100644
--- a/src/tracker/statistics_importer.rs
+++ b/src/tracker/statistics_importer.rs
@@ -1,7 +1,8 @@
 use std::sync::Arc;
 use std::time::Instant;
 
-use log::{error, info};
+use chrono::{DateTime, Utc};
+use log::{debug, error, info};
 use text_colorizer::Colorize;
 
 use super::service::{Service, TorrentInfo, TrackerAPIError};
@@ -36,13 +37,17 @@ impl StatisticsImporter {
     pub async fn import_all_torrents_statistics(&self) -> Result<(), database::Error> {
         let torrents = self.database.get_all_torrents_compact().await?;
 
+        if torrents.is_empty() {
+            return Ok(());
+        }
+
         info!(target: LOG_TARGET, "Importing {} torrents statistics from tracker {} ...", torrents.len().to_string().yellow(), self.tracker_url.yellow());
 
         // Start the timer before the loop
         let start_time = Instant::now();
 
         for torrent in torrents {
-            info!(target: LOG_TARGET, "Importing torrent #{} ...", torrent.torrent_id.to_string().yellow());
+            info!(target: LOG_TARGET, "Importing torrent #{} statistics ...", torrent.torrent_id.to_string().yellow());
 
             let ret = self.import_torrent_statistics(torrent.torrent_id, &torrent.info_hash).await;
 
@@ -64,6 +69,55 @@ impl StatisticsImporter {
         Ok(())
     }
 
+    /// Import torrents statistics not updated recently.
+    ///
+    /// # Errors
+    ///
+    /// Will return an error if the database query failed.
+    pub async fn import_torrents_statistics_not_updated_since(
+        &self,
+        datetime: DateTime<Utc>,
+        limit: i64,
+    ) -> Result<(), database::Error> {
+        debug!(target: LOG_TARGET, "Importing torrents statistics not updated since {} limited to a maximum of {} torrents ...", datetime.to_string().yellow(), limit.to_string().yellow());
+
+        let torrents = self
+            .database
+            .get_torrents_with_stats_not_updated_since(datetime, limit)
+            .await?;
+
+        if torrents.is_empty() {
+            return Ok(());
+        }
+
+        info!(target: LOG_TARGET, "Importing {} torrents statistics from tracker {} ...", torrents.len().to_string().yellow(), self.tracker_url.yellow());
+
+        // Start the timer before the loop
+        let start_time = Instant::now();
+
+        for torrent in torrents {
+            info!(target: LOG_TARGET, "Importing torrent #{} statistics ...", torrent.torrent_id.to_string().yellow());
+
+            let ret = self.import_torrent_statistics(torrent.torrent_id, &torrent.info_hash).await;
+
+            if let Some(err) = ret.err() {
+                if err != TrackerAPIError::TorrentNotFound {
+                    let message = format!(
+                        "Error updating torrent tracker stats for torrent. Torrent: id {}; infohash {}. Error: {:?}",
+                        torrent.torrent_id, torrent.info_hash, err
+                    );
+                    error!(target: LOG_TARGET, "{}", message);
+                }
+            }
+        }
+
+        let elapsed_time = start_time.elapsed();
+
+        info!(target: LOG_TARGET, "Statistics import completed in {:.2?}", elapsed_time);
+
+        Ok(())
+    }
+
     /// Import torrent statistics from tracker and update them in database.
     ///
     /// # Errors
diff --git a/src/utils/clock.rs b/src/utils/clock.rs
index 338a5a61..42269eeb 100644
--- a/src/utils/clock.rs
+++ b/src/utils/clock.rs
@@ -1,4 +1,6 @@
-use chrono::Utc;
+use chrono::{DateTime, Duration, Utc};
+
+pub const DATETIME_FORMAT: &str = "%Y-%m-%d %H:%M:%S";
 
 /// Returns the current timestamp in seconds.
 ///
@@ -11,10 +13,16 @@ pub fn now() -> u64 {
     u64::try_from(chrono::prelude::Utc::now().timestamp()).expect("timestamp should be positive")
 }
 
+/// Returns the datetime some seconds ago.
+#[must_use]
+pub fn seconds_ago_utc(seconds: i64) -> DateTime<Utc> {
+    Utc::now() - Duration::seconds(seconds)
+}
+
 /// Returns the current time in database format.
 ///
 /// For example: `2024-03-12 15:56:24`.
 #[must_use]
 pub fn datetime_now() -> String {
-    Utc::now().format("%Y-%m-%d %H:%M:%S").to_string()
+    Utc::now().format(DATETIME_FORMAT).to_string()
 }
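
Taken together, the patch replaces the periodic bulk import with a gradual one: every 100 ms the cronjob computes a cutoff datetime one update interval in the past, asks the database for at most 50 torrents whose tracker stats are older than that cutoff (or missing), and refreshes only those. The sketch below is a minimal, standalone illustration of how that cutoff is derived and bound; only `seconds_ago_utc` and `DATETIME_FORMAT` are taken from the patched `src/utils/clock.rs`, while the interval value and the `main` wrapper are hypothetical stand-ins for the real configuration and cronjob.

```rust
use chrono::{DateTime, Duration, Utc};

// Copied from the patch to `src/utils/clock.rs`.
pub const DATETIME_FORMAT: &str = "%Y-%m-%d %H:%M:%S";

/// Returns the datetime some seconds ago (the helper the cronjob calls).
pub fn seconds_ago_utc(seconds: i64) -> DateTime<Utc> {
    Utc::now() - Duration::seconds(seconds)
}

fn main() {
    // Hypothetical configuration value; the real one comes from the index settings.
    let torrent_stats_update_interval: u64 = 3600;

    // Same conversion the cronjob performs: the u64 interval becomes i64 seconds.
    let one_interval_ago = seconds_ago_utc(
        torrent_stats_update_interval
            .try_into()
            .expect("update interval should be a positive integer"),
    );

    // The database implementations bind the cutoff as a formatted string, so
    // `tts.updated_at < ?` compares against values like "2024-03-12 15:56:24".
    let cutoff = one_interval_ago.format(DATETIME_FORMAT).to_string();

    println!("importing stats for torrents with updated_at < '{cutoff}', 50 at a time");
}
```

The 100 ms tick and the limit of 50 come straight from the cronjob change; the net effect is that a large torrent table is refreshed in small batches rather than in one long blocking pass per interval.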