Skip to content

Commit

Permalink
refactor(webserver): move scheduler cron to webserver (#1406)
Browse files Browse the repository at this point in the history
  • Loading branch information
wsxiaoys authored Feb 7, 2024
1 parent 2f757de commit 5158eec
Show file tree
Hide file tree
Showing 15 changed files with 148 additions and 163 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 0 additions & 12 deletions crates/tabby-common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,18 +136,6 @@ impl Default for ServerConfig {
#[async_trait]
pub trait RepositoryAccess: Send + Sync {
async fn list_repositories(&self) -> Result<Vec<RepositoryConfig>>;
async fn create_job_run(&self, _name: String) -> Result<i32> {
Ok(0)
}
async fn update_job_stdout(&self, _id: i32, _stdout: String) -> Result<()> {
Ok(())
}
async fn update_job_stderr(&self, _id: i32, _stderr: String) -> Result<()> {
Ok(())
}
async fn complete_job_run(&self, _id: i32, _exit_code: i32) -> Result<()> {
Ok(())
}
}

pub struct ConfigRepositoryAccess;
Expand Down
1 change: 0 additions & 1 deletion crates/tabby-scheduler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ serdeconv.workspace = true
cargo-lock = { version = "9.0.0", features = ["dependency-tree"] }
tokio-cron-scheduler = { workspace = true }
tokio = { workspace = true, features = ["process"] }
chrono.workspace = true

[dev-dependencies]
temp_testdir = "0.2"
Expand Down
92 changes: 15 additions & 77 deletions crates/tabby-scheduler/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,101 +5,39 @@ mod index;
mod repository;
mod utils;

use std::{process::Stdio, sync::Arc};
use std::sync::Arc;

use anyhow::Result;
use tabby_common::config::{RepositoryAccess, RepositoryConfig};
use tokio::io::AsyncBufReadExt;
use tokio_cron_scheduler::{Job, JobScheduler};
use tracing::{error, info, warn};

pub async fn scheduler<T: RepositoryAccess + 'static>(
now: bool,
access: T,
args: &[String],
) -> Result<()> {
pub async fn scheduler<T: RepositoryAccess + 'static>(now: bool, access: T) -> Result<()> {
if now {
let repositories = access.list_repositories().await?;
job_sync(&repositories);
job_index(&repositories);
} else {
let args = args.to_owned();
let access = Arc::new(access);
let scheduler = JobScheduler::new().await?;
let scheduler_mutex = Arc::new(tokio::sync::Mutex::new(()));

// Every 10 minutes
scheduler
.add(Job::new_async(
"0 1/10 * * * *",
move |uuid, mut scheduler| {
let access = access.clone();
let args = args.clone();
let scheduler_mutex = scheduler_mutex.clone();
Box::pin(async move {
let Ok(_guard) = scheduler_mutex.try_lock() else {
warn!("Scheduler job overlapped, skipping...");
return;
};
.add(Job::new_async("0 1/10 * * * *", move |_, _| {
let access = access.clone();
let scheduler_mutex = scheduler_mutex.clone();
Box::pin(async move {
let Ok(_guard) = scheduler_mutex.try_lock() else {
warn!("Scheduler job overlapped, skipping...");
return;
};

info!("Running scheduler job...");
let exe = std::env::current_exe().unwrap();
let job_id = access
.create_job_run("scheduler".to_owned())
.await
.unwrap_or_default();

let mut child = tokio::process::Command::new(exe)
.arg("scheduler")
.arg("--now")
.args(args)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.unwrap();

{
// Pipe stdout
let access = access.clone();
let stdout = child.stdout.take().unwrap();
tokio::spawn(async move {
let stdout = tokio::io::BufReader::new(stdout);
let mut stdout = stdout.lines();
while let Ok(Some(line)) = stdout.next_line().await {
println!("{line}");
let _ = access.update_job_stdout(job_id, line + "\n").await;
}
});
}

{
// Pipe stderr
let access = access.clone();
let stderr = child.stderr.take().unwrap();
tokio::spawn(async move {
let stderr = tokio::io::BufReader::new(stderr);
let mut stdout = stderr.lines();
while let Ok(Some(line)) = stdout.next_line().await {
eprintln!("{line}");
let _ = access.update_job_stderr(job_id, line + "\n").await;
}
});
}
if let Some(exit_code) = child.wait().await.ok().and_then(|s| s.code()) {
let _ = access.complete_job_run(job_id, exit_code).await;
} else {
let _ = access.complete_job_run(job_id, -1).await;
}

if let Ok(Some(next_tick)) = scheduler.next_tick_for_job(uuid).await {
info!(
"Next time for scheduler job is {:?}",
next_tick.with_timezone(&chrono::Local)
);
}
})
},
)?)
let repositories = access.list_repositories().await.unwrap();
job_sync(&repositories);
job_index(&repositories);
})
})?)
.await?;

info!("Scheduler activated...");
Expand Down
2 changes: 1 addition & 1 deletion crates/tabby-scheduler/tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ mod tests {
)],
};

let res = tabby_scheduler::scheduler(true, config, &[]).await;
let res = tabby_scheduler::scheduler(true, config).await;
res.expect("Failed to run scheduler");
}
}
12 changes: 4 additions & 8 deletions crates/tabby/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,16 +135,12 @@ async fn main() {
token: Some(token),
}) => {
let client = tabby_webserver::public::create_scheduler_client(&url, &token).await;
tabby_scheduler::scheduler(
now,
client,
&["--url".to_owned(), url, "--token".to_owned(), token],
)
.await
.unwrap_or_else(|err| fatal!("Scheduler failed due to '{}'", err))
tabby_scheduler::scheduler(now, client)
.await
.unwrap_or_else(|err| fatal!("Scheduler failed due to '{}'", err))
}
Commands::Scheduler(SchedulerArgs { now, .. }) => {
tabby_scheduler::scheduler(now, ConfigRepositoryAccess, &[])
tabby_scheduler::scheduler(now, ConfigRepositoryAccess)
.await
.unwrap_or_else(|err| fatal!("Scheduler failed due to '{}'", err))
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
//! db maintenance jobs
use std::sync::Arc;

use anyhow::Result;
use tabby_db::DbConn;
use tokio_cron_scheduler::Job;
use tracing::error;

pub async fn refresh_token_job(db_conn: DbConn) -> Result<Job> {
use crate::schema::auth::AuthenticationService;

pub async fn refresh_token_job(auth: Arc<dyn AuthenticationService>) -> Result<Job> {
// job is run every 2 hours
let job = Job::new_async("0 0 1/2 * * * *", move |_, _| {
let db_conn = db_conn.clone();
let auth = auth.clone();
Box::pin(async move {
let res = db_conn.delete_expired_token().await;
let res = auth.delete_expired_token().await;
if let Err(e) = res {
error!("failed to delete expired token: {}", e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
mod db;
mod scheduler;

use std::sync::Arc;

use tabby_db::DbConn;
use tokio_cron_scheduler::{Job, JobScheduler};
use tracing::error;

use crate::schema::{auth::AuthenticationService, job::JobService, worker::WorkerService};

async fn new_job_scheduler(jobs: Vec<Job>) -> anyhow::Result<JobScheduler> {
let scheduler = JobScheduler::new().await?;
for job in jobs {
Expand All @@ -13,16 +17,26 @@ async fn new_job_scheduler(jobs: Vec<Job>) -> anyhow::Result<JobScheduler> {
Ok(scheduler)
}

pub async fn run_cron(db_conn: &DbConn) {
let db_conn = db_conn.clone();
pub async fn run_cron(
auth: Arc<dyn AuthenticationService>,
job: Arc<dyn JobService>,
worker: Arc<dyn WorkerService>,
local_port: u16,
) {
let mut jobs = vec![];

let Ok(job1) = db::refresh_token_job(db_conn.clone()).await else {
let Ok(job1) = db::refresh_token_job(auth).await else {
error!("failed to create db job");
return;
};
jobs.push(job1);

let Ok(job2) = scheduler::scheduler_job(job, worker, local_port).await else {
error!("failed to create scheduler job");
return;
};
jobs.push(job2);

if new_job_scheduler(jobs).await.is_err() {
error!("failed to start job scheduler");
};
Expand Down
89 changes: 89 additions & 0 deletions ee/tabby-webserver/src/cron/scheduler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
use std::{process::Stdio, sync::Arc, time::Duration};

use tokio::io::AsyncBufReadExt;
use tokio_cron_scheduler::Job;
use tracing::{info, warn};

use crate::schema::{job::JobService, worker::WorkerService, ServiceLocator};

/// Builds the cron job that periodically re-runs the tabby scheduler as a
/// child process (`<current exe> scheduler --now`), streaming its stdout and
/// stderr into the job-run records exposed by [`JobService`].
///
/// * `job` — persistence for job runs (create / append output / complete).
/// * `worker` — used to read the registration token passed to the child.
/// * `local_port` — port of the local webserver the child connects back to.
///
/// Returns the constructed [`Job`] to be registered with a `JobScheduler`.
pub async fn scheduler_job(
    job: Arc<dyn JobService>,
    worker: Arc<dyn WorkerService>,
    local_port: u16,
) -> anyhow::Result<Job> {
    // Guards against overlapping runs: if a previous invocation is still in
    // flight, the new tick is skipped instead of spawning a second child.
    let scheduler_mutex = Arc::new(tokio::sync::Mutex::new(()));

    // NOTE(review): this is a one-shot job firing 1s after startup, not a
    // recurring "0 1/10 * * * *" schedule — confirm this is intentional.
    let job = Job::new_one_shot_async(Duration::from_secs(1), move |uuid, mut scheduler| {
        let worker = worker.clone();
        let job = job.clone();
        let scheduler_mutex = scheduler_mutex.clone();
        Box::pin(async move {
            let Ok(_guard) = scheduler_mutex.try_lock() else {
                warn!("Scheduler job overlapped, skipping...");
                return;
            };

            info!("Running scheduler job...");

            // A panic here would abort the cron task permanently, so every
            // fallible step logs and bails out instead of unwrapping.
            let exe = match std::env::current_exe() {
                Ok(exe) => exe,
                Err(err) => {
                    warn!("Failed to resolve current executable: {err}");
                    return;
                }
            };

            let token = match worker.read_registration_token().await {
                Ok(token) => token,
                Err(err) => {
                    warn!("Failed to read registration token: {err}");
                    return;
                }
            };

            let job_id = job
                .create_job_run("scheduler".to_owned())
                .await
                .unwrap_or_default();

            let mut child = match tokio::process::Command::new(exe)
                .arg("scheduler")
                .arg("--now")
                .arg("--url")
                .arg(format!("localhost:{local_port}"))
                .arg("--token")
                .arg(token)
                .stdout(Stdio::piped())
                .stderr(Stdio::piped())
                .spawn()
            {
                Ok(child) => child,
                Err(err) => {
                    warn!("Failed to spawn scheduler process: {err}");
                    // Record the run as failed so it doesn't linger as pending.
                    let _ = job.complete_job_run(job_id, -1).await;
                    return;
                }
            };

            {
                // Pipe stdout: echo locally and append each line to the job run.
                let job = job.clone();
                // Guaranteed Some: stdout was configured with Stdio::piped().
                let stdout = child.stdout.take().expect("stdout is piped");
                tokio::spawn(async move {
                    let reader = tokio::io::BufReader::new(stdout);
                    let mut lines = reader.lines();
                    while let Ok(Some(line)) = lines.next_line().await {
                        println!("{line}");
                        let _ = job.update_job_stdout(job_id, line + "\n").await;
                    }
                });
            }

            {
                // Pipe stderr: echo locally and append each line to the job run.
                let job = job.clone();
                // Guaranteed Some: stderr was configured with Stdio::piped().
                let stderr = child.stderr.take().expect("stderr is piped");
                tokio::spawn(async move {
                    let reader = tokio::io::BufReader::new(stderr);
                    let mut lines = reader.lines();
                    while let Ok(Some(line)) = lines.next_line().await {
                        eprintln!("{line}");
                        let _ = job.update_job_stderr(job_id, line + "\n").await;
                    }
                });
            }

            // Record the child's exit code; -1 when it was killed by a signal
            // or waiting failed.
            if let Some(exit_code) = child.wait().await.ok().and_then(|s| s.code()) {
                let _ = job.complete_job_run(job_id, exit_code).await;
            } else {
                let _ = job.complete_job_run(job_id, -1).await;
            }

            if let Ok(Some(next_tick)) = scheduler.next_tick_for_job(uuid).await {
                info!(
                    "Next time for scheduler job is {:?}",
                    next_tick.with_timezone(&chrono::Local)
                );
            }
        })
    })?;

    Ok(job)
}
Loading

0 comments on commit 5158eec

Please sign in to comment.