From 1f63fc91730f30fbbd803d34a0bb0e45392efda1 Mon Sep 17 00:00:00 2001
From: Michael Weiss
Date: Wed, 17 May 2023 21:05:36 +0200
Subject: [PATCH] Use a database connection pool instead of mutex locking

This replaces the mutex-based PoC of 13bcbde with the `r2d2` feature of
the `diesel` crate. Using `unwrap()` should be fine as `Pool::get()`
only errors if the code runs into a timeout (in which case there's
probably a network issue or the database is down, so the best we could
do is return a proper error message instead of panicking).

The `min_idle` setting defaults to `None`, which causes the `max_size`
value (default: 10) to be used. The `build()` function "will block
until the pool has established its configured minimum number of
connections". Therefore, it seems best to lower the value to 1 since
butido is a CLI and not a service/daemon (the pool will open additional
connections on demand for the async code instead of establishing all
connections at the beginning - we probably only need a few database
connections in most cases).

Signed-off-by: Michael Weiss
---
 Cargo.lock                       | 21 +++++++++++++++++++++
 Cargo.toml                       |  2 +-
 src/commands/build.rs            | 21 ++++++++++-----------
 src/commands/find_artifact.rs    |  8 ++++----
 src/commands/metrics.rs          | 28 +++++++++++++---------------
 src/commands/release.rs          | 20 ++++++++------------
 src/db/connection.rs             | 24 +++++++++++++++++++-----
 src/db/find_artifacts.rs         | 11 ++++++-----
 src/endpoint/scheduler.rs        | 25 +++++++++++++------------
 src/main.rs                      | 12 ++++++------
 src/orchestrator/orchestrator.rs | 12 +++++++-----
 11 files changed, 108 insertions(+), 76 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 5838055b..5b53efb9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -600,6 +600,7 @@ dependencies = [
  "diesel_derives",
  "itoa",
  "pq-sys",
+ "r2d2",
  "serde_json",
  "uuid",
 ]
@@ -1972,6 +1973,17 @@ dependencies = [
  "proc-macro2",
 ]
 
+[[package]]
+name = "r2d2"
+version = "0.8.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93"
+dependencies = [
+ "log",
+ "parking_lot",
+ "scheduled-thread-pool",
+]
+
 [[package]]
 name = "rand"
 version = "0.4.3"
@@ -2188,6 +2200,15 @@ dependencies = [
  "windows-sys 0.42.0",
 ]
 
+[[package]]
+name = "scheduled-thread-pool"
+version = "0.2.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19"
+dependencies = [
+ "parking_lot",
+]
+
 [[package]]
 name = "scopeguard"
 version = "1.1.0"
diff --git a/Cargo.toml b/Cargo.toml
index fe9d784f..b29b00ba 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -37,7 +37,7 @@ config = { version = "0.11", default-features = false, features = [ "tom
 csv = "1"
 daggy = { version = "0.8", features = [ "serde" ] }
 dialoguer = "0.10"
-diesel = { version = "2", features = ["postgres", "chrono", "uuid", "serde_json"] }
+diesel = { version = "2", features = ["postgres", "chrono", "uuid", "serde_json", "r2d2"] }
 diesel_migrations = "2"
 filters = "0.4"
 futures = "0.3"
diff --git a/src/commands/build.rs b/src/commands/build.rs
index b3e6d747..09ab0b45 100644
--- a/src/commands/build.rs
+++ b/src/commands/build.rs
@@ -15,7 +15,6 @@ use std::path::Path;
 use std::path::PathBuf;
 use std::str::FromStr;
 use std::sync::Arc;
-use std::sync::Mutex;
 
 use anyhow::anyhow;
 use anyhow::Context;
@@ -27,6 +26,8 @@ use diesel::ExpressionMethods;
 use diesel::PgConnection;
 use diesel::QueryDsl;
 use diesel::RunQueryDsl;
+use diesel::r2d2::ConnectionManager;
+use diesel::r2d2::Pool;
 use itertools::Itertools;
 use tracing::{debug, info, trace, warn};
 use tokio::sync::RwLock;
@@ -58,7 +59,7 @@ pub async fn build(
     repo_root: &Path,
     matches: &ArgMatches,
     progressbars: ProgressBars,
-    database_connection: PgConnection,
+    database_pool: Pool<ConnectionManager<PgConnection>>,
     config: &Configuration,
     repo: Repository,
     repo_path: &Path,
@@ -297,11 +298,9 @@ pub async fn build(
         .collect::<Result<Vec<_>>>()?;
 
     trace!("Setting up database jobs for Package, GitHash, Image");
-    let database_connection = Arc::new(Mutex::new(database_connection));
-    // TODO: Avoid the locking here!:
-    let db_package = async { Package::create_or_fetch(&mut database_connection.clone().lock().unwrap(), package) };
-    let db_githash = async { GitHash::create_or_fetch(&mut database_connection.clone().lock().unwrap(), &hash_str) };
-    let db_image = async { Image::create_or_fetch(&mut database_connection.clone().lock().unwrap(), &image_name) };
+    let db_package = async { Package::create_or_fetch(&mut database_pool.get().unwrap(), package) };
+    let db_githash = async { GitHash::create_or_fetch(&mut database_pool.get().unwrap(), &hash_str) };
+    let db_image = async { Image::create_or_fetch(&mut database_pool.get().unwrap(), &image_name) };
     let db_envs = async {
         additional_env
             .clone()
@@ -309,7 +308,7 @@ pub async fn build(
             .map(|(k, v)| async {
                 let k: EnvironmentVariableName = k; // hack to work around move semantics
                 let v: String = v; // hack to work around move semantics
-                EnvVar::create_or_fetch(&mut database_connection.clone().lock().unwrap(), &k, &v)
+                EnvVar::create_or_fetch(&mut database_pool.get().unwrap(), &k, &v)
             })
             .collect::<futures::stream::FuturesUnordered<_>>()
             .collect::<Result<Vec<EnvVar>>>()
@@ -325,7 +324,7 @@ pub async fn build(
     trace!("Database jobs for Package, GitHash, Image finished successfully");
     trace!("Creating Submit in database");
     let submit = Submit::create(
-        &mut database_connection.clone().lock().unwrap(),
+        &mut database_pool.get().unwrap(),
         &now,
         &submit_id,
         &db_image,
@@ -366,7 +365,7 @@ pub async fn build(
         .endpoint_config(endpoint_configurations)
         .staging_store(staging_store)
         .release_stores(release_stores)
-        .database(database_connection.clone())
+        .database(database_pool.clone())
         .source_cache(source_cache)
         .submit(submit)
         .log_dir(if matches.get_flag("write-log-file") {
@@ -404,7 +403,7 @@ pub async fn build(
                 let data = schema::jobs::table
                     .filter(schema::jobs::dsl::uuid.eq(job_uuid))
                     .inner_join(schema::packages::table)
-                    .first::<(Job, Package)>(&mut *database_connection.as_ref().lock().unwrap())?;
+                    .first::<(Job, Package)>(&mut *database_pool.get().unwrap())?;
 
                 let number_log_lines = *config.build_error_lines();
                 writeln!(
diff --git a/src/commands/find_artifact.rs b/src/commands/find_artifact.rs
index d34a0f36..700fef8d 100644
--- a/src/commands/find_artifact.rs
+++ b/src/commands/find_artifact.rs
@@ -13,7 +13,6 @@ use std::path::PathBuf;
 use std::io::Write;
 use std::sync::Arc;
-use std::sync::Mutex;
 use std::convert::TryFrom;
 
 use anyhow::Context;
@@ -21,6 +20,8 @@ use anyhow::Error;
 use anyhow::Result;
 use clap::ArgMatches;
 use diesel::PgConnection;
+use diesel::r2d2::ConnectionManager;
+use diesel::r2d2::Pool;
 use itertools::Itertools;
 use tracing::{debug, trace};
 
@@ -34,11 +35,11 @@ use crate::config::Configuration;
 use crate::filestore::path::StoreRoot;
 use crate::package::PackageVersionConstraint;
 use crate::repository::Repository;
 use crate::util::progress::ProgressBars;
 use crate::util::docker::ImageName;
 
 /// Implementation of the "find_artifact" subcommand
-pub async fn find_artifact(matches: &ArgMatches, config: &Configuration, progressbars: ProgressBars, repo: Repository, database_connection: PgConnection) -> Result<()> {
+pub async fn find_artifact(matches: &ArgMatches, config: &Configuration, progressbars: ProgressBars, repo: Repository, database_pool: Pool<ConnectionManager<PgConnection>>) -> Result<()> {
     let package_name_regex = crate::commands::util::mk_package_name_regex({
         matches.get_one::<String>("package_name_regex").unwrap() // safe by clap
     })?;
@@ -97,7 +98,6 @@ pub async fn find_artifact(matches: &ArgMatches, config: &Configuration, progres
         None
     };
 
-    let database = Arc::new(Mutex::new(database_connection));
     repo.packages()
         .filter(|p| package_name_regex.captures(p.name()).is_some())
         .filter(|p| {
@@ -113,7 +113,7 @@ pub async fn find_artifact(matches: &ArgMatches, config: &Configuration, progres
             .config(config)
             .release_stores(&release_stores)
             .staging_store(staging_store.as_ref())
-            .database_connection(database.clone())
+            .database_pool(database_pool.clone())
            .env_filter(&env_filter)
             .script_filter(script_filter)
             .image_name(image_name.as_ref())
diff --git a/src/commands/metrics.rs b/src/commands/metrics.rs
index 4992561c..441b8bc1 100644
--- a/src/commands/metrics.rs
+++ b/src/commands/metrics.rs
@@ -12,14 +12,14 @@
 
 use std::path::Path;
 use std::io::Write;
-use std::sync::Arc;
-use std::sync::Mutex;
 
 use anyhow::Error;
 use anyhow::Result;
 use diesel::PgConnection;
 use diesel::QueryDsl;
 use diesel::RunQueryDsl;
+use diesel::r2d2::ConnectionManager;
+use diesel::r2d2::Pool;
 use walkdir::WalkDir;
 
 use crate::config::Configuration;
@@ -29,7 +29,7 @@ pub async fn metrics(
     repo_path: &Path,
     config: &Configuration,
     repo: Repository,
-    conn: PgConnection,
+    pool: Pool<ConnectionManager<PgConnection>>,
 ) -> Result<()> {
     let mut out = std::io::stdout();
 
@@ -46,18 +46,16 @@ pub async fn metrics(
         })
         .count();
 
-    let conn = Arc::new(Mutex::new(conn));
-    // TODO: Avoid the locking here (makes async pointless)!:
-    let n_artifacts = async { crate::schema::artifacts::table.count().get_result::<i64>(&mut *conn.clone().lock().unwrap()) };
-    let n_endpoints = async { crate::schema::endpoints::table.count().get_result::<i64>(&mut *conn.clone().lock().unwrap()) };
-    let n_envvars = async { crate::schema::envvars::table.count().get_result::<i64>(&mut *conn.clone().lock().unwrap()) };
-    let n_githashes = async { crate::schema::githashes::table.count().get_result::<i64>(&mut *conn.clone().lock().unwrap()) };
-    let n_images = async { crate::schema::images::table.count().get_result::<i64>(&mut *conn.clone().lock().unwrap()) };
-    let n_jobs = async { crate::schema::jobs::table.count().get_result::<i64>(&mut *conn.clone().lock().unwrap()) };
-    let n_packages = async { crate::schema::packages::table.count().get_result::<i64>(&mut *conn.clone().lock().unwrap()) };
-    let n_releasestores = async { crate::schema::release_stores::table.count().get_result::<i64>(&mut *conn.clone().lock().unwrap()) };
-    let n_releases = async { crate::schema::releases::table.count().get_result::<i64>(&mut *conn.clone().lock().unwrap()) };
-    let n_submits = async { crate::schema::submits::table.count().get_result::<i64>(&mut *conn.clone().lock().unwrap()) };
+    let n_artifacts = async { crate::schema::artifacts::table.count().get_result::<i64>(&mut pool.get().unwrap()) };
+    let n_endpoints = async { crate::schema::endpoints::table.count().get_result::<i64>(&mut pool.get().unwrap()) };
+    let n_envvars = async { crate::schema::envvars::table.count().get_result::<i64>(&mut pool.get().unwrap()) };
+    let n_githashes = async { crate::schema::githashes::table.count().get_result::<i64>(&mut pool.get().unwrap()) };
+    let n_images = async { crate::schema::images::table.count().get_result::<i64>(&mut pool.get().unwrap()) };
+    let n_jobs = async { crate::schema::jobs::table.count().get_result::<i64>(&mut pool.get().unwrap()) };
+    let n_packages = async { crate::schema::packages::table.count().get_result::<i64>(&mut pool.get().unwrap()) };
+    let n_releasestores = async { crate::schema::release_stores::table.count().get_result::<i64>(&mut pool.get().unwrap()) };
+    let n_releases = async { crate::schema::releases::table.count().get_result::<i64>(&mut pool.get().unwrap()) };
+    let n_submits = async { crate::schema::submits::table.count().get_result::<i64>(&mut pool.get().unwrap()) };
 
     let (
         n_artifacts,
diff --git a/src/commands/release.rs b/src/commands/release.rs
index fcee051e..804ba040 100644
--- a/src/commands/release.rs
+++ b/src/commands/release.rs
@@ -12,8 +12,6 @@
 
 use std::io::Write;
 use std::path::PathBuf;
-use std::sync::Arc;
-use std::sync::Mutex;
 
 use anyhow::anyhow;
 use anyhow::Context;
@@ -64,7 +62,7 @@ async fn new_release(
 
     debug!("Release called for: {:?} {:?}", pname, pvers);
 
-    let mut conn = db_connection_config.establish_connection()?;
+    let pool = db_connection_config.establish_pool()?;
     let submit_uuid = matches
         .get_one::<String>("submit_uuid")
         .map(|s| uuid::Uuid::parse_str(s.as_ref()))
@@ -74,7 +72,7 @@ async fn new_release(
 
     let submit = crate::schema::submits::dsl::submits
         .filter(crate::schema::submits::dsl::uuid.eq(submit_uuid))
-        .first::<crate::db::models::Submit>(&mut conn)?;
+        .first::<crate::db::models::Submit>(&mut pool.get().unwrap())?;
     debug!("Found Submit: {:?}", submit_uuid);
 
     let arts = {
@@ -93,7 +91,7 @@ async fn new_release(
                     "Query: {:?}",
                     diesel::debug_query::<diesel::pg::Pg, _>(&query)
                 );
-                query.load::<crate::db::models::Artifact>(&mut conn)?
+                query.load::<crate::db::models::Artifact>(&mut pool.get().unwrap())?
             }
             (Some(name), None) => {
                 let query = sel.filter(crate::schema::packages::name.eq(name));
@@ -101,7 +99,7 @@ async fn new_release(
                     "Query: {:?}",
                     diesel::debug_query::<diesel::pg::Pg, _>(&query)
                 );
-                query.load::<crate::db::models::Artifact>(&mut conn)?
+                query.load::<crate::db::models::Artifact>(&mut pool.get().unwrap())?
             }
             (None, Some(vers)) => {
                 let query = sel.filter(crate::schema::packages::version.like(vers));
@@ -109,14 +107,14 @@ async fn new_release(
                     "Query: {:?}",
                     diesel::debug_query::<diesel::pg::Pg, _>(&query)
                 );
-                query.load::<crate::db::models::Artifact>(&mut conn)?
+                query.load::<crate::db::models::Artifact>(&mut pool.get().unwrap())?
             }
             (None, None) => {
                 debug!(
                     "Query: {:?}",
                     diesel::debug_query::<diesel::pg::Pg, _>(&sel)
                 );
-                sel.load::<crate::db::models::Artifact>(&mut conn)?
+                sel.load::<crate::db::models::Artifact>(&mut pool.get().unwrap())?
             }
         }
     };
@@ -138,13 +136,11 @@ async fn new_release(
 
     let staging_base: &PathBuf = &config.staging_directory().join(submit.uuid.to_string());
 
-    let release_store = crate::db::models::ReleaseStore::create(&mut conn, release_store_name)?;
+    let release_store = crate::db::models::ReleaseStore::create(&mut pool.get().unwrap(), release_store_name)?;
     let do_update = matches.get_flag("package_do_update");
     let interactive = !matches.get_flag("noninteractive");
     let now = chrono::offset::Local::now().naive_local();
-    // TODO: Find a proper solution to resolve: `error: captured variable cannot escape `FnMut` closure body`:
-    let conn = Arc::new(Mutex::new(conn));
     let any_err = arts.into_iter()
         .map(|art| async {
             let art = art; // ensure it is moved
@@ -186,7 +182,7 @@ async fn new_release(
                 .map_err(Error::from)
                 .and_then(|_| {
                     debug!("Updating {:?} to set released = true", art);
-                    let rel = crate::db::models::Release::create(&mut conn.clone().lock().unwrap(), &art, &now, &release_store)?;
+                    let rel = crate::db::models::Release::create(&mut pool.get().unwrap(), &art, &now, &release_store)?;
                     debug!("Release object = {:?}", rel);
                     Ok(dest_path)
                 })
diff --git a/src/db/connection.rs b/src/db/connection.rs
index bd386203..e07d1471 100644
--- a/src/db/connection.rs
+++ b/src/db/connection.rs
@@ -13,6 +13,8 @@ use anyhow::Result;
 use clap::ArgMatches;
 use diesel::pg::PgConnection;
 use diesel::prelude::*;
+use diesel::r2d2::ConnectionManager;
+use diesel::r2d2::Pool;
 use getset::Getters;
 use tracing::debug;
 
@@ -76,9 +78,8 @@ impl<'a> DbConnectionConfig<'a> {
         })
     }
 
-    pub fn establish_connection(self) -> Result<PgConnection> {
-        debug!("Trying to connect to database: {:?}", self);
-        let database_uri: String = format!(
+    fn get_database_uri(self) -> String {
+        format!(
             "postgres://{user}:{password}@{host}:{port}/{name}?connect_timeout={timeout}",
             host = self.database_host,
             port = self.database_port,
@@ -86,8 +87,21 @@ impl<'a> DbConnectionConfig<'a> {
             password = self.database_password,
             name = self.database_name,
             timeout = self.database_connection_timeout,
-        );
-        PgConnection::establish(&database_uri).map_err(Error::from)
+        )
+    }
+
+    pub fn establish_connection(self) -> Result<PgConnection> {
+        debug!("Trying to connect to database: {:?}", self);
+        PgConnection::establish(&self.get_database_uri()).map_err(Error::from)
+    }
+
+    pub fn establish_pool(self) -> Result<Pool<ConnectionManager<PgConnection>>> {
+        debug!("Trying to create a connection pool for database: {:?}", self);
+        let manager = ConnectionManager::<PgConnection>::new(self.get_database_uri());
+        Pool::builder()
+            .min_idle(Some(1))
+            .build(manager)
+            .map_err(Error::from)
+    }
 }
diff --git a/src/db/find_artifacts.rs b/src/db/find_artifacts.rs
index dfd80f72..f421d345 100644
--- a/src/db/find_artifacts.rs
+++ b/src/db/find_artifacts.rs
@@ -11,7 +11,6 @@ use std::collections::HashMap;
 use std::path::PathBuf;
 use std::sync::Arc;
-use std::sync::Mutex;
 
 use anyhow::Result;
 use chrono::NaiveDateTime;
@@ -21,6 +20,8 @@ use diesel::JoinOnDsl;
 use diesel::PgConnection;
 use diesel::QueryDsl;
 use diesel::RunQueryDsl;
+use diesel::r2d2::ConnectionManager;
+use diesel::r2d2::Pool;
 use tracing::{debug, trace};
 use resiter::AndThen;
 use resiter::FilterMap;
@@ -53,7 +54,7 @@
 #[derive(typed_builder::TypedBuilder)]
 pub struct FindArtifacts<'a> {
     config: &'a Configuration,
-    database_connection: Arc<Mutex<PgConnection>>,
+    database_pool: Pool<ConnectionManager<PgConnection>>,
 
     /// The release stores to search in
     release_stores: &'a [Arc<ReleaseStore>],
@@ -153,7 +154,7 @@ impl<'a> FindArtifacts<'a> {
 
                 (arts, jobs)
             })
-            .load::<(dbmodels::Artifact, dbmodels::Job)>(&mut *self.database_connection.as_ref().lock().unwrap())?
+            .load::<(dbmodels::Artifact, dbmodels::Job)>(&mut self.database_pool.get().unwrap())?
             .into_iter()
             .inspect(|(art, job)| debug!("Filtering further: {:?}, job {:?}", art, job.id))
             //
@@ -172,7 +173,7 @@ impl<'a> FindArtifacts<'a> {
                 let job = tpl.1;
 
                 let job_env: Vec<(String, String)> = job
-                    .env(&mut self.database_connection.as_ref().lock().unwrap())?
+                    .env(&mut self.database_pool.get().unwrap())?
                     .into_iter()
                     .map(|var: dbmodels::EnvVar| (var.name, var.value))
                     .collect();
@@ -187,7 +188,7 @@ impl<'a> FindArtifacts<'a> {
                 Ok((_, bl)) => *bl,
             })
             .and_then_ok(|(art, _)| {
-                if let Some(release) = art.get_release(&mut self.database_connection.as_ref().lock().unwrap())? {
+                if let Some(release) = art.get_release(&mut self.database_pool.get().unwrap())? {
                     Ok((art, Some(release.release_date)))
                 } else {
                     Ok((art, None))
diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs
index 6c7af3bf..9a02fc2e 100644
--- a/src/endpoint/scheduler.rs
+++ b/src/endpoint/scheduler.rs
@@ -10,7 +10,6 @@
 
 use std::path::PathBuf;
 use std::sync::Arc;
-use std::sync::Mutex;
 
 use anyhow::anyhow;
 use anyhow::Context;
@@ -18,6 +17,8 @@ use anyhow::Error;
 use anyhow::Result;
 use colored::Colorize;
 use diesel::PgConnection;
+use diesel::r2d2::ConnectionManager;
+use diesel::r2d2::Pool;
 use indicatif::ProgressBar;
 use itertools::Itertools;
 use tracing::trace;
@@ -43,7 +44,7 @@ pub struct EndpointScheduler {
 
     staging_store: Arc<RwLock<StagingStore>>,
     release_stores: Vec<Arc<ReleaseStore>>,
-    db: Arc<Mutex<PgConnection>>,
+    db: Pool<ConnectionManager<PgConnection>>,
     submit: crate::db::models::Submit,
 }
 
@@ -52,7 +53,7 @@ impl EndpointScheduler {
        endpoints: Vec<EndpointConfiguration>,
         staging_store: Arc<RwLock<StagingStore>>,
         release_stores: Vec<Arc<ReleaseStore>>,
-        db: Arc<Mutex<PgConnection>>,
+        db: Pool<ConnectionManager<PgConnection>>,
         submit: crate::db::models::Submit,
         log_dir: Option<PathBuf>,
     ) -> Result<Self> {
@@ -118,7 +119,7 @@ pub struct JobHandle {
     endpoint: EndpointHandle,
     job: RunnableJob,
     bar: ProgressBar,
-    db: Arc<Mutex<PgConnection>>,
+    db: Pool<ConnectionManager<PgConnection>>,
     staging_store: Arc<RwLock<StagingStore>>,
     release_stores: Vec<Arc<ReleaseStore>>,
     submit: crate::db::models::Submit,
@@ -135,9 +136,9 @@ impl JobHandle {
         let (log_sender, log_receiver) = tokio::sync::mpsc::unbounded_channel::<LogItem>();
         let endpoint_uri = self.endpoint.uri().clone();
         let endpoint_name = self.endpoint.name().clone();
-        let endpoint = dbmodels::Endpoint::create_or_fetch(&mut self.db.as_ref().lock().unwrap(), self.endpoint.name())?;
-        let package = dbmodels::Package::create_or_fetch(&mut self.db.as_ref().lock().unwrap(), self.job.package())?;
-        let image = dbmodels::Image::create_or_fetch(&mut self.db.as_ref().lock().unwrap(), self.job.image())?;
+        let endpoint = dbmodels::Endpoint::create_or_fetch(&mut self.db.get().unwrap(), self.endpoint.name())?;
+        let package = dbmodels::Package::create_or_fetch(&mut self.db.get().unwrap(), self.job.package())?;
+        let image = dbmodels::Image::create_or_fetch(&mut self.db.get().unwrap(), self.job.image())?;
         let envs = self.create_env_in_db()?;
         let job_id = *self.job.uuid();
         trace!("Running on Job {} on Endpoint {}", job_id, self.endpoint.name());
@@ -187,7 +188,7 @@ impl JobHandle {
             })?;
 
         let job = dbmodels::Job::create(
-            &mut self.db.as_ref().lock().unwrap(),
+            &mut self.db.get().unwrap(),
             &job_id,
             &self.submit,
             &endpoint,
@@ -201,7 +202,7 @@ impl JobHandle {
         trace!("DB: Job entry for job {} created: {}", job.uuid, job.id);
 
         for env in envs {
-            dbmodels::JobEnv::create(&mut self.db.as_ref().lock().unwrap(), &job, &env)
+            dbmodels::JobEnv::create(&mut self.db.get().unwrap(), &job, &env)
                 .with_context(|| format!("Creating Environment Variable mapping for Job: {}", job.uuid))?;
         }
 
@@ -246,7 +247,7 @@ impl JobHandle {
         let staging_read = self.staging_store.read().await;
         for p in paths.iter() {
             trace!("DB: Creating artifact entry for path: {}", p.display());
-            let _ = dbmodels::Artifact::create(&mut self.db.as_ref().lock().unwrap(), p, &job)?;
+            let _ = dbmodels::Artifact::create(&mut self.db.get().unwrap(), p, &job)?;
             r.push({
                 staging_read
                     .get(p)
@@ -294,7 +295,7 @@ impl JobHandle {
                     .inspect(|(k, v)| {
                         trace!("Creating environment variable in database: {} = {}", k, v)
                     })
-                    .map(|(k, v)| dbmodels::EnvVar::create_or_fetch(&mut self.db.as_ref().lock().unwrap(), k, v))
+                    .map(|(k, v)| dbmodels::EnvVar::create_or_fetch(&mut self.db.get().unwrap(), k, v))
                     .collect::<Result<Vec<_>>>()
             })
             .transpose()?
@@ -309,7 +310,7 @@ impl JobHandle {
                     .inspect(|(k, v)| {
                         trace!("Creating environment variable in database: {} = {}", k, v)
                     })
-                    .map(|(k, v)| dbmodels::EnvVar::create_or_fetch(&mut self.db.as_ref().lock().unwrap(), k, v))
+                    .map(|(k, v)| dbmodels::EnvVar::create_or_fetch(&mut self.db.get().unwrap(), k, v))
             })
             .collect()
     }
diff --git a/src/main.rs b/src/main.rs
index 210470af..35306558 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -168,7 +168,7 @@ async fn main() -> Result<()> {
         Some(("generate-completions", matches)) => generate_completions(matches),
         Some(("db", matches)) => crate::commands::db(db_connection_config, &config, matches)?,
         Some(("build", matches)) => {
-            let conn = db_connection_config.establish_connection()?;
+            let pool = db_connection_config.establish_pool()?;
 
             let repo = load_repo()?;
 
@@ -176,7 +176,7 @@ async fn main() -> Result<()> {
                 repo_path,
                 matches,
                 progressbars,
-                conn,
+                pool,
                 &config,
                 repo,
                 repo_path,
@@ -214,8 +214,8 @@ async fn main() -> Result<()> {
         Some(("find-artifact", matches)) => {
             let repo = load_repo()?;
 
-            let conn = db_connection_config.establish_connection()?;
-            crate::commands::find_artifact(matches, &config, progressbars, repo, conn)
+            let pool = db_connection_config.establish_pool()?;
+            crate::commands::find_artifact(matches, &config, progressbars, repo, pool)
                 .await
                 .context("find-artifact command failed")?
         }
@@ -256,8 +256,8 @@ async fn main() -> Result<()> {
         Some(("metrics", _)) => {
             let repo = load_repo()?;
 
-            let conn = db_connection_config.establish_connection()?;
-            crate::commands::metrics(repo_path, &config, repo, conn)
+            let pool = db_connection_config.establish_pool()?;
+            crate::commands::metrics(repo_path, &config, repo, pool)
                 .await
                 .context("metrics command failed")?
         }
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index 2aa9e13f..3c6e79ee 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -21,6 +21,8 @@ use anyhow::Context;
 use anyhow::Result;
 use anyhow::anyhow;
 use diesel::PgConnection;
+use diesel::r2d2::ConnectionManager;
+use diesel::r2d2::Pool;
 use git2::Repository;
 use indicatif::ProgressBar;
 use itertools::Itertools;
@@ -164,7 +166,7 @@ pub struct Orchestrator<'a> {
     jobdag: Dag,
     config: &'a Configuration,
     repository: Repository,
-    database: Arc<Mutex<PgConnection>>,
+    database: Pool<ConnectionManager<PgConnection>>,
 }
 
 #[derive(TypedBuilder)]
@@ -175,7 +177,7 @@ pub struct OrchestratorSetup<'a> {
     release_stores: Vec<Arc<ReleaseStore>>,
     source_cache: SourceCache,
     jobdag: Dag,
-    database: Arc<Mutex<PgConnection>>,
+    database: Pool<ConnectionManager<PgConnection>>,
     submit: dbmodels::Submit,
     log_dir: Option<PathBuf>,
     config: &'a Configuration,
@@ -455,7 +457,7 @@ struct TaskPreparation<'a> {
     scheduler: &'a EndpointScheduler,
     staging_store: Arc<RwLock<StagingStore>>,
     release_stores: Vec<Arc<ReleaseStore>>,
-    database: Arc<Mutex<PgConnection>>,
+    database: Pool<ConnectionManager<PgConnection>>,
 }
 
 /// Helper type for executing one job task
@@ -473,7 +475,7 @@ struct JobTask<'a> {
     scheduler: &'a EndpointScheduler,
     staging_store: Arc<RwLock<StagingStore>>,
     release_stores: Vec<Arc<ReleaseStore>>,
-    database: Arc<Mutex<PgConnection>>,
+    database: Pool<ConnectionManager<PgConnection>>,
 
     /// Channel where the dependencies arrive
     receiver: Receiver<JobResult>,
@@ -638,7 +640,7 @@ impl<'a> JobTask<'a> {
                 .collect::<Vec<_>>();
 
             let replacement_artifacts = crate::db::FindArtifacts::builder()
-                .database_connection(self.database.clone())
+                .database_pool(self.database.clone())
                 .config(self.config)
                 .package(self.jobdef.job.package())
                 .release_stores(&self.release_stores)
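
Appendix (not part of the patch): a minimal, self-contained sketch of the pooling pattern this commit adopts, for trying it outside of butido. The connection URI and the `SELECT 1` probe are illustrative placeholders rather than butido code (butido assembles its URI in `get_database_uri()` above), and errors are propagated here instead of using `unwrap()` on `Pool::get()` as the patch does.

    use diesel::pg::PgConnection;
    use diesel::prelude::*;
    use diesel::r2d2::{ConnectionManager, Pool};

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Placeholder URI; assumes a reachable PostgreSQL instance.
        let manager = ConnectionManager::<PgConnection>::new(
            "postgres://user:password@localhost:5432/butido",
        );

        // As in src/db/connection.rs: keep the configured minimum at one
        // connection so that build() does not block while establishing the
        // default minimum (max_size, i.e. 10 connections). Additional
        // connections are opened on demand, up to max_size.
        let pool = Pool::builder().min_idle(Some(1)).build(manager)?;

        // Each task checks its own connection out of the pool instead of
        // locking a shared Arc<Mutex<PgConnection>>; get() only fails on a
        // timeout.
        let mut conn = pool.get()?;
        diesel::sql_query("SELECT 1").execute(&mut conn)?;

        Ok(())
    }

Since the pool is cheaply cloneable and hands out independent connections, the async tasks no longer serialize on a single mutex, while `build()` still verifies at startup that the database is reachable by establishing that one idle connection.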