diff --git a/src/console/cronjobs/tracker_statistics_importer.rs b/src/console/cronjobs/tracker_statistics_importer.rs
index 52ab20bd..970fd7ca 100644
--- a/src/console/cronjobs/tracker_statistics_importer.rs
+++ b/src/console/cronjobs/tracker_statistics_importer.rs
@@ -83,6 +83,15 @@ pub fn start(
 
     info!("Tracker statistics importer cronjob starting ...");
 
+    // code-review: we set an execution interval to avoid intense polling to
+    // the database. If we remove the interval we would be constantly
+    // querying if there are torrent stats pending to update, unless there
+    // are torrents to update. Maybe we should only sleep for 100 milliseconds
+    // if we did not update any torrents in the latest execution.
+    // With this current limit we can only import 50 torrent stats every 100
+    // milliseconds, which is 500 torrents per second (1800000 torrents per hour),
+    // assuming the tracker can handle a request in 100 milliseconds.
+
     let execution_interval_in_milliseconds = 100;
     let execution_interval_duration = std::time::Duration::from_millis(execution_interval_in_milliseconds);
     let mut execution_interval = tokio::time::interval(execution_interval_duration);
diff --git a/src/tracker/api.rs b/src/tracker/api.rs
index d3c00188..c81a745e 100644
--- a/src/tracker/api.rs
+++ b/src/tracker/api.rs
@@ -15,6 +15,8 @@ impl ConnectionInfo {
     }
 }
 
+const TOKEN_PARAM_NAME: &str = "token";
+
 pub struct Client {
     pub connection_info: ConnectionInfo,
     api_base_url: String,
@@ -29,7 +31,7 @@ impl Client {
     pub fn new(connection_info: ConnectionInfo) -> Result<Self, Error> {
         let base_url = format!("{}/api/v1", connection_info.url);
         let client = reqwest::Client::builder().timeout(Duration::from_secs(5)).build()?;
-        let token_param = [("token".to_string(), connection_info.token.to_string())];
+        let token_param = [(TOKEN_PARAM_NAME.to_string(), connection_info.token.to_string())];
 
         Ok(Self {
             connection_info,
@@ -72,7 +74,7 @@ impl Client {
         self.client.post(request_url).query(&self.token_param).send().await
     }
 
-    /// Retrieve the info for a torrent.
+    /// Retrieve the info for one torrent.
     ///
     /// # Errors
     ///
@@ -82,4 +84,23 @@ impl Client {
 
         self.client.get(request_url).query(&self.token_param).send().await
     }
+
+    /// Retrieve the info for multiple torrents at the same time.
+    ///
+    /// # Errors
+    ///
+    /// Will return an error if the HTTP request fails.
+    pub async fn get_torrents_info(&self, info_hashes: &[String]) -> Result<Response, Error> {
+        let request_url = format!("{}/torrents", self.api_base_url);
+
+        let mut query_params: Vec<(String, String)> = Vec::with_capacity(info_hashes.len() + 1);
+
+        query_params.push((TOKEN_PARAM_NAME.to_string(), self.connection_info.token.clone()));
+
+        for info_hash in info_hashes {
+            query_params.push(("info_hash".to_string(), info_hash.clone()));
+        }
+
+        self.client.get(request_url).query(&query_params).send().await
+    }
 }
diff --git a/src/tracker/service.rs b/src/tracker/service.rs
index 598e35fd..3036ce89 100644
--- a/src/tracker/service.rs
+++ b/src/tracker/service.rs
@@ -48,6 +48,14 @@ pub struct TorrentInfo {
     pub peers: Vec<Peer>,
 }
 
+#[derive(Debug, Serialize, Deserialize, PartialEq)]
+pub struct TorrentBasicInfo {
+    pub info_hash: String,
+    pub seeders: i64,
+    pub completed: i64,
+    pub leechers: i64,
+}
+
 #[derive(Debug, Serialize, Deserialize, PartialEq)]
 pub struct Peer {
     pub peer_id: Option<PeerId>,
@@ -259,6 +267,54 @@ impl Service {
         }
     }
 
+    /// Get torrent info from tracker in batches.
+    ///
+    /// # Errors
+    ///
+    /// Will return an error if the HTTP request to get torrent info fails or
+    /// if the response cannot be parsed.
+    pub async fn get_torrents_info(&self, info_hashes: &[String]) -> Result<Vec<TorrentBasicInfo>, TrackerAPIError> {
+        debug!(target: "tracker-service", "get torrents info");
+
+        let maybe_response = self.api_client.get_torrents_info(info_hashes).await;
+
+        debug!(target: "tracker-service", "get torrents info response result: {:?}", maybe_response);
+
+        match maybe_response {
+            Ok(response) => {
+                let status: StatusCode = map_status_code(response.status());
+
+                let body = response.text().await.map_err(|_| {
+                    error!(target: "tracker-service", "response without body");
+                    TrackerAPIError::MissingResponseBody
+                })?;
+
+                match status {
+                    StatusCode::OK => serde_json::from_str(&body).map_err(|e| {
+                        error!(
+                            target: "tracker-service", "Failed to parse torrents info from tracker response. Body: {}, Error: {}",
+                            body, e
+                        );
+                        TrackerAPIError::FailedToParseTrackerResponse { body }
+                    }),
+                    StatusCode::INTERNAL_SERVER_ERROR => {
+                        if body == Self::invalid_token_body() {
+                            Err(TrackerAPIError::InvalidToken)
+                        } else {
+                            error!(target: "tracker-service", "get torrents info 500 response: status {status}, body: {body}");
+                            Err(TrackerAPIError::InternalServerError)
+                        }
+                    }
+                    _ => {
+                        error!(target: "tracker-service", "get torrents info unhandled response: status {status}, body: {body}");
+                        Err(TrackerAPIError::UnexpectedResponseStatus)
+                    }
+                }
+            }
+            Err(_) => Err(TrackerAPIError::TrackerOffline),
+        }
+    }
+
     /// Issue a new tracker key from tracker.
     async fn retrieve_new_tracker_key(&self, user_id: i64) -> Result<TrackerKey, TrackerAPIError> {
         debug!(target: "tracker-service", "retrieve key: {user_id}");
diff --git a/src/tracker/statistics_importer.rs b/src/tracker/statistics_importer.rs
index d9030e39..b9842855 100644
--- a/src/tracker/statistics_importer.rs
+++ b/src/tracker/statistics_importer.rs
@@ -58,6 +58,7 @@ impl StatisticsImporter {
                     torrent.torrent_id, torrent.info_hash, err
                 );
                 error!(target: "statistics_importer", "{}", message);
+                // todo: return a service error that can be a tracker API error or a database error.
             }
         }
     }
@@ -92,29 +93,48 @@ impl StatisticsImporter {
 
         info!(target: LOG_TARGET, "Importing {} torrents statistics from tracker {} ...", torrents.len().to_string().yellow(), self.tracker_url.yellow());
 
-        // Start the timer before the loop
-        let start_time = Instant::now();
+        // Import stats for all torrents in one request
 
-        for torrent in torrents {
-            info!(target: LOG_TARGET, "Importing torrent #{} statistics ...", torrent.torrent_id.to_string().yellow());
+        let info_hashes: Vec<String> = torrents.iter().map(|t| t.info_hash.clone()).collect();
 
-            let ret = self.import_torrent_statistics(torrent.torrent_id, &torrent.info_hash).await;
+        let torrent_info_vec = match self.tracker_service.get_torrents_info(&info_hashes).await {
+            Ok(torrents_info) => torrents_info,
+            Err(err) => {
+                let message = format!("Error getting torrents tracker stats. Error: {err:?}");
+                error!(target: LOG_TARGET, "{}", message);
+                // todo: return a service error that can be a tracker API error or a database error.
+                return Ok(());
+            }
+        };
 
-            if let Some(err) = ret.err() {
-                if err != TrackerAPIError::TorrentNotFound {
-                    let message = format!(
-                        "Error updating torrent tracker stats for torrent. Torrent: id {}; infohash {}. Error: {:?}",
-                        torrent.torrent_id, torrent.info_hash, err
+        // Update stats for all torrents
+
+        for torrent in torrents {
+            match torrent_info_vec.iter().find(|t| t.info_hash == torrent.info_hash) {
+                None => {
+                    // No stats for this torrent in the tracker
+                    drop(
+                        self.database
+                            .update_tracker_info(torrent.torrent_id, &self.tracker_url, 0, 0)
+                            .await,
+                    );
+                }
+                Some(torrent_info) => {
+                    // Update torrent stats for this tracker
+                    drop(
+                        self.database
+                            .update_tracker_info(
+                                torrent.torrent_id,
+                                &self.tracker_url,
+                                torrent_info.seeders,
+                                torrent_info.leechers,
+                            )
+                            .await,
                     );
-                    error!(target: LOG_TARGET, "{}", message);
                 }
             }
         }
 
-        let elapsed_time = start_time.elapsed();
-
-        info!(target: LOG_TARGET, "Statistics import completed in {:.2?}", elapsed_time);
-
         Ok(())
     }