Skip to content

Commit

Permalink
Add abort handle to pool
Browse files Browse the repository at this point in the history
  • Loading branch information
Jerboa-app committed May 20, 2024
1 parent 699bd94 commit b3a8a51
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 18 deletions.
4 changes: 2 additions & 2 deletions src/server/https.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::
config::{read_config, Config, CONFIG_PATH}, content::{filter::ContentFilter, sitemap::SiteMap, Content}, server::throttle::{handle_throttle, IpThrottler}, task::{schedule_from_option, TaskPool}, CRAB
};

use std::{collections::HashMap, net::{IpAddr, Ipv4Addr, SocketAddr}};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::Mutex;
Expand All @@ -15,7 +15,7 @@ use axum::
};
use axum_server::tls_rustls::RustlsConfig;

use super::{api::{stats::StatsDigest, ApiRequest}, stats::{digest::Digest, hits::{log_stats, HitStats}, StatsSaveTask, StatsDigestTask}};
use super::{api::{stats::StatsDigest, ApiRequest}, stats::{hits::{log_stats, HitStats}, StatsSaveTask, StatsDigestTask}};

/// An https server that reads a directory configured with [Config]
/// ```.html``` pages and resources, then serves them.
Expand Down
18 changes: 4 additions & 14 deletions src/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,14 @@ pub trait Task
/// - [TaskPool::run] loops continuously (with sleeps) running tasks when they are available
pub struct TaskPool
{
tasks: HashMap<Uuid, Arc<Mutex<Box<dyn Task + Send>>>>,
closing: Arc<Mutex<bool>>
tasks: HashMap<Uuid, Arc<Mutex<Box<dyn Task + Send>>>>
}

impl TaskPool
{
pub fn new() -> TaskPool
{
TaskPool { tasks: HashMap::new(), closing: Arc::new(Mutex::new(false)) }
TaskPool { tasks: HashMap::new() }
}

pub fn ntasks(&self) -> usize { self.tasks.len() }
Expand All @@ -64,11 +63,6 @@ impl TaskPool
self.tasks.remove(id);
}
}

pub async fn stop(&mut self)
{
*self.closing.lock().await = true;
}

/// Returns a duration to wait for the next runnable process
/// and an information string about that process including
Expand Down Expand Up @@ -122,16 +116,12 @@ impl TaskPool
}
}

pub fn run(self)
pub fn run(self) -> tokio::task::JoinHandle<()>
{
spawn(
async move {
loop
{
if self.closing.lock().await.to_owned()
{
break;
}
for (id, task_lock) in &self.tasks
{
let mut task = task_lock.lock().await;
Expand All @@ -156,7 +146,7 @@ impl TaskPool
}
}
}
);
)
}
}

Expand Down
30 changes: 28 additions & 2 deletions tests/test_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ mod task
{
use std::{str::FromStr, sync::Arc};

use busser::{server::stats::{hits::HitStats, StatsDigestTask, StatsSaveTask}, task::{Task, TaskPool, DEFAULT_WAIT}};
use busser::{server::stats::{hits::HitStats, StatsDigestTask, StatsSaveTask}, task::{schedule_from_option, Task, TaskPool, DEFAULT_WAIT}};
use chrono::Timelike;
use cron::Schedule;
use tokio::sync::Mutex;
Expand Down Expand Up @@ -46,12 +46,38 @@ mod task
assert_eq!(task.runnable(), false);
assert_eq!(task.info(), "Statistics digest".to_string());

pool.add(Box::new(task));
let id = pool.add(Box::new(task));

assert_eq!(pool.ntasks(), 2);

let (wait, _info) = pool.waiting_for().await;
println!("{:?}", wait);
assert!(wait > DEFAULT_WAIT);

pool.remove(&id);

let (wait, _info) = pool.waiting_for().await;
assert!(wait > tokio::time::Duration::ZERO);
assert_eq!(wait, DEFAULT_WAIT);

let handle = pool.run();
handle.abort();

}

#[test]
pub fn test_schedule()
{
let option: Option<String> = None;

assert_eq!(schedule_from_option(option), None);

let option = "not_a_schedule_string".to_string();

assert_eq!(schedule_from_option(Some(option)), None);

let option = "0 * * * * * *".to_string();

assert_eq!(schedule_from_option(Some(option)), Some(Schedule::from_str("0 * * * * * *").unwrap()));
}
}

0 comments on commit b3a8a51

Please sign in to comment.