diff --git a/Cargo.toml b/Cargo.toml index 6d2e59c..f652bc4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "busser" -version = "0.2.1" +version = "0.2.2" authors = ["Jerboa"] edition="2021" @@ -26,6 +26,7 @@ libflate = "2" quick-xml = "0.31.0" indicatif = "0.17.8" uuid = { version = "1.8.0", features = ["v4", "fast-rng", "macro-diagnostics"]} +cron = "0.12.1" [profile.dev] opt-level = 0 diff --git a/src/config.rs b/src/config.rs index e2eef9f..8f1c81c 100644 --- a/src/config.rs +++ b/src/config.rs @@ -5,22 +5,20 @@ use serde::{Serialize, Deserialize}; use crate::{filesystem::file::read_file_utf8, integrations::webhook::Webhook}; /// Configure the stats collection -/// - ```save_period_seconds```: periodically save to disc /// - ```path```: where to save to disc (time-stamped files) /// - ```hit_cooloff_seconds```: cooloff period after which the same IP is counted as a new hit /// - ```clear_period_seconds```: periodcially clear data in memory -/// - ```digest_period_seconds```: periodically send a digts to a Discord webhook -/// - ```log_files_clear_period_seconds```:archive and clear stats log files periodically +/// - ```save_schedule```: periodically save to disc, cron format: "sec min hour day-of-month month day-of-week year" +/// - ```digest_schedule```: periodically send a digest to a Discord webhook, cron format: "sec min hour day-of-month month day-of-week year" /// - ```ignore_regexes```: collect, but do not report, hits on these regexes /// - ```top_n_digest```: top n listing of pages and resources in API/discord default is 3 #[derive(Debug, Clone, Serialize, Deserialize)] pub struct StatsConfig { - pub save_period_seconds: u64, pub path: String, pub hit_cooloff_seconds: u64, - pub digest_period_seconds: u64, - pub log_files_clear_period_seconds: u64, + pub save_schedule: Option, + pub digest_schedule: Option, pub ignore_regexes: Option>, pub top_n_digest: Option } @@ -31,11 +29,10 @@ impl StatsConfig { StatsConfig { - 
save_period_seconds: 86400, path: "stats".to_string(), hit_cooloff_seconds: 60, - digest_period_seconds: 86400, - log_files_clear_period_seconds: 2419200, + save_schedule: None, + digest_schedule: None, ignore_regexes: None, top_n_digest: None } diff --git a/src/filesystem/file.rs b/src/filesystem/file.rs index 6f943ff..635f619 100644 --- a/src/filesystem/file.rs +++ b/src/filesystem/file.rs @@ -32,8 +32,11 @@ pub trait Observed pub fn write_file_bytes(path: &str, data: &[u8]) { - let mut file = fs::File::create(path).unwrap(); - file.write_all(data).unwrap(); + match fs::File::create(path) + { + Ok(mut file) => if let Err(e) = file.write_all(data) { crate::debug(format!("Error {} writing file {}", e, path), None) }, /* log write failures instead of panicking, same as the create-failure arm */ + Err(e) => {crate::debug(format!("Error {} creating file {}", e, path), None)} + } } pub fn read_file_utf8(path: &str) -> Option diff --git a/src/server/https.rs b/src/server/https.rs index e091c0b..00dc087 100644 --- a/src/server/https.rs +++ b/src/server/https.rs @@ -1,9 +1,9 @@ use crate:: { - config::{read_config, Config, CONFIG_PATH}, content::{filter::ContentFilter, sitemap::SiteMap, Content}, server::throttle::{handle_throttle, IpThrottler}, task::TaskPool, CRAB + config::{read_config, Config, CONFIG_PATH}, content::{filter::ContentFilter, sitemap::SiteMap, Content}, server::throttle::{handle_throttle, IpThrottler}, task::{schedule_from_option, TaskPool}, CRAB }; -use std::{collections::HashMap, net::{IpAddr, Ipv4Addr, SocketAddr}}; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::path::PathBuf; use std::sync::Arc; use tokio::sync::Mutex; @@ -15,7 +15,7 @@ use axum:: }; use axum_server::tls_rustls::RustlsConfig; -use super::{api::{stats::StatsDigest, ApiRequest}, stats::{digest::Digest, hits::{log_stats, HitStats}, StatsSaveTask, StatsDigestTask}}; +use super::{api::{stats::StatsDigest, ApiRequest}, stats::{hits::{log_stats, HitStats}, StatsSaveTask, StatsDigestTask}}; /// An https server that reads a directory configured with [Config] /// ```.html``` pages and resources, then serves them. 
@@ -138,18 +138,32 @@ impl Server { addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(a,b,c,d)), config.port_https), router, - config, + config: config.clone(), tasks: TaskPool::new() }; server.tasks.add ( - Box::new(StatsSaveTask { state: stats.clone(), last_run: chrono::offset::Utc::now() }) + Box::new + ( + StatsSaveTask::new + ( + stats.clone(), + /* the save task starts on the save schedule, not the digest one */ schedule_from_option(config.stats.save_schedule.clone()) + ) + ) ); server.tasks.add ( - Box::new(StatsDigestTask { state: stats.clone(), last_run: chrono::offset::Utc::now() }) + Box::new + ( + StatsDigestTask::new + ( + stats.clone(), + schedule_from_option(config.stats.digest_schedule.clone()) + ) + ) ); server diff --git a/src/server/stats/hits.rs b/src/server/stats/hits.rs index 87ecf8b..8053848 100644 --- a/src/server/stats/hits.rs +++ b/src/server/stats/hits.rs @@ -24,9 +24,6 @@ pub struct Hit pub struct HitStats { pub hits: HashMap<[u8; 64], Hit>, - pub last_save: DateTime, - pub last_digest: DateTime, - pub last_clear: DateTime, pub summary: Digest } @@ -37,9 +34,6 @@ impl HitStats HitStats { hits: HashMap::new(), - last_save: chrono::offset::Utc::now(), - last_digest: chrono::offset::Utc::now(), - last_clear: chrono::offset::Utc::now(), summary: Digest::new() } } diff --git a/src/server/stats/mod.rs b/src/server/stats/mod.rs index 9cd8bac..e789b7e 100644 --- a/src/server/stats/mod.rs +++ b/src/server/stats/mod.rs @@ -1,10 +1,11 @@ -use std::sync::Arc; +use std::{fs::create_dir, sync::Arc}; use axum::async_trait; -use chrono::{DateTime, Duration, Utc}; +use chrono::{DateTime, Utc}; +use cron::Schedule; use tokio::sync::Mutex; -use crate::{config::{read_config, Config, CONFIG_PATH}, filesystem::file::File, integrations::discord::post::post, task::{next_job_time, schedule_from_option, Task}}; use self::{digest::{digest_message, process_hits}, file::StatsFile, hits::HitStats}; @@ -17,7 
+18,27 @@ pub mod file; pub struct StatsSaveTask { pub state: Arc>, - pub last_run: DateTime + pub last_run: DateTime, + pub next_run: Option>, + pub schedule: Option +} + +impl StatsSaveTask +{ + pub fn new + ( + state: Arc>, + schedule: Option + ) -> StatsSaveTask + { + StatsSaveTask + { + state, + last_run: chrono::offset::Utc::now(), + next_run: if schedule.is_none() { None } else { next_job_time(schedule.clone().unwrap()) }, + schedule + } + } } #[async_trait] @@ -25,41 +46,53 @@ impl Task for StatsSaveTask { async fn run(&mut self) -> Result<(), crate::task::TaskError> { - let stats = self.state.lock().await; + let config = Config::load_or_default(CONFIG_PATH); + { + let stats = self.state.lock().await; + + if !std::path::Path::new(&config.stats.path).exists() + { + match create_dir(config.stats.path.to_string()) + { + Ok(_s) => {}, + Err(e) => {crate::debug(format!("Error creating stats dir {}",e), None)} + } + } + + let mut file = StatsFile::new(); + file.load(&stats); + file.write_bytes(); + } - let mut file = StatsFile::new(); - file.load(&stats); - file.write_bytes(); + self.schedule = schedule_from_option(config.stats.save_schedule.clone()); + + self.next_run = match &self.schedule + { + Some(s) => next_job_time(s.clone()), + None => None + }; self.last_run = chrono::offset::Utc::now(); Ok(()) } - fn next(&self) -> Option> + fn next(&mut self) -> Option> { - let config = Config::load_or_default(CONFIG_PATH); - - let time: chrono::prelude::DateTime = chrono::offset::Utc::now(); - let time_until_save = config.stats.save_period_seconds as i64 - (time - self.last_run).num_seconds(); - - if time_until_save < 0 - { - Some(time) - } - else - { - Some(self.last_run + Duration::seconds(time_until_save)) - } + self.next_run } fn runnable(&self) -> bool { - chrono::offset::Utc::now() > self.next().unwrap() + match self.next_run + { + Some(t) => chrono::offset::Utc::now() > t, + None => false + } } fn info(&self) -> String { - "Statistics saveing".to_string() + 
"Statistics saving".to_string() } } @@ -68,7 +101,27 @@ impl Task for StatsSaveTask pub struct StatsDigestTask { pub state: Arc>, - pub last_run: DateTime + pub last_run: DateTime, + pub schedule: Option, + pub next_run: Option> +} + +impl StatsDigestTask +{ + pub fn new + ( + state: Arc>, + schedule: Option + ) -> StatsDigestTask + { + StatsDigestTask + { + state, + last_run: chrono::offset::Utc::now(), + next_run: if schedule.is_none() { None } else { next_job_time(schedule.clone().unwrap()) }, + schedule + } + } } #[async_trait] @@ -76,61 +129,64 @@ impl Task for StatsDigestTask { async fn run(&mut self) -> Result<(), crate::task::TaskError> { - let mut stats = self.state.lock().await; - - let config = match read_config(CONFIG_PATH) { - Some(c) => c, - None => + let mut stats = self.state.lock().await; + + let config = match read_config(CONFIG_PATH) { - Config::default() - } - }; - - stats.summary = process_hits - ( - config.stats.path.clone(), - Some(stats.last_digest), - None, - config.stats.top_n_digest, - Some(stats.to_owned()) - ); - - let msg = digest_message(stats.summary.clone(), Some(stats.last_digest), None); - match config.notification_endpoint - { - Some(endpoint) => match post(&endpoint, msg).await + Some(c) => c, + None => { - Ok(_s) => (), - Err(e) => {crate::debug(format!("Error posting to discord\n{}", e), None);} - }, - None => () + Config::default() + } + }; + + stats.summary = process_hits + ( + config.stats.path.clone(), + Some(self.last_run), + None, + config.stats.top_n_digest, + Some(stats.to_owned()) + ); + + let msg = digest_message(stats.summary.clone(), Some(self.last_run), None); + match config.notification_endpoint + { + Some(endpoint) => match post(&endpoint, msg).await + { + Ok(_s) => (), + Err(e) => {crate::debug(format!("Error posting to discord\n{}", e), None);} + }, + None => () + } } + let config = Config::load_or_default(CONFIG_PATH); + self.schedule = schedule_from_option(config.stats.digest_schedule.clone()); + + 
self.next_run = match &self.schedule + { + Some(s) => next_job_time(s.clone()), + None => None + }; + self.last_run = chrono::offset::Utc::now(); Ok(()) } - fn next(&self) -> Option> + fn next(&mut self) -> Option> { - let config = Config::load_or_default(CONFIG_PATH); - - let time: chrono::prelude::DateTime = chrono::offset::Utc::now(); - let time_until_digest = config.stats.digest_period_seconds as i64 - (time - self.last_run).num_seconds(); - - if time_until_digest < 0 - { - Some(time) - } - else - { - Some(self.last_run + Duration::seconds(time_until_digest)) - } + self.next_run } fn runnable(&self) -> bool { - chrono::offset::Utc::now() > self.next().unwrap() + match self.next_run + { + Some(t) => chrono::offset::Utc::now() > t, + None => false + } } fn info(&self) -> String diff --git a/src/task/mod.rs b/src/task/mod.rs index 4ca7d48..47eb720 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -1,12 +1,13 @@ use core::fmt; -use std::{cmp::min, collections::HashMap, sync::Arc}; +use std::{cmp::min, collections::HashMap, str::FromStr, sync::Arc}; use axum::async_trait; use chrono::{DateTime, Local, Utc}; +use cron::Schedule; use tokio::{spawn, sync::Mutex}; use uuid::Uuid; -pub const DEFAULT_WAIT: tokio::time::Duration = tokio::time::Duration::from_secs(1); +pub const DEFAULT_WAIT: tokio::time::Duration = tokio::time::Duration::from_secs(60); #[derive(Debug, Clone)] pub struct TaskError @@ -26,7 +27,7 @@ impl fmt::Display for TaskError { pub trait Task { async fn run(&mut self) -> Result<(), TaskError>; - fn next(&self) -> Option>; + fn next(&mut self) -> Option>; fn runnable(&self) -> bool; fn info(&self) -> String; } @@ -36,15 +37,14 @@ pub trait Task /// - [TaskPool::run] loops continuously (with sleeps) running tasks when they are available pub struct TaskPool { - tasks: HashMap>>>, - closing: Arc> + tasks: HashMap>>> } impl TaskPool { pub fn new() -> TaskPool { - TaskPool { tasks: HashMap::new(), closing: Arc::new(Mutex::new(false)) } + TaskPool { tasks: 
HashMap::new() } } pub fn ntasks(&self) -> usize { self.tasks.len() } @@ -63,11 +63,6 @@ impl TaskPool self.tasks.remove(id); } } - - pub async fn stop(&mut self) - { - *self.closing.lock().await = true; - } /// Returns a duration to wait for the next runnable process /// and an information string about that process including @@ -82,14 +77,16 @@ impl TaskPool let now = chrono::offset::Utc::now(); let mut wait = u64::MAX; let mut info = String::new(); + let mut all_none = true; for (id, task_lock) in &self.tasks { - let task = task_lock.lock().await; + let mut task = task_lock.lock().await; match task.next() { Some(d) => { + all_none = false; let dt = (d-now).num_seconds(); if dt <= 0 { @@ -109,19 +106,22 @@ impl TaskPool } } - (tokio::time::Duration::from_secs(wait), info) + if all_none + { + (DEFAULT_WAIT, info) + } + else + { + (tokio::time::Duration::from_secs(wait), info) + } } - pub fn run(self) + pub fn run(self) -> tokio::task::JoinHandle<()> { spawn( async move { loop { - if self.closing.lock().await.to_owned() - { - break; - } for (id, task_lock) in &self.tasks { let mut task = task_lock.lock().await; @@ -140,10 +140,38 @@ impl TaskPool } } let (wait, info) = self.waiting_for().await; - crate::debug(format!("Next task\n {}\n Waiting for {}s", info, wait.as_secs()), None); - tokio::time::sleep(wait).await; + if wait > tokio::time::Duration::ZERO + { crate::debug(format!("Next task\n {}\n Waiting for {}s", info, wait.as_secs()), None); + tokio::time::sleep(wait).await; + } } } - ); + ) + } +} + +pub fn next_job_time(cron: Schedule) -> Option> +{ + let jobs: Vec> = cron.upcoming(Utc).take(2).collect(); + jobs.first().copied() +} + +pub fn schedule_from_option(cron: Option) -> Option +{ + if cron.is_some() + { + match Schedule::from_str(&cron.clone().unwrap()) + { + Ok(s) => Some(s), + Err(e) => + { + crate::debug(format!("Could not parse cron schedule {:?}, {}", cron, e), None); + None + } + } + } + else + { + None } } \ No newline at end of file diff --git 
a/tests/config.json b/tests/config.json index 1b1256a..8aa8313 100644 --- a/tests/config.json +++ b/tests/config.json @@ -9,11 +9,10 @@ }, "stats": { - "save_period_seconds": 10, "path": "stats", "hit_cooloff_seconds": 60, - "digest_period_seconds": 86400, - "log_files_clear_period_seconds": 2419200, + "save_schedule": "0 0 1 * * Wed *", + "digest_schedule": "0 0 1 * * Fri *", "ignore_regexes": ["/favicon.ico"] }, "content": diff --git a/tests/test_config.rs b/tests/test_config.rs index c878878..f3fce42 100644 --- a/tests/test_config.rs +++ b/tests/test_config.rs @@ -26,11 +26,10 @@ mod config assert_eq!(config.throttle.timeout_millis, 5000); assert_eq!(config.throttle.clear_period_seconds, 3600); - assert_eq!(config.stats.save_period_seconds, 10); + assert_eq!(config.stats.save_schedule, Some("0 0 1 * * Wed *".to_string())); assert_eq!(config.stats.path, "stats"); assert_eq!(config.stats.hit_cooloff_seconds, 60); - assert_eq!(config.stats.digest_period_seconds, 86400); - assert_eq!(config.stats.log_files_clear_period_seconds, 2419200); + assert_eq!(config.stats.digest_schedule, Some("0 0 1 * * Fri *".to_string())); assert_eq!(config.stats.ignore_regexes.unwrap(), vec!["/favicon.ico".to_string()]); assert_eq!(config.content.path, "/home/jerboa/Website/"); @@ -58,11 +57,10 @@ mod config { let stats = StatsConfig::default(); - assert_eq!(stats.save_period_seconds, 86400); + assert_eq!(stats.save_schedule, None); assert_eq!(stats.path, "stats"); assert_eq!(stats.hit_cooloff_seconds, 60); - assert_eq!(stats.digest_period_seconds, 86400); - assert_eq!(stats.log_files_clear_period_seconds, 2419200); + assert_eq!(stats.digest_schedule, None); assert_eq!(stats.ignore_regexes, None); assert_eq!(stats.top_n_digest, None); @@ -101,11 +99,10 @@ mod config let stats = config.stats; - assert_eq!(stats.save_period_seconds, 86400); + assert_eq!(stats.save_schedule, None); assert_eq!(stats.path, "stats"); assert_eq!(stats.hit_cooloff_seconds, 60); - 
assert_eq!(stats.digest_period_seconds, 86400); - assert_eq!(stats.log_files_clear_period_seconds, 2419200); + assert_eq!(stats.digest_schedule, None); assert_eq!(stats.ignore_regexes, None); assert_eq!(stats.top_n_digest, None); @@ -146,11 +143,10 @@ mod config assert_eq!(config.throttle.timeout_millis, 5000); assert_eq!(config.throttle.clear_period_seconds, 3600); - assert_eq!(config.stats.save_period_seconds, 10); + assert_eq!(config.stats.save_schedule, Some("0 0 1 * * Wed *".to_string())); assert_eq!(config.stats.path, "stats"); assert_eq!(config.stats.hit_cooloff_seconds, 60); - assert_eq!(config.stats.digest_period_seconds, 86400); - assert_eq!(config.stats.log_files_clear_period_seconds, 2419200); + assert_eq!(config.stats.digest_schedule, Some("0 0 1 * * Fri *".to_string())); assert_eq!(config.stats.ignore_regexes.unwrap(), vec!["/favicon.ico".to_string()]); assert_eq!(config.content.path, "/home/jerboa/Website/"); diff --git a/tests/test_stats.rs b/tests/test_stats.rs index aefb4c3..c655a08 100644 --- a/tests/test_stats.rs +++ b/tests/test_stats.rs @@ -3,7 +3,7 @@ mod common; #[cfg(test)] mod test_stats_graph { - use std::{collections::HashMap, fs::remove_file, iter::FusedIterator, path::Path}; + use std::{collections::HashMap, fs::remove_file, path::Path}; use busser::{filesystem::file::File, server::stats::{digest::{hits_by_hour_text_graph, process_hits, Digest}, file::StatsFile, hits::{collect_hits, Hit, HitStats}}}; use chrono::DateTime; diff --git a/tests/test_task.rs b/tests/test_task.rs index 367040e..f3c87bd 100644 --- a/tests/test_task.rs +++ b/tests/test_task.rs @@ -3,9 +3,11 @@ mod common; #[cfg(test)] mod task { - use std::sync::Arc; + use std::{str::FromStr, sync::Arc}; - use busser::{server::stats::{hits::HitStats, StatsSaveTask}, task::{TaskPool, DEFAULT_WAIT}}; + use busser::{server::stats::{hits::HitStats, StatsDigestTask, StatsSaveTask}, task::{schedule_from_option, Task, TaskPool, DEFAULT_WAIT}}; + use chrono::Timelike; + use 
cron::Schedule; use tokio::sync::Mutex; @@ -20,7 +22,9 @@ mod task assert_eq!(wait, DEFAULT_WAIT); let stats = Arc::new(Mutex::new(HitStats::new())); - let task = StatsSaveTask{ state: stats, last_run: chrono::offset::Utc::now() }; + let task = StatsSaveTask{ state: stats.clone(), last_run: chrono::offset::Utc::now(), next_run: None, schedule: None}; + assert_eq!(task.runnable(), false); + assert_eq!(task.info(), "Statistics saving".to_string()); pool.add(Box::new(task)); @@ -28,5 +32,52 @@ mod task let (wait, _info) = pool.waiting_for().await; assert!(wait > tokio::time::Duration::ZERO); + assert_eq!(wait, DEFAULT_WAIT); + + let hour = chrono::offset::Utc::now().hour(); + let schedule = format!("0 0 {} * * * *", (hour+2)%24); + + let task = StatsDigestTask::new + ( + stats, + Some(Schedule::from_str(&schedule).unwrap()) + ); + + assert_eq!(task.runnable(), false); + assert_eq!(task.info(), "Statistics digest".to_string()); + + let id = pool.add(Box::new(task)); + + assert_eq!(pool.ntasks(), 2); + + let (wait, _info) = pool.waiting_for().await; + println!("{:?}", wait); + assert!(wait > DEFAULT_WAIT); + + pool.remove(&id); + + let (wait, _info) = pool.waiting_for().await; + assert!(wait > tokio::time::Duration::ZERO); + assert_eq!(wait, DEFAULT_WAIT); + + let handle = pool.run(); + handle.abort(); + + } + + #[test] + pub fn test_schedule() + { + let option: Option = None; + + assert_eq!(schedule_from_option(option), None); + + let option = "not_a_schedule_string".to_string(); + + assert_eq!(schedule_from_option(Some(option)), None); + + let option = "0 * * * * * *".to_string(); + + assert_eq!(schedule_from_option(Some(option)), Some(Schedule::from_str("0 * * * * * *").unwrap())); } } \ No newline at end of file