From 16cbea819e6099e4767b9cc01261a31155f28eb3 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Tue, 12 Mar 2024 19:58:39 +0000 Subject: [PATCH] feat: [#469] import torrent statistics in batches Instead of importing all torrents statistics from the tracker every hour, this change imports 50 torrents every 100 milliseconds. Batches only include torrents that have not been updated in the last hour (or whatever the `torrent_info_update_interval` value is in the configuration). This change avoids loading the whole set of torrents in memory every time the import starts. In the future, it could also make it possible to control the number of requests per second to the tracker (statically, from a config value, or dynamically, depending on the tracker load). Although it saves memory, it runs more SQL queries to get the list of torrents pending update. --- .../cronjobs/tracker_statistics_importer.rs | 48 +++++++++++---- src/databases/database.rs | 9 ++- src/databases/mysql.rs | 25 +++++++- src/databases/sqlite.rs | 25 +++++++- src/tracker/statistics_importer.rs | 58 ++++++++++++++++++- src/utils/clock.rs | 12 +++- 6 files changed, 156 insertions(+), 21 deletions(-) diff --git a/src/console/cronjobs/tracker_statistics_importer.rs b/src/console/cronjobs/tracker_statistics_importer.rs index 0d32ba34..52ab20bd 100644 --- a/src/console/cronjobs/tracker_statistics_importer.rs +++ b/src/console/cronjobs/tracker_statistics_importer.rs @@ -17,12 +17,14 @@ use axum::extract::State; use axum::routing::{get, post}; use axum::{Json, Router}; use chrono::{DateTime, Utc}; -use log::{error, info}; +use log::{debug, error, info}; use serde_json::{json, Value}; +use text_colorizer::Colorize; use tokio::net::TcpListener; use tokio::task::JoinHandle; use crate::tracker::statistics_importer::StatisticsImporter; +use crate::utils::clock::seconds_ago_utc; const IMPORTER_API_IP: &str = "127.0.0.1"; @@ -41,7 +43,7 @@ struct ImporterState { #[must_use] pub fn start( importer_port: u16, - 
torrent_info_update_interval: u64, + torrent_stats_update_interval: u64, tracker_statistics_importer: &Arc, ) -> JoinHandle<()> { let weak_tracker_statistics_importer = Arc::downgrade(tracker_statistics_importer); @@ -54,7 +56,7 @@ pub fn start( let _importer_api_handle = tokio::spawn(async move { let import_state = Arc::new(ImporterState { last_heartbeat: Arc::new(Mutex::new(Utc::now())), - torrent_info_update_interval, + torrent_info_update_interval: torrent_stats_update_interval, }); let app = Router::new() @@ -81,25 +83,47 @@ pub fn start( info!("Tracker statistics importer cronjob starting ..."); - let interval = std::time::Duration::from_secs(torrent_info_update_interval); - let mut interval = tokio::time::interval(interval); + let execution_interval_in_milliseconds = 100; + let execution_interval_duration = std::time::Duration::from_millis(execution_interval_in_milliseconds); + let mut execution_interval = tokio::time::interval(execution_interval_duration); - interval.tick().await; // first tick is immediate... + execution_interval.tick().await; // first tick is immediate... 
- loop { - interval.tick().await; - - info!("Running tracker statistics importer ..."); + info!("Running tracker statistics importer every {execution_interval_in_milliseconds} milliseconds ..."); + loop { if let Err(e) = send_heartbeat(importer_port).await { error!("Failed to send heartbeat from importer cronjob: {}", e); } - if let Some(tracker) = weak_tracker_statistics_importer.upgrade() { - drop(tracker.import_all_torrents_statistics().await); + if let Some(statistics_importer) = weak_tracker_statistics_importer.upgrade() { + let one_interval_ago = seconds_ago_utc( + torrent_stats_update_interval + .try_into() + .expect("update interval should be a positive integer"), + ); + let limit = 50; + + debug!( + "Importing torrents statistics not updated since {} limited to a maximum of {} torrents ...", + one_interval_ago.to_string().yellow(), + limit.to_string().yellow() + ); + + match statistics_importer + .import_torrents_statistics_not_updated_since(one_interval_ago, limit) + .await + { + Ok(()) => {} + Err(e) => error!("Failed to import statistics: {:?}", e), + } + + drop(statistics_importer); } else { break; } + + execution_interval.tick().await; } }) } diff --git a/src/databases/database.rs b/src/databases/database.rs index 2d56e22f..a19970be 100644 --- a/src/databases/database.rs +++ b/src/databases/database.rs @@ -1,5 +1,5 @@ use async_trait::async_trait; -use chrono::NaiveDateTime; +use chrono::{DateTime, NaiveDateTime, Utc}; use serde::{Deserialize, Serialize}; use crate::databases::mysql::Mysql; @@ -292,6 +292,13 @@ pub trait Database: Sync + Send { /// Get all torrents as `Vec`. async fn get_all_torrents_compact(&self) -> Result, Error>; + /// Get torrents whose stats have not been imported from the tracker at least since a given datetime. + async fn get_torrents_with_stats_not_updated_since( + &self, + datetime: DateTime, + limit: i64, + ) -> Result, Error>; + /// Update a torrent's title with `torrent_id` and `title`. 
async fn update_torrent_title(&self, torrent_id: i64, title: &str) -> Result<(), Error>; diff --git a/src/databases/mysql.rs b/src/databases/mysql.rs index 5ee723cd..d28f1ae7 100644 --- a/src/databases/mysql.rs +++ b/src/databases/mysql.rs @@ -2,7 +2,7 @@ use std::str::FromStr; use std::time::Duration; use async_trait::async_trait; -use chrono::NaiveDateTime; +use chrono::{DateTime, NaiveDateTime, Utc}; use sqlx::mysql::{MySqlConnectOptions, MySqlPoolOptions}; use sqlx::{query, query_as, Acquire, ConnectOptions, MySqlPool}; @@ -20,7 +20,7 @@ use crate::models::torrent_tag::{TagId, TorrentTag}; use crate::models::tracker_key::TrackerKey; use crate::models::user::{User, UserAuthentication, UserCompact, UserId, UserProfile}; use crate::services::torrent::{CanonicalInfoHashGroup, DbTorrentInfoHash}; -use crate::utils::clock::{self, datetime_now}; +use crate::utils::clock::{self, datetime_now, DATETIME_FORMAT}; use crate::utils::hex::from_bytes; pub struct Mysql { @@ -884,6 +884,27 @@ impl Database for Mysql { .map_err(|_| database::Error::Error) } + async fn get_torrents_with_stats_not_updated_since( + &self, + datetime: DateTime, + limit: i64, + ) -> Result, database::Error> { + query_as::<_, TorrentCompact>( + "SELECT tt.torrent_id, tt.info_hash + FROM torrust_torrents tt + LEFT JOIN torrust_torrent_tracker_stats tts ON tt.torrent_id = tts.torrent_id + WHERE tts.updated_at < ? OR tts.updated_at IS NULL + ORDER BY tts.updated_at ASC + LIMIT ? + ", + ) + .bind(datetime.format(DATETIME_FORMAT).to_string()) + .bind(limit) + .fetch_all(&self.pool) + .await + .map_err(|_| database::Error::Error) + } + async fn update_torrent_title(&self, torrent_id: i64, title: &str) -> Result<(), database::Error> { query("UPDATE torrust_torrent_info SET title = ? 
WHERE torrent_id = ?") .bind(title) diff --git a/src/databases/sqlite.rs b/src/databases/sqlite.rs index 26d90eea..421292d6 100644 --- a/src/databases/sqlite.rs +++ b/src/databases/sqlite.rs @@ -2,7 +2,7 @@ use std::str::FromStr; use std::time::Duration; use async_trait::async_trait; -use chrono::NaiveDateTime; +use chrono::{DateTime, NaiveDateTime, Utc}; use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions}; use sqlx::{query, query_as, Acquire, ConnectOptions, SqlitePool}; @@ -20,7 +20,7 @@ use crate::models::torrent_tag::{TagId, TorrentTag}; use crate::models::tracker_key::TrackerKey; use crate::models::user::{User, UserAuthentication, UserCompact, UserId, UserProfile}; use crate::services::torrent::{CanonicalInfoHashGroup, DbTorrentInfoHash}; -use crate::utils::clock::{self, datetime_now}; +use crate::utils::clock::{self, datetime_now, DATETIME_FORMAT}; use crate::utils::hex::from_bytes; pub struct Sqlite { @@ -876,6 +876,27 @@ impl Database for Sqlite { .map_err(|_| database::Error::Error) } + async fn get_torrents_with_stats_not_updated_since( + &self, + datetime: DateTime, + limit: i64, + ) -> Result, database::Error> { + query_as::<_, TorrentCompact>( + "SELECT tt.torrent_id, tt.info_hash + FROM torrust_torrents tt + LEFT JOIN torrust_torrent_tracker_stats tts ON tt.torrent_id = tts.torrent_id + WHERE tts.updated_at < ? OR tts.updated_at IS NULL + ORDER BY tts.updated_at ASC + LIMIT ? 
+ ", + ) + .bind(datetime.format(DATETIME_FORMAT).to_string()) + .bind(limit) + .fetch_all(&self.pool) + .await + .map_err(|_| database::Error::Error) + } + async fn update_torrent_title(&self, torrent_id: i64, title: &str) -> Result<(), database::Error> { query("UPDATE torrust_torrent_info SET title = $1 WHERE torrent_id = $2") .bind(title) diff --git a/src/tracker/statistics_importer.rs b/src/tracker/statistics_importer.rs index 996008f3..d9030e39 100644 --- a/src/tracker/statistics_importer.rs +++ b/src/tracker/statistics_importer.rs @@ -1,7 +1,8 @@ use std::sync::Arc; use std::time::Instant; -use log::{error, info}; +use chrono::{DateTime, Utc}; +use log::{debug, error, info}; use text_colorizer::Colorize; use super::service::{Service, TorrentInfo, TrackerAPIError}; @@ -36,13 +37,17 @@ impl StatisticsImporter { pub async fn import_all_torrents_statistics(&self) -> Result<(), database::Error> { let torrents = self.database.get_all_torrents_compact().await?; + if torrents.is_empty() { + return Ok(()); + } + info!(target: LOG_TARGET, "Importing {} torrents statistics from tracker {} ...", torrents.len().to_string().yellow(), self.tracker_url.yellow()); // Start the timer before the loop let start_time = Instant::now(); for torrent in torrents { - info!(target: LOG_TARGET, "Importing torrent #{} ...", torrent.torrent_id.to_string().yellow()); + info!(target: LOG_TARGET, "Importing torrent #{} statistics ...", torrent.torrent_id.to_string().yellow()); let ret = self.import_torrent_statistics(torrent.torrent_id, &torrent.info_hash).await; @@ -64,6 +69,55 @@ impl StatisticsImporter { Ok(()) } + /// Import torrents statistics not updated recently.. + /// + /// # Errors + /// + /// Will return an error if the database query failed. 
+ pub async fn import_torrents_statistics_not_updated_since( + &self, + datetime: DateTime, + limit: i64, + ) -> Result<(), database::Error> { + debug!(target: LOG_TARGET, "Importing torrents statistics not updated since {} limited to a maximum of {} torrents ...", datetime.to_string().yellow(), limit.to_string().yellow()); + + let torrents = self + .database + .get_torrents_with_stats_not_updated_since(datetime, limit) + .await?; + + if torrents.is_empty() { + return Ok(()); + } + + info!(target: LOG_TARGET, "Importing {} torrents statistics from tracker {} ...", torrents.len().to_string().yellow(), self.tracker_url.yellow()); + + // Start the timer before the loop + let start_time = Instant::now(); + + for torrent in torrents { + info!(target: LOG_TARGET, "Importing torrent #{} statistics ...", torrent.torrent_id.to_string().yellow()); + + let ret = self.import_torrent_statistics(torrent.torrent_id, &torrent.info_hash).await; + + if let Some(err) = ret.err() { + if err != TrackerAPIError::TorrentNotFound { + let message = format!( + "Error updating torrent tracker stats for torrent. Torrent: id {}; infohash {}. Error: {:?}", + torrent.torrent_id, torrent.info_hash, err + ); + error!(target: LOG_TARGET, "{}", message); + } + } + } + + let elapsed_time = start_time.elapsed(); + + info!(target: LOG_TARGET, "Statistics import completed in {:.2?}", elapsed_time); + + Ok(()) + } + /// Import torrent statistics from tracker and update them in database. /// /// # Errors diff --git a/src/utils/clock.rs b/src/utils/clock.rs index 338a5a61..42269eeb 100644 --- a/src/utils/clock.rs +++ b/src/utils/clock.rs @@ -1,4 +1,6 @@ -use chrono::Utc; +use chrono::{DateTime, Duration, Utc}; + +pub const DATETIME_FORMAT: &str = "%Y-%m-%d %H:%M:%S"; /// Returns the current timestamp in seconds. /// @@ -11,10 +13,16 @@ pub fn now() -> u64 { u64::try_from(chrono::prelude::Utc::now().timestamp()).expect("timestamp should be positive") } +/// Returns the datetime some seconds ago. 
+#[must_use] +pub fn seconds_ago_utc(seconds: i64) -> DateTime { + Utc::now() - Duration::seconds(seconds) +} + /// Returns the current time in database format. /// /// For example: `2024-03-12 15:56:24`. #[must_use] pub fn datetime_now() -> String { - Utc::now().format("%Y-%m-%d %H:%M:%S").to_string() + Utc::now().format(DATETIME_FORMAT).to_string() }