Skip to content

Commit

Permalink
Use a database connection pool instead of mutex locking
Browse files Browse the repository at this point in the history
This replaces the mutex-based PoC of 13bcbde via the `r2d2` feature of
the `diesel` crate. Using `unwrap()` should be fine as `Pool::get()`
only errors if the code runs into a timeout (in which case there's
probably a network issue or the database is down, so the only improvement
over panicking would be to print a proper error message before exiting).

The `min_idle` setting defaults to `None`, which causes the `max_size`
value to be used (defaults to 10). The `build()` function "will block
until the pool has established its configured minimum number of
connections". Therefore, it seems best to lower the value to 1 since
butido is a CLI and not a service/daemon (so the pool will open
additional connections on demand for the async code instead of
establishing all connections at the beginning - we probably only need
few database connections in most cases).

Signed-off-by: Michael Weiss <[email protected]>
  • Loading branch information
primeos-work committed May 22, 2023
1 parent 35ee678 commit 1f63fc9
Show file tree
Hide file tree
Showing 11 changed files with 108 additions and 76 deletions.
21 changes: 21 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ config = { version = "0.11", default-features = false, features = [ "tom
csv = "1"
daggy = { version = "0.8", features = [ "serde" ] }
dialoguer = "0.10"
diesel = { version = "2", features = ["postgres", "chrono", "uuid", "serde_json"] }
diesel = { version = "2", features = ["postgres", "chrono", "uuid", "serde_json", "r2d2"] }
diesel_migrations = "2"
filters = "0.4"
futures = "0.3"
Expand Down
21 changes: 10 additions & 11 deletions src/commands/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use std::path::Path;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::Mutex;

use anyhow::anyhow;
use anyhow::Context;
Expand All @@ -27,6 +26,8 @@ use diesel::ExpressionMethods;
use diesel::PgConnection;
use diesel::QueryDsl;
use diesel::RunQueryDsl;
use diesel::r2d2::ConnectionManager;
use diesel::r2d2::Pool;
use itertools::Itertools;
use tracing::{debug, info, trace, warn};
use tokio::sync::RwLock;
Expand Down Expand Up @@ -58,7 +59,7 @@ pub async fn build(
repo_root: &Path,
matches: &ArgMatches,
progressbars: ProgressBars,
database_connection: PgConnection,
database_pool: Pool<ConnectionManager<PgConnection>>,
config: &Configuration,
repo: Repository,
repo_path: &Path,
Expand Down Expand Up @@ -297,19 +298,17 @@ pub async fn build(
.collect::<Result<Vec<()>>>()?;

trace!("Setting up database jobs for Package, GitHash, Image");
let database_connection = Arc::new(Mutex::new(database_connection));
// TODO: Avoid the locking here!:
let db_package = async { Package::create_or_fetch(&mut database_connection.clone().lock().unwrap(), package) };
let db_githash = async { GitHash::create_or_fetch(&mut database_connection.clone().lock().unwrap(), &hash_str) };
let db_image = async { Image::create_or_fetch(&mut database_connection.clone().lock().unwrap(), &image_name) };
let db_package = async { Package::create_or_fetch(&mut database_pool.get().unwrap(), package) };
let db_githash = async { GitHash::create_or_fetch(&mut database_pool.get().unwrap(), &hash_str) };
let db_image = async { Image::create_or_fetch(&mut database_pool.get().unwrap(), &image_name) };
let db_envs = async {
additional_env
.clone()
.into_iter()
.map(|(k, v)| async {
let k: EnvironmentVariableName = k; // hack to work around move semantics
let v: String = v; // hack to work around move semantics
EnvVar::create_or_fetch(&mut database_connection.clone().lock().unwrap(), &k, &v)
EnvVar::create_or_fetch(&mut database_pool.get().unwrap(), &k, &v)
})
.collect::<futures::stream::FuturesUnordered<_>>()
.collect::<Result<Vec<EnvVar>>>()
Expand All @@ -325,7 +324,7 @@ pub async fn build(
trace!("Database jobs for Package, GitHash, Image finished successfully");
trace!("Creating Submit in database");
let submit = Submit::create(
&mut database_connection.clone().lock().unwrap(),
&mut database_pool.get().unwrap(),
&now,
&submit_id,
&db_image,
Expand Down Expand Up @@ -366,7 +365,7 @@ pub async fn build(
.endpoint_config(endpoint_configurations)
.staging_store(staging_store)
.release_stores(release_stores)
.database(database_connection.clone())
.database(database_pool.clone())
.source_cache(source_cache)
.submit(submit)
.log_dir(if matches.get_flag("write-log-file") {
Expand Down Expand Up @@ -404,7 +403,7 @@ pub async fn build(
let data = schema::jobs::table
.filter(schema::jobs::dsl::uuid.eq(job_uuid))
.inner_join(schema::packages::table)
.first::<(Job, Package)>(&mut *database_connection.as_ref().lock().unwrap())?;
.first::<(Job, Package)>(&mut *database_pool.get().unwrap())?;

let number_log_lines = *config.build_error_lines();
writeln!(
Expand Down
8 changes: 4 additions & 4 deletions src/commands/find_artifact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@
use std::path::PathBuf;
use std::io::Write;
use std::sync::Arc;
use std::sync::Mutex;
use std::convert::TryFrom;

use anyhow::Context;
use anyhow::Error;
use anyhow::Result;
use clap::ArgMatches;
use diesel::PgConnection;
use diesel::r2d2::ConnectionManager;
use diesel::r2d2::Pool;
use itertools::Itertools;
use tracing::{debug, trace};

Expand All @@ -34,7 +35,7 @@ use crate::util::progress::ProgressBars;
use crate::util::docker::ImageName;

/// Implementation of the "find_artifact" subcommand
pub async fn find_artifact(matches: &ArgMatches, config: &Configuration, progressbars: ProgressBars, repo: Repository, database_connection: PgConnection) -> Result<()> {
pub async fn find_artifact(matches: &ArgMatches, config: &Configuration, progressbars: ProgressBars, repo: Repository, database_pool: Pool<ConnectionManager<PgConnection>>) -> Result<()> {
let package_name_regex = crate::commands::util::mk_package_name_regex({
matches.get_one::<String>("package_name_regex").unwrap() // safe by clap
})?;
Expand Down Expand Up @@ -97,7 +98,6 @@ pub async fn find_artifact(matches: &ArgMatches, config: &Configuration, progres
None
};

let database = Arc::new(Mutex::new(database_connection));
repo.packages()
.filter(|p| package_name_regex.captures(p.name()).is_some())
.filter(|p| {
Expand All @@ -113,7 +113,7 @@ pub async fn find_artifact(matches: &ArgMatches, config: &Configuration, progres
.config(config)
.release_stores(&release_stores)
.staging_store(staging_store.as_ref())
.database_connection(database.clone())
.database_pool(database_pool.clone())
.env_filter(&env_filter)
.script_filter(script_filter)
.image_name(image_name.as_ref())
Expand Down
28 changes: 13 additions & 15 deletions src/commands/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@
use std::path::Path;
use std::io::Write;
use std::sync::Arc;
use std::sync::Mutex;

use anyhow::Error;
use anyhow::Result;
use diesel::PgConnection;
use diesel::QueryDsl;
use diesel::RunQueryDsl;
use diesel::r2d2::ConnectionManager;
use diesel::r2d2::Pool;
use walkdir::WalkDir;

use crate::config::Configuration;
Expand All @@ -29,7 +29,7 @@ pub async fn metrics(
repo_path: &Path,
config: &Configuration,
repo: Repository,
conn: PgConnection,
pool: Pool<ConnectionManager<PgConnection>>,
) -> Result<()> {
let mut out = std::io::stdout();

Expand All @@ -46,18 +46,16 @@ pub async fn metrics(
})
.count();

let conn = Arc::new(Mutex::new(conn));
// TODO: Avoid the locking here (makes async pointless)!:
let n_artifacts = async { crate::schema::artifacts::table.count().get_result::<i64>(&mut *conn.clone().lock().unwrap()) };
let n_endpoints = async { crate::schema::endpoints::table.count().get_result::<i64>(&mut *conn.clone().lock().unwrap()) };
let n_envvars = async { crate::schema::envvars::table.count().get_result::<i64>(&mut *conn.clone().lock().unwrap()) };
let n_githashes = async { crate::schema::githashes::table.count().get_result::<i64>(&mut *conn.clone().lock().unwrap()) };
let n_images = async { crate::schema::images::table.count().get_result::<i64>(&mut *conn.clone().lock().unwrap()) };
let n_jobs = async { crate::schema::jobs::table.count().get_result::<i64>(&mut *conn.clone().lock().unwrap()) };
let n_packages = async { crate::schema::packages::table.count().get_result::<i64>(&mut *conn.clone().lock().unwrap()) };
let n_releasestores = async { crate::schema::release_stores::table.count().get_result::<i64>(&mut *conn.clone().lock().unwrap()) };
let n_releases = async { crate::schema::releases::table.count().get_result::<i64>(&mut *conn.clone().lock().unwrap()) };
let n_submits = async { crate::schema::submits::table.count().get_result::<i64>(&mut *conn.clone().lock().unwrap()) };
let n_artifacts = async { crate::schema::artifacts::table.count().get_result::<i64>(&mut pool.get().unwrap()) };
let n_endpoints = async { crate::schema::endpoints::table.count().get_result::<i64>(&mut pool.get().unwrap()) };
let n_envvars = async { crate::schema::envvars::table.count().get_result::<i64>(&mut pool.get().unwrap()) };
let n_githashes = async { crate::schema::githashes::table.count().get_result::<i64>(&mut pool.get().unwrap()) };
let n_images = async { crate::schema::images::table.count().get_result::<i64>(&mut pool.get().unwrap()) };
let n_jobs = async { crate::schema::jobs::table.count().get_result::<i64>(&mut pool.get().unwrap()) };
let n_packages = async { crate::schema::packages::table.count().get_result::<i64>(&mut pool.get().unwrap()) };
let n_releasestores = async { crate::schema::release_stores::table.count().get_result::<i64>(&mut pool.get().unwrap()) };
let n_releases = async { crate::schema::releases::table.count().get_result::<i64>(&mut pool.get().unwrap()) };
let n_submits = async { crate::schema::submits::table.count().get_result::<i64>(&mut pool.get().unwrap()) };

let (
n_artifacts,
Expand Down
20 changes: 8 additions & 12 deletions src/commands/release.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
use std::io::Write;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex;

use anyhow::anyhow;
use anyhow::Context;
Expand Down Expand Up @@ -64,7 +62,7 @@ async fn new_release(

debug!("Release called for: {:?} {:?}", pname, pvers);

let mut conn = db_connection_config.establish_connection()?;
let pool = db_connection_config.establish_pool()?;
let submit_uuid = matches
.get_one::<String>("submit_uuid")
.map(|s| uuid::Uuid::parse_str(s.as_ref()))
Expand All @@ -74,7 +72,7 @@ async fn new_release(

let submit = crate::schema::submits::dsl::submits
.filter(crate::schema::submits::dsl::uuid.eq(submit_uuid))
.first::<dbmodels::Submit>(&mut conn)?;
.first::<dbmodels::Submit>(&mut pool.get().unwrap())?;
debug!("Found Submit: {:?}", submit_uuid);

let arts = {
Expand All @@ -93,30 +91,30 @@ async fn new_release(
"Query: {:?}",
diesel::debug_query::<diesel::pg::Pg, _>(&query)
);
query.load::<dbmodels::Artifact>(&mut conn)?
query.load::<dbmodels::Artifact>(&mut pool.get().unwrap())?
}
(Some(name), None) => {
let query = sel.filter(crate::schema::packages::name.eq(name));
debug!(
"Query: {:?}",
diesel::debug_query::<diesel::pg::Pg, _>(&query)
);
query.load::<dbmodels::Artifact>(&mut conn)?
query.load::<dbmodels::Artifact>(&mut pool.get().unwrap())?
}
(None, Some(vers)) => {
let query = sel.filter(crate::schema::packages::version.like(vers));
debug!(
"Query: {:?}",
diesel::debug_query::<diesel::pg::Pg, _>(&query)
);
query.load::<dbmodels::Artifact>(&mut conn)?
query.load::<dbmodels::Artifact>(&mut pool.get().unwrap())?
}
(None, None) => {
debug!(
"Query: {:?}",
diesel::debug_query::<diesel::pg::Pg, _>(&sel)
);
sel.load::<dbmodels::Artifact>(&mut conn)?
sel.load::<dbmodels::Artifact>(&mut pool.get().unwrap())?
}
}
};
Expand All @@ -138,13 +136,11 @@ async fn new_release(

let staging_base: &PathBuf = &config.staging_directory().join(submit.uuid.to_string());

let release_store = crate::db::models::ReleaseStore::create(&mut conn, release_store_name)?;
let release_store = crate::db::models::ReleaseStore::create(&mut pool.get().unwrap(), release_store_name)?;
let do_update = matches.get_flag("package_do_update");
let interactive = !matches.get_flag("noninteractive");

let now = chrono::offset::Local::now().naive_local();
// TODO: Find a proper solution to resolve: `error: captured variable cannot escape `FnMut` closure body`:
let conn = Arc::new(Mutex::new(conn));
let any_err = arts.into_iter()
.map(|art| async {
let art = art; // ensure it is moved
Expand Down Expand Up @@ -186,7 +182,7 @@ async fn new_release(
.map_err(Error::from)
.and_then(|_| {
debug!("Updating {:?} to set released = true", art);
let rel = crate::db::models::Release::create(&mut conn.clone().lock().unwrap(), &art, &now, &release_store)?;
let rel = crate::db::models::Release::create(&mut pool.get().unwrap(), &art, &now, &release_store)?;
debug!("Release object = {:?}", rel);
Ok(dest_path)
})
Expand Down
24 changes: 19 additions & 5 deletions src/db/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ use anyhow::Result;
use clap::ArgMatches;
use diesel::pg::PgConnection;
use diesel::prelude::*;
use diesel::r2d2::ConnectionManager;
use diesel::r2d2::Pool;
use getset::Getters;
use tracing::debug;

Expand Down Expand Up @@ -76,18 +78,30 @@ impl<'a> DbConnectionConfig<'a> {
})
}

pub fn establish_connection(self) -> Result<PgConnection> {
debug!("Trying to connect to database: {:?}", self);
let database_uri: String = format!(
fn get_database_uri(self) -> String {
format!(
"postgres://{user}:{password}@{host}:{port}/{name}?connect_timeout={timeout}",
host = self.database_host,
port = self.database_port,
user = self.database_user,
password = self.database_password,
name = self.database_name,
timeout = self.database_connection_timeout,
);
PgConnection::establish(&database_uri).map_err(Error::from)
)
}

pub fn establish_connection(self) -> Result<PgConnection> {
debug!("Trying to connect to database: {:?}", self);
PgConnection::establish(&self.get_database_uri()).map_err(Error::from)
}

/// Creates an r2d2 connection pool for the configured database.
///
/// `min_idle` is set to 1 because `Pool::builder().build()` blocks until
/// the pool has established its configured minimum number of connections,
/// and butido is a CLI rather than a daemon — the pool opens additional
/// connections on demand for the async code instead of establishing all
/// of them upfront.
///
/// Returns an error if the initial connection cannot be established.
pub fn establish_pool(self) -> Result<Pool<ConnectionManager<PgConnection>>> {
    debug!("Trying to create a connection pool for database: {:?}", self);
    let manager = ConnectionManager::<PgConnection>::new(self.get_database_uri());
    Pool::builder()
        .min_idle(Some(1))
        .build(manager)
        .map_err(Error::from)
}

}
Expand Down
Loading

0 comments on commit 1f63fc9

Please sign in to comment.