From 13bcbdedb1b31ff2221328d3c39058315176cf4a Mon Sep 17 00:00:00 2001
From: Michael Weiss
Date: Wed, 4 Jan 2023 19:15:15 +0100
Subject: [PATCH] Update diesel, diesel_migrations, and uuid

This implements the major upgrade of the Diesel crate from version 1 to
version 2. The relevant changes are the following:

- Diesel now requires mutable access to the `Connection` to perform any
  database interaction [0].
- The Diesel derive attributes need to be wrapped by `diesel()` now [1].
- The diesel_migrations crate has been completely rewritten and the
  `embed_migrations!()` macro had to be replaced [2].
- The `connection::Connection::transaction` closure now takes one argument
  (the database connection). See [3] vs. [4].
- I also decided to rename `database_connection` to `conn` in these cases to
  better differentiate the two (we could keep/shadow the outer variable for a
  smaller diff, but this seems cleaner).

Note: The `Arc<PgConnection>` -> `Arc<Mutex<PgConnection>>` change is only
intended as a PoC. It is not a good idea, as the mutex locking makes some of
the async blocks/code completely pointless. This problem already existed
before, though. The documentation states that the requirement of mutable
connections is expected to be a "straightforward change as the connection
already can execute only one query at a time". This change should therefore
only make matters worse by moving the locking up and thereby increasing the
time for which locks are held. That should merely slow execution down a bit
(it might not even be noticeable), but it also increases the risk of
deadlocks, which is the real danger. To avoid this issue we should use a
connection pool instead (e.g., via r2d2; see the sketch below), but there are
alternatives, such as moving the locking further down in the code again or
rewriting the database logic.
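
A minimal sketch of the mutable-connection change; the `jobs` table and `Job`
struct here are simplified stand-ins for illustration, not the real schema:

    use diesel::prelude::*;

    diesel::table! {
        jobs (id) {
            id -> Integer,
        }
    }

    #[derive(Queryable)]
    struct Job {
        id: i32,
    }

    // Diesel 1 accepted `&PgConnection`; Diesel 2 requires `&mut PgConnection`
    // for every query:
    fn load_jobs(conn: &mut PgConnection) -> QueryResult<Vec<Job>> {
        jobs::table.load::<Job>(conn)
    }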
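
The derive-attribute change looks like this in practice (mirroring the
`NewArtifact` struct touched by this patch):

    // Diesel 1:
    //   #[derive(Insertable)]
    //   #[table_name = "artifacts"]
    //
    // Diesel 2 (attributes are wrapped in `diesel(...)` and take bare
    // identifiers instead of strings):
    #[derive(Insertable)]
    #[diesel(table_name = artifacts)]
    struct NewArtifact<'a> {
        pub path: &'a str,
        pub job_id: i32,
    }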
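
The rewritten diesel_migrations API boils down to the following pattern,
mirroring the new `setup()` in src/commands/db.rs:

    use diesel_migrations::{embed_migrations, EmbeddedMigrations,
                            HarnessWithOutput, MigrationHarness};

    pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations");

    fn setup(conn: &mut diesel::PgConnection) -> anyhow::Result<()> {
        // Diesel 1 generated an `embedded_migrations` module instead:
        //   embedded_migrations::run_with_output(&conn, &mut std::io::stdout())
        HarnessWithOutput::write_to_stdout(conn)
            .run_pending_migrations(MIGRATIONS)
            .map(|_| ())
            .map_err(|e| anyhow::anyhow!(e))
    }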
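
The new `transaction` closure signature, sketched after
`Endpoint::create_or_fetch` from this patch (the `Endpoint` type annotation is
spelled out here for clarity; `Error` is anyhow::Error):

    // Diesel 1: the closure took no arguments and captured the outer
    // connection. Diesel 2: the closure receives the connection:
    database_connection.transaction::<_, Error, _>(|conn| {
        diesel::insert_into(endpoints::table)
            .values(&new_ep)
            .on_conflict_do_nothing()
            .execute(conn)?;

        dsl::endpoints
            .filter(name.eq(ep_name.as_ref()))
            .first::<Endpoint>(conn)
            .map_err(Error::from)
    })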
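
For reference, a sketch of the r2d2 approach mentioned above (not part of this
patch; it assumes the `r2d2` feature of the diesel crate, and the
`establish_pool` helper and pool size are hypothetical):

    use diesel::pg::PgConnection;
    use diesel::r2d2::{ConnectionManager, Pool};

    type PgPool = Pool<ConnectionManager<PgConnection>>;

    fn establish_pool(database_url: &str) -> PgPool {
        Pool::builder()
            .max_size(8) // hypothetical pool size
            .build(ConnectionManager::<PgConnection>::new(database_url))
            .expect("Failed to build database connection pool")
    }

    // Each task then checks out its own connection instead of locking a
    // shared Mutex:
    //   let mut conn = pool.get()?;
    //   schema::jobs::table.load::<Job>(&mut conn)?;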
[0]: https://diesel.rs/guides/migration_guide.html#2-0-0-mutable-connection [1]: https://diesel.rs/guides/migration_guide.html#2-0-0-derive-attributes [2]: https://diesel.rs/guides/migration_guide.html#2-0-0-upgrade-migrations [3]: https://docs.diesel.rs/2.0.x/diesel/connection/trait.Connection.html#method.transaction [4]: https://docs.diesel.rs/1.4.x/diesel/connection/trait.Connection.html#method.transaction --- Cargo.lock | 93 ++++++++++++++------------------ Cargo.toml | 6 +-- src/commands/build.rs | 16 +++--- src/commands/db.rs | 73 +++++++++++++------------ src/commands/find_artifact.rs | 3 +- src/commands/metrics.rs | 24 +++++---- src/commands/release.rs | 26 +++++---- src/db/find_artifacts.rs | 9 ++-- src/db/models/artifact.rs | 16 +++--- src/db/models/endpoint.rs | 14 ++--- src/db/models/envvar.rs | 12 ++--- src/db/models/githash.rs | 12 ++--- src/db/models/image.rs | 14 ++--- src/db/models/job.rs | 22 ++++---- src/db/models/job_env.rs | 10 ++-- src/db/models/package.rs | 14 ++--- src/db/models/release_store.rs | 12 ++--- src/db/models/releases.rs | 14 ++--- src/db/models/submit.rs | 18 +++---- src/endpoint/scheduler.rs | 23 ++++---- src/main.rs | 2 - src/orchestrator/orchestrator.rs | 9 ++-- 22 files changed, 224 insertions(+), 218 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3ab2c2d7..5838055b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -150,7 +150,7 @@ checksum = "233d376d6d185f2a3093e58f283f60f880315b6c60075b01f36b3b85154564ca" dependencies = [ "addr2line", "cc", - "cfg-if 1.0.0", + "cfg-if", "libc", "miniz_oxide 0.6.2", "object", @@ -258,7 +258,7 @@ dependencies = [ "typed-builder", "unindent", "url", - "uuid 0.6.5", + "uuid", "vergen", "walkdir", "which", @@ -298,12 +298,6 @@ dependencies = [ "jobserver", ] -[[package]] -name = "cfg-if" -version = "0.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" - [[package]] name = "cfg-if" version = "1.0.0" @@ -451,7 +445,7 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", ] [[package]] @@ -460,7 +454,7 @@ version = "0.5.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a33c2bf77f2df06183c3aa30d1e96c0695a313d4f9c453cc3762a6db39f99200" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "crossbeam-utils", ] @@ -470,7 +464,7 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ce6fd6f855243022dcecf8702fef0c297d4338e226845fe067f6341ad9fa0cef" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "crossbeam-epoch", "crossbeam-utils", ] @@ -482,7 +476,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "46bd5f3f85273295a9d14aedfb86f6aadbff6d8f5295c4a9edb08e819dcf5695" dependencies = [ "autocfg", - "cfg-if 1.0.0", + "cfg-if", "crossbeam-utils", "memoffset", "scopeguard", @@ -494,7 +488,7 @@ version = "0.8.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c063cd8cc95f5c377ed0d4b49a4b21f632396ff690e8470c29b3359b346984b" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", ] [[package]] @@ -596,25 +590,27 @@ dependencies = [ [[package]] name = "diesel" -version = "1.4.8" +version = "2.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"b28135ecf6b7d446b43e27e225622a038cc4e2930a1022f51cdb97ada19b8e4d" +checksum = "72eb77396836a4505da85bae0712fa324b74acfe1876d7c2f7e694ef3d0ee373" dependencies = [ "bitflags", "byteorder", "chrono", "diesel_derives", + "itoa", "pq-sys", "serde_json", - "uuid 0.6.5", + "uuid", ] [[package]] name = "diesel_derives" -version = "1.4.1" +version = "2.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45f5098f628d02a7a0f68ddba586fb61e80edec3bdc1be3b921f4ceec60858d3" +checksum = "0ad74fdcf086be3d4fdd142f67937678fe60ed431c3b2f08599e7687269410c4" dependencies = [ + "proc-macro-error", "proc-macro2", "quote", "syn 1.0.109", @@ -622,10 +618,11 @@ dependencies = [ [[package]] name = "diesel_migrations" -version = "1.4.0" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf3cde8413353dc7f5d72fa8ce0b99a560a359d2c5ef1e5817ca731cd9008f4c" +checksum = "e9ae22beef5e9d6fab9225ddb073c1c6c1a7a6ded5019d5da11d1e5c5adc34e2" dependencies = [ + "diesel", "migrations_internals", "migrations_macros", ] @@ -678,7 +675,7 @@ version = "0.8.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "071a31f4ee85403370b58aca746f01041ede6f0da2730960ad001edc2b71b394" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", ] [[package]] @@ -717,7 +714,7 @@ version = "0.2.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5cbc844cecaee9d4443931972e1289c8ff485cb4cc2767cb03ca139ed6885153" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "libc", "redox_syscall 0.2.16", "windows-sys 0.48.0", @@ -908,7 +905,7 @@ version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c85e1d9ab2eadba7e5040d4e09cbd6d072b76a557ad64e797c2cb9d4da21d7e4" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "libc", "wasi 0.11.0+wasi-snapshot-preview1", ] @@ -1077,7 +1074,7 @@ dependencies = [ "serde 1.0.160", "serde_derive", "toml 0.7.3", - "uuid 1.3.2", + "uuid", ] [[package]] @@ -1241,7 +1238,7 @@ version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", ] [[package]] @@ -1326,7 +1323,7 @@ checksum = "6607c62aa161d23d17a9072cc5da0be67cdfc89d3afb1e8d9c842bebc2525ffe" dependencies = [ "arrayvec", "bitflags", - "cfg-if 1.0.0", + "cfg-if", "ryu", "static_assertions", ] @@ -1432,7 +1429,7 @@ version = "0.4.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", ] [[package]] @@ -1461,23 +1458,23 @@ dependencies = [ [[package]] name = "migrations_internals" -version = "1.4.1" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b4fc84e4af020b837029e017966f86a1c2d5e83e64b589963d5047525995860" +checksum = "c493c09323068c01e54c685f7da41a9ccf9219735c3766fbfd6099806ea08fbc" dependencies = [ - "diesel", + "serde 1.0.160", + "toml 0.5.11", ] [[package]] name = "migrations_macros" -version = "1.4.2" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9753f12909fd8d923f75ae5c3258cae1ed3c8ec052e1b38c93c21a6d157f789c" +checksum = "8a8ff27a350511de30cdabb77147501c36ef02e0451d957abea2f30caffb2b58" dependencies = [ "migrations_internals", "proc-macro2", "quote", - "syn 1.0.109", ] 
[[package]] @@ -1643,7 +1640,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "01b8574602df80f7b85fdfc5392fa884a4e3b3f4f35402c070ab34c3d3f78d56" dependencies = [ "bitflags", - "cfg-if 1.0.0", + "cfg-if", "foreign-types", "libc", "once_cell", @@ -1722,7 +1719,7 @@ version = "0.9.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9069cbb9f99e3a5083476ccb29ceb1de18b9118cafa53e90c9551235de2b9521" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "libc", "redox_syscall 0.2.16", "smallvec", @@ -2321,7 +2318,7 @@ version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f5058ada175748e33390e40e872bd0fe59a19f265d0158daa551c5a88a76009c" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "cpufeatures", "digest", ] @@ -2332,7 +2329,7 @@ version = "0.10.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "82e6b795fe2e3b1e845bafcb27aa35405c4d47cdfc92af5fc8d3002f76cebdc0" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "cpufeatures", "digest", ] @@ -2510,7 +2507,7 @@ version = "3.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9fbec84f381d5795b08656e4912bec604d162bff9291d6189a78f4c8ab87998" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "fastrand", "redox_syscall 0.3.5", "rustix", @@ -2562,7 +2559,7 @@ version = "1.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdd6f064ccff2d6567adcb3873ca630700f00b5ad3f060c25b5dcfd9a4ce152" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "once_cell", ] @@ -2765,7 +2762,7 @@ version = "0.1.37" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -2901,17 +2898,6 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" -[[package]] -name = "uuid" -version = "0.6.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1436e58182935dcd9ce0add9ea0b558e8a87befe01c1a301e6020aeb0876363" -dependencies = [ - "cfg-if 0.1.10", - "rand", - "serde 1.0.160", -] - [[package]] name = "uuid" version = "1.3.2" @@ -2919,6 +2905,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4dad5567ad0cf5b760e5665964bec1b47dfd077ba8a2544b513f3556d3d239a2" dependencies = [ "getrandom", + "serde 1.0.160", ] [[package]] @@ -2988,7 +2975,7 @@ version = "0.2.84" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "31f8dcbc21f30d9b8f2ea926ecb58f6b91192c17e9d33594b3df58b2007ca53b" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "wasm-bindgen-macro", ] @@ -3013,7 +3000,7 @@ version = "0.4.34" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f219e0d211ba40266969f6dbdd90636da12f75bee4fc9d6c23d1260dadb51454" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "js-sys", "wasm-bindgen", "web-sys", diff --git a/Cargo.toml b/Cargo.toml index 5b459921..c54ae4c3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,8 +37,8 @@ config = { version = "0.11", default-features = false, features = [ "tom csv = "1" daggy = { version = "0.8", features = [ "serde" ] } dialoguer = "0.10" -diesel = { version = "1", features = ["postgres", "chrono", "uuid", 
"serde_json"] } -diesel_migrations = "1" +diesel = { version = "2", features = ["postgres", "chrono", "uuid", "serde_json"] } +diesel_migrations = "2" filters = "0.4" futures = "0.3" getset = "0.1" @@ -74,7 +74,7 @@ tokio-stream = "0.1" typed-builder = "0.14" unindent = "0.2" url = { version = "2", features = ["serde"] } -uuid = { version = "0.6", features = ["serde", "v4"] } +uuid = { version = "1", features = ["serde", "v4"] } walkdir = "2" which = "4" xdg = "2" diff --git a/src/commands/build.rs b/src/commands/build.rs index 281ebfa2..b3e6d747 100644 --- a/src/commands/build.rs +++ b/src/commands/build.rs @@ -15,6 +15,7 @@ use std::path::Path; use std::path::PathBuf; use std::str::FromStr; use std::sync::Arc; +use std::sync::Mutex; use anyhow::anyhow; use anyhow::Context; @@ -296,9 +297,11 @@ pub async fn build( .collect::>>()?; trace!("Setting up database jobs for Package, GitHash, Image"); - let db_package = async { Package::create_or_fetch(&database_connection, package) }; - let db_githash = async { GitHash::create_or_fetch(&database_connection, &hash_str) }; - let db_image = async { Image::create_or_fetch(&database_connection, &image_name) }; + let database_connection = Arc::new(Mutex::new(database_connection)); + // TODO: Avoid the locking here!: + let db_package = async { Package::create_or_fetch(&mut database_connection.clone().lock().unwrap(), package) }; + let db_githash = async { GitHash::create_or_fetch(&mut database_connection.clone().lock().unwrap(), &hash_str) }; + let db_image = async { Image::create_or_fetch(&mut database_connection.clone().lock().unwrap(), &image_name) }; let db_envs = async { additional_env .clone() @@ -306,7 +309,7 @@ pub async fn build( .map(|(k, v)| async { let k: EnvironmentVariableName = k; // hack to work around move semantics let v: String = v; // hack to work around move semantics - EnvVar::create_or_fetch(&database_connection, &k, &v) + EnvVar::create_or_fetch(&mut database_connection.clone().lock().unwrap(), &k, &v) }) .collect::>() .collect::>>() @@ -322,7 +325,7 @@ pub async fn build( trace!("Database jobs for Package, GitHash, Image finished successfully"); trace!("Creating Submit in database"); let submit = Submit::create( - &database_connection, + &mut database_connection.clone().lock().unwrap(), &now, &submit_id, &db_image, @@ -358,7 +361,6 @@ pub async fn build( trace!("Setting up job sets finished successfully"); trace!("Setting up Orchestrator"); - let database_connection = Arc::new(database_connection); let orch = OrchestratorSetup::builder() .progress_generator(progressbars) .endpoint_config(endpoint_configurations) @@ -402,7 +404,7 @@ pub async fn build( let data = schema::jobs::table .filter(schema::jobs::dsl::uuid.eq(job_uuid)) .inner_join(schema::packages::table) - .first::<(Job, Package)>(database_connection.as_ref())?; + .first::<(Job, Package)>(&mut *database_connection.as_ref().lock().unwrap())?; let number_log_lines = *config.build_error_lines(); writeln!( diff --git a/src/commands/db.rs b/src/commands/db.rs index 5f19b434..e5187146 100644 --- a/src/commands/db.rs +++ b/src/commands/db.rs @@ -27,6 +27,10 @@ use diesel::ExpressionMethods; use diesel::JoinOnDsl; use diesel::QueryDsl; use diesel::RunQueryDsl; +use diesel_migrations::embed_migrations; +use diesel_migrations::EmbeddedMigrations; +use diesel_migrations::HarnessWithOutput; +use diesel_migrations::MigrationHarness; use itertools::Itertools; use tracing::{debug, info, trace, warn}; @@ -38,7 +42,7 @@ use crate::log::JobResult; use crate::package::Script; use 
crate::schema; -diesel_migrations::embed_migrations!("migrations"); +pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations"); /// Implementation of the "db" subcommand pub fn db( @@ -148,8 +152,11 @@ fn cli(db_connection_config: DbConnectionConfig<'_>, matches: &ArgMatches) -> Re } fn setup(conn_cfg: DbConnectionConfig<'_>) -> Result<()> { - let conn = conn_cfg.establish_connection()?; - embedded_migrations::run_with_output(&conn, &mut std::io::stdout()).map_err(Error::from) + let mut conn = conn_cfg.establish_connection()?; + HarnessWithOutput::write_to_stdout(&mut conn) + .run_pending_migrations(MIGRATIONS) + .map(|_| ()) + .map_err(|e| anyhow!(e)) } /// Implementation of the "db artifacts" subcommand @@ -158,7 +165,7 @@ fn artifacts(conn_cfg: DbConnectionConfig<'_>, matches: &ArgMatches) -> Result<( let csv = matches.get_flag("csv"); let hdrs = crate::commands::util::mk_header(vec!["Path", "Released", "Job"]); - let conn = conn_cfg.establish_connection()?; + let mut conn = conn_cfg.establish_connection()?; let data = matches .get_one::("job_uuid") .map(|s| uuid::Uuid::parse_str(s.as_ref())) @@ -168,7 +175,7 @@ fn artifacts(conn_cfg: DbConnectionConfig<'_>, matches: &ArgMatches) -> Result<( .inner_join(schema::jobs::table) .left_join(schema::releases::table) .filter(schema::jobs::dsl::uuid.eq(job_uuid)) - .load::<(models::Artifact, models::Job, Option)>(&conn) + .load::<(models::Artifact, models::Job, Option)>(&mut conn) .map_err(Error::from) }) .unwrap_or_else(|| { @@ -176,7 +183,7 @@ fn artifacts(conn_cfg: DbConnectionConfig<'_>, matches: &ArgMatches) -> Result<( .inner_join(schema::jobs::table) .left_join(schema::releases::table) .order_by(schema::artifacts::id.asc()) - .load::<(models::Artifact, models::Job, Option)>(&conn) + .load::<(models::Artifact, models::Job, Option)>(&mut conn) .map_err(Error::from) })? .into_iter() @@ -207,9 +214,9 @@ fn envvars(conn_cfg: DbConnectionConfig<'_>, matches: &ArgMatches) -> Result<()> let csv = matches.get_flag("csv"); let hdrs = crate::commands::util::mk_header(vec!["Name", "Value"]); - let conn = conn_cfg.establish_connection()?; + let mut conn = conn_cfg.establish_connection()?; let data = dsl::envvars - .load::(&conn)? + .load::(&mut conn)? .into_iter() .map(|evar| vec![evar.name, evar.value]) .collect::>(); @@ -229,9 +236,9 @@ fn images(conn_cfg: DbConnectionConfig<'_>, matches: &ArgMatches) -> Result<()> let csv = matches.get_flag("csv"); let hdrs = crate::commands::util::mk_header(vec!["Name"]); - let conn = conn_cfg.establish_connection()?; + let mut conn = conn_cfg.establish_connection()?; let data = dsl::images - .load::(&conn)? + .load::(&mut conn)? .into_iter() .map(|image| vec![image.name]) .collect::>(); @@ -247,24 +254,24 @@ fn images(conn_cfg: DbConnectionConfig<'_>, matches: &ArgMatches) -> Result<()> /// Implementation of the "db submit" subcommand fn submit(conn_cfg: DbConnectionConfig<'_>, matches: &ArgMatches) -> Result<()> { - let conn = conn_cfg.establish_connection()?; + let mut conn = conn_cfg.establish_connection()?; let submit_id = matches.get_one::("submit") .map(|s| uuid::Uuid::from_str(s.as_ref())) .transpose() .context("Parsing submit UUID")? 
.unwrap(); // safe by clap - let submit = models::Submit::with_id(&conn, &submit_id) + let submit = models::Submit::with_id(&mut conn, &submit_id) .with_context(|| anyhow!("Loading submit '{}' from DB", submit_id))?; - let githash = models::GitHash::with_id(&conn, submit.repo_hash_id) + let githash = models::GitHash::with_id(&mut conn, submit.repo_hash_id) .with_context(|| anyhow!("Loading GitHash '{}' from DB", submit.repo_hash_id))?; let jobs = schema::submits::table .inner_join(schema::jobs::table) .filter(schema::submits::uuid.eq(&submit_id)) .select(schema::jobs::all_columns) - .load::(&conn) + .load::(&mut conn) .with_context(|| anyhow!("Loading jobs for submit = {}", submit_id))?; let n_jobs = jobs.len(); @@ -309,11 +316,11 @@ fn submit(conn_cfg: DbConnectionConfig<'_>, matches: &ArgMatches) -> Result<()> let header = crate::commands::util::mk_header(["Job", "Success", "Package", "Version", "Container", "Endpoint", "Image"].to_vec()); let data = jobs.iter() .map(|job| { - let image = models::Image::fetch_for_job(&conn, job)? + let image = models::Image::fetch_for_job(&mut conn, job)? .ok_or_else(|| anyhow!("Image for job {} not found", job.uuid))?; - let package = models::Package::fetch_for_job(&conn, job)? + let package = models::Package::fetch_for_job(&mut conn, job)? .ok_or_else(|| anyhow!("Package for job {} not found", job.uuid))?; - let endpoint = models::Endpoint::fetch_for_job(&conn, job)? + let endpoint = models::Endpoint::fetch_for_job(&mut conn, job)? .ok_or_else(|| anyhow!("Endpoint for job {} not found", job.uuid))?; Ok(vec![ @@ -339,7 +346,7 @@ fn submits(conn_cfg: DbConnectionConfig<'_>, matches: &ArgMatches) -> Result<()> let csv = matches.get_flag("csv"); let limit = matches.get_one::("limit").map(|s| s.parse::()).transpose()?; let hdrs = crate::commands::util::mk_header(vec!["Time", "UUID", "For Package", "For Package Version"]); - let conn = conn_cfg.establish_connection()?; + let mut conn = conn_cfg.establish_connection()?; let query = schema::submits::table .order_by(schema::submits::id.desc()) // required for the --limit implementation @@ -380,7 +387,7 @@ fn submits(conn_cfg: DbConnectionConfig<'_>, matches: &ArgMatches) -> Result<()> }; // Only load the IDs of the submits, so we can later use them to filter the submits - let submit_ids = query.select(schema::submits::id).load::(&conn)?; + let submit_ids = query.select(schema::submits::id).load::(&mut conn)?; schema::submits::table .order_by(schema::submits::id.desc()) // required for the --limit implementation @@ -389,7 +396,7 @@ fn submits(conn_cfg: DbConnectionConfig<'_>, matches: &ArgMatches) -> Result<()> }) .filter(schema::submits::id.eq_any(submit_ids)) .select((schema::submits::all_columns, schema::packages::all_columns)) - .load::<(models::Submit, models::Package)>(&conn)? + .load::<(models::Submit, models::Package)>(&mut conn)? } else if let Some(pkgname) = matches.get_one::("for_pkg") { // Get all submits _for_ the package let query = query @@ -404,7 +411,7 @@ fn submits(conn_cfg: DbConnectionConfig<'_>, matches: &ArgMatches) -> Result<()> query } .select((schema::submits::all_columns, schema::packages::all_columns)) - .load::<(models::Submit, models::Package)>(&conn)? + .load::<(models::Submit, models::Package)>(&mut conn)? 
} else if let Some(limit) = limit { query .inner_join({ @@ -412,13 +419,13 @@ fn submits(conn_cfg: DbConnectionConfig<'_>, matches: &ArgMatches) -> Result<()> }) .select((schema::submits::all_columns, schema::packages::all_columns)) .limit(limit) - .load::<(models::Submit, models::Package)>(&conn)? + .load::<(models::Submit, models::Package)>(&mut conn)? } else { query.inner_join({ schema::packages::table.on(schema::submits::requested_package_id.eq(schema::packages::id)) }) .select((schema::submits::all_columns, schema::packages::all_columns)) - .load::<(models::Submit, models::Package)>(&conn)? + .load::<(models::Submit, models::Package)>(&mut conn)? }; // Helper to map (Submit, Package) -> Vec @@ -455,7 +462,7 @@ fn jobs(conn_cfg: DbConnectionConfig<'_>, config: &Configuration, matches: &ArgM "Version", "Distro", ]); - let conn = conn_cfg.establish_connection()?; + let mut conn = conn_cfg.establish_connection()?; let older_than_filter = get_date_filter("older_than", matches)?; let newer_than_filter = get_date_filter("newer_than", matches)?; @@ -484,7 +491,7 @@ fn jobs(conn_cfg: DbConnectionConfig<'_>, config: &Configuration, matches: &ArgM }) .inner_join(schema::job_envs::table) .select(schema::job_envs::job_id) - .load::(&conn)?; + .load::(&mut conn)?; debug!("Filtering for these IDs (because of env filter): {:?}", jids); sel = sel.filter(schema::jobs::dsl::id.eq_any(jids)); @@ -517,7 +524,7 @@ fn jobs(conn_cfg: DbConnectionConfig<'_>, config: &Configuration, matches: &ArgM let data = sel .order_by(schema::jobs::id.desc()) // required for the --limit implementation - .load::<(models::Job, models::Submit, models::Endpoint, models::Package, models::Image)>(&conn)? + .load::<(models::Job, models::Submit, models::Endpoint, models::Package, models::Image)>(&mut conn)? .into_iter() .rev() // required for the --limit implementation .map(|(job, submit, ep, package, image)| { @@ -557,7 +564,7 @@ fn job(conn_cfg: DbConnectionConfig<'_>, config: &Configuration, matches: &ArgMa let show_log = matches.get_flag("show_log"); let show_script = matches.get_flag("show_script"); let csv = matches.get_flag("csv"); - let conn = conn_cfg.establish_connection()?; + let mut conn = conn_cfg.establish_connection()?; let job_uuid = matches .get_one::("job_uuid") .map(|s| uuid::Uuid::parse_str(s.as_ref())) @@ -576,7 +583,7 @@ fn job(conn_cfg: DbConnectionConfig<'_>, config: &Configuration, matches: &ArgMa models::Endpoint, models::Package, models::Image, - )>(&conn)?; + )>(&mut conn)?; trace!("Parsing log"); let parsed_log = crate::log::ParsedLog::from_str(&data.0.log_text)?; @@ -614,7 +621,7 @@ fn job(conn_cfg: DbConnectionConfig<'_>, config: &Configuration, matches: &ArgMa Some({ models::JobEnv::belonging_to(&data.0) .inner_join(schema::envvars::table) - .load::<(models::JobEnv, models::EnvVar)>(&conn)? + .load::<(models::JobEnv, models::EnvVar)>(&mut conn)? 
.into_iter() .map(|tpl| tpl.1) .enumerate() @@ -725,7 +732,7 @@ fn job(conn_cfg: DbConnectionConfig<'_>, config: &Configuration, matches: &ArgMa /// Implementation of the subcommand "db log-of" fn log_of(conn_cfg: DbConnectionConfig<'_>, matches: &ArgMatches) -> Result<()> { - let conn = conn_cfg.establish_connection()?; + let mut conn = conn_cfg.establish_connection()?; let job_uuid = matches .get_one::("job_uuid") .map(|s| uuid::Uuid::parse_str(s.as_ref())) @@ -737,7 +744,7 @@ fn log_of(conn_cfg: DbConnectionConfig<'_>, matches: &ArgMatches) -> Result<()> schema::jobs::table .filter(schema::jobs::dsl::uuid.eq(job_uuid)) .select(schema::jobs::dsl::log_text) - .first::(&conn) + .first::(&mut conn) .map_err(Error::from) .and_then(|s| crate::log::ParsedLog::from_str(&s))? .into_iter() @@ -749,7 +756,7 @@ fn log_of(conn_cfg: DbConnectionConfig<'_>, matches: &ArgMatches) -> Result<()> /// Implementation of the "db releases" subcommand fn releases(conn_cfg: DbConnectionConfig<'_>, config: &Configuration, matches: &ArgMatches) -> Result<()> { let csv = matches.get_flag("csv"); - let conn = conn_cfg.establish_connection()?; + let mut conn = conn_cfg.establish_connection()?; let header = crate::commands::util::mk_header(["Package", "Version", "Date", "Path"].to_vec()); let mut query = schema::jobs::table .inner_join(schema::packages::table) @@ -787,7 +794,7 @@ fn releases(conn_cfg: DbConnectionConfig<'_>, config: &Configuration, matches: & let rst = schema::release_stores::all_columns; (art, pac, rel, rst) }) - .load::<(models::Artifact, models::Package, models::Release, models::ReleaseStore)>(&conn)? + .load::<(models::Artifact, models::Package, models::Release, models::ReleaseStore)>(&mut conn)? .into_iter() .filter_map(|(art, pack, rel, rstore)| { let p = config.releases_directory().join(rstore.store_name).join(art.path); diff --git a/src/commands/find_artifact.rs b/src/commands/find_artifact.rs index 14a53c25..d34a0f36 100644 --- a/src/commands/find_artifact.rs +++ b/src/commands/find_artifact.rs @@ -13,6 +13,7 @@ use std::path::PathBuf; use std::io::Write; use std::sync::Arc; +use std::sync::Mutex; use std::convert::TryFrom; use anyhow::Context; @@ -96,7 +97,7 @@ pub async fn find_artifact(matches: &ArgMatches, config: &Configuration, progres None }; - let database = Arc::new(database_connection); + let database = Arc::new(Mutex::new(database_connection)); repo.packages() .filter(|p| package_name_regex.captures(p.name()).is_some()) .filter(|p| { diff --git a/src/commands/metrics.rs b/src/commands/metrics.rs index 88fffaa4..4992561c 100644 --- a/src/commands/metrics.rs +++ b/src/commands/metrics.rs @@ -12,6 +12,8 @@ use std::path::Path; use std::io::Write; +use std::sync::Arc; +use std::sync::Mutex; use anyhow::Error; use anyhow::Result; @@ -44,16 +46,18 @@ pub async fn metrics( }) .count(); - let n_artifacts = async { crate::schema::artifacts::table.count().get_result::(&conn) }; - let n_endpoints = async { crate::schema::endpoints::table.count().get_result::(&conn) }; - let n_envvars = async { crate::schema::envvars::table.count().get_result::(&conn) }; - let n_githashes = async { crate::schema::githashes::table.count().get_result::(&conn) }; - let n_images = async { crate::schema::images::table.count().get_result::(&conn) }; - let n_jobs = async { crate::schema::jobs::table.count().get_result::(&conn) }; - let n_packages = async { crate::schema::packages::table.count().get_result::(&conn) }; - let n_releasestores = async { crate::schema::release_stores::table.count().get_result::(&conn) }; 
- let n_releases = async { crate::schema::releases::table.count().get_result::(&conn) }; - let n_submits = async { crate::schema::submits::table.count().get_result::(&conn) }; + let conn = Arc::new(Mutex::new(conn)); + // TODO: Avoid the locking here (makes async pointless)!: + let n_artifacts = async { crate::schema::artifacts::table.count().get_result::(&mut *conn.clone().lock().unwrap()) }; + let n_endpoints = async { crate::schema::endpoints::table.count().get_result::(&mut *conn.clone().lock().unwrap()) }; + let n_envvars = async { crate::schema::envvars::table.count().get_result::(&mut *conn.clone().lock().unwrap()) }; + let n_githashes = async { crate::schema::githashes::table.count().get_result::(&mut *conn.clone().lock().unwrap()) }; + let n_images = async { crate::schema::images::table.count().get_result::(&mut *conn.clone().lock().unwrap()) }; + let n_jobs = async { crate::schema::jobs::table.count().get_result::(&mut *conn.clone().lock().unwrap()) }; + let n_packages = async { crate::schema::packages::table.count().get_result::(&mut *conn.clone().lock().unwrap()) }; + let n_releasestores = async { crate::schema::release_stores::table.count().get_result::(&mut *conn.clone().lock().unwrap()) }; + let n_releases = async { crate::schema::releases::table.count().get_result::(&mut *conn.clone().lock().unwrap()) }; + let n_submits = async { crate::schema::submits::table.count().get_result::(&mut *conn.clone().lock().unwrap()) }; let ( n_artifacts, diff --git a/src/commands/release.rs b/src/commands/release.rs index 0c3b6d66..fcee051e 100644 --- a/src/commands/release.rs +++ b/src/commands/release.rs @@ -12,6 +12,8 @@ use std::io::Write; use std::path::PathBuf; +use std::sync::Arc; +use std::sync::Mutex; use anyhow::anyhow; use anyhow::Context; @@ -62,7 +64,7 @@ async fn new_release( debug!("Release called for: {:?} {:?}", pname, pvers); - let conn = db_connection_config.establish_connection()?; + let mut conn = db_connection_config.establish_connection()?; let submit_uuid = matches .get_one::("submit_uuid") .map(|s| uuid::Uuid::parse_str(s.as_ref())) @@ -72,7 +74,7 @@ async fn new_release( let submit = crate::schema::submits::dsl::submits .filter(crate::schema::submits::dsl::uuid.eq(submit_uuid)) - .first::(&conn)?; + .first::(&mut conn)?; debug!("Found Submit: {:?}", submit_uuid); let arts = { @@ -91,7 +93,7 @@ async fn new_release( "Query: {:?}", diesel::debug_query::(&query) ); - query.load::(&conn)? + query.load::(&mut conn)? } (Some(name), None) => { let query = sel.filter(crate::schema::packages::name.eq(name)); @@ -99,7 +101,7 @@ async fn new_release( "Query: {:?}", diesel::debug_query::(&query) ); - query.load::(&conn)? + query.load::(&mut conn)? } (None, Some(vers)) => { let query = sel.filter(crate::schema::packages::version.like(vers)); @@ -107,14 +109,14 @@ async fn new_release( "Query: {:?}", diesel::debug_query::(&query) ); - query.load::(&conn)? + query.load::(&mut conn)? } (None, None) => { debug!( "Query: {:?}", diesel::debug_query::(&sel) ); - sel.load::(&conn)? + sel.load::(&mut conn)? 
} } }; @@ -136,11 +138,13 @@ async fn new_release( let staging_base: &PathBuf = &config.staging_directory().join(submit.uuid.to_string()); - let release_store = crate::db::models::ReleaseStore::create(&conn, release_store_name)?; + let release_store = crate::db::models::ReleaseStore::create(&mut conn, release_store_name)?; let do_update = matches.get_flag("package_do_update"); let interactive = !matches.get_flag("noninteractive"); let now = chrono::offset::Local::now().naive_local(); + // TODO: Find a proper solution to resolve: `error: captured variable cannot escape `FnMut` closure body`: + let conn = Arc::new(Mutex::new(conn)); let any_err = arts.into_iter() .map(|art| async { let art = art; // ensure it is moved @@ -182,7 +186,7 @@ async fn new_release( .map_err(Error::from) .and_then(|_| { debug!("Updating {:?} to set released = true", art); - let rel = crate::db::models::Release::create(&conn, &art, &now, &release_store)?; + let rel = crate::db::models::Release::create(&mut conn.clone().lock().unwrap(), &art, &now, &release_store)?; debug!("Release object = {:?}", rel); Ok(dest_path) }) @@ -231,7 +235,7 @@ pub async fn rm_release( let pvers = matches.get_one::("package_version").unwrap(); // safe by clap debug!("Remove Release called for: {:?} {:?}", pname, pvers); - let conn = db_connection_config.establish_connection()?; + let mut conn = db_connection_config.establish_connection()?; let (release, artifact) = crate::schema::jobs::table .inner_join(crate::schema::packages::table) @@ -245,7 +249,7 @@ pub async fn rm_release( .filter(crate::schema::release_stores::dsl::store_name.eq(&release_store_name)) .order(crate::schema::releases::dsl::release_date.desc()) .select((crate::schema::releases::all_columns, crate::schema::artifacts::all_columns)) - .first::<(crate::db::models::Release, crate::db::models::Artifact)>(&conn)?; + .first::<(crate::db::models::Release, crate::db::models::Artifact)>(&mut conn)?; let artifact_path = config.releases_directory().join(release_store_name).join(&artifact.path); if !artifact_path.is_file() { @@ -261,7 +265,7 @@ pub async fn rm_release( tokio::fs::remove_file(&artifact_path).await?; info!("File removed"); - diesel::delete(&release).execute(&conn)?; + diesel::delete(&release).execute(&mut conn)?; info!("Release deleted from database"); Ok(()) diff --git a/src/db/find_artifacts.rs b/src/db/find_artifacts.rs index 284b82b1..dfd80f72 100644 --- a/src/db/find_artifacts.rs +++ b/src/db/find_artifacts.rs @@ -11,6 +11,7 @@ use std::collections::HashMap; use std::path::PathBuf; use std::sync::Arc; +use std::sync::Mutex; use anyhow::Result; use chrono::NaiveDateTime; @@ -52,7 +53,7 @@ use crate::util::docker::ImageName; #[derive(typed_builder::TypedBuilder)] pub struct FindArtifacts<'a> { config: &'a Configuration, - database_connection: Arc, + database_connection: Arc>, /// The release stores to search in release_stores: &'a [Arc], @@ -152,7 +153,7 @@ impl<'a> FindArtifacts<'a> { (arts, jobs) }) - .load::<(dbmodels::Artifact, dbmodels::Job)>(&*self.database_connection)? + .load::<(dbmodels::Artifact, dbmodels::Job)>(&mut *self.database_connection.as_ref().lock().unwrap())? .into_iter() .inspect(|(art, job)| debug!("Filtering further: {:?}, job {:?}", art, job.id)) // @@ -171,7 +172,7 @@ impl<'a> FindArtifacts<'a> { let job = tpl.1; let job_env: Vec<(String, String)> = job - .env(&self.database_connection)? + .env(&mut self.database_connection.as_ref().lock().unwrap())? 
.into_iter() .map(|var: dbmodels::EnvVar| (var.name, var.value)) .collect(); @@ -186,7 +187,7 @@ impl<'a> FindArtifacts<'a> { Ok((_, bl)) => *bl, }) .and_then_ok(|(art, _)| { - if let Some(release) = art.get_release(&self.database_connection)? { + if let Some(release) = art.get_release(&mut self.database_connection.as_ref().lock().unwrap())? { Ok((art, Some(release.release_date))) } else { Ok((art, None)) diff --git a/src/db/models/artifact.rs b/src/db/models/artifact.rs index 1781d272..9df96692 100644 --- a/src/db/models/artifact.rs +++ b/src/db/models/artifact.rs @@ -25,7 +25,7 @@ use crate::schema::artifacts; use crate::schema::artifacts::*; #[derive(Debug, Identifiable, Queryable, Associations)] -#[belongs_to(Job)] +#[diesel(belongs_to(Job))] pub struct Artifact { pub id: i32, pub path: String, @@ -33,7 +33,7 @@ pub struct Artifact { } #[derive(Insertable)] -#[table_name = "artifacts"] +#[diesel(table_name = artifacts)] struct NewArtifact<'a> { pub path: &'a str, pub job_id: i32, @@ -46,7 +46,7 @@ impl Artifact { pub fn released( self, - database_connection: &PgConnection, + database_connection: &mut PgConnection, release_date: &NaiveDateTime, release_store_name: &str, ) -> Result { @@ -54,7 +54,7 @@ impl Artifact { crate::db::models::Release::create(database_connection, &self, release_date, &rs) } - pub fn get_release(&self, database_connection: &PgConnection) -> Result> { + pub fn get_release(&self, database_connection: &mut PgConnection) -> Result> { use crate::schema; schema::artifacts::table @@ -67,7 +67,7 @@ impl Artifact { } pub fn create( - database_connection: &PgConnection, + database_connection: &mut PgConnection, art_path: &ArtifactPath, job: &Job, ) -> Result { @@ -80,14 +80,14 @@ impl Artifact { job_id: job.id, }; - database_connection.transaction::<_, Error, _>(|| { + database_connection.transaction::<_, Error, _>(|conn| { diesel::insert_into(artifacts::table) .values(&new_art) - .execute(database_connection)?; + .execute(conn)?; dsl::artifacts .filter(path.eq(path_str).and(job_id.eq(job.id))) - .first::(database_connection) + .first::(conn) .map_err(Error::from) }) } diff --git a/src/db/models/endpoint.rs b/src/db/models/endpoint.rs index 0544308d..9da34be8 100644 --- a/src/db/models/endpoint.rs +++ b/src/db/models/endpoint.rs @@ -24,33 +24,33 @@ pub struct Endpoint { } #[derive(Insertable)] -#[table_name = "endpoints"] +#[diesel(table_name = endpoints)] struct NewEndpoint<'a> { pub name: &'a str, } impl Endpoint { - pub fn create_or_fetch(database_connection: &PgConnection, ep_name: &EndpointName) -> Result { + pub fn create_or_fetch(database_connection: &mut PgConnection, ep_name: &EndpointName) -> Result { let new_ep = NewEndpoint { name: ep_name.as_ref() }; - database_connection.transaction::<_, Error, _>(|| { + database_connection.transaction::<_, Error, _>(|conn| { diesel::insert_into(endpoints::table) .values(&new_ep) .on_conflict_do_nothing() - .execute(database_connection)?; + .execute(conn)?; dsl::endpoints .filter(name.eq(ep_name.as_ref())) - .first::(database_connection) + .first::(conn) .map_err(Error::from) }) } - pub fn fetch_for_job(database_connection: &PgConnection, j: &crate::db::models::Job) -> Result> { + pub fn fetch_for_job(database_connection: &mut PgConnection, j: &crate::db::models::Job) -> Result> { Self::fetch_by_id(database_connection, j.endpoint_id) } - pub fn fetch_by_id(database_connection: &PgConnection, eid: i32) -> Result> { + pub fn fetch_by_id(database_connection: &mut PgConnection, eid: i32) -> Result> { match 
dsl::endpoints.filter(id.eq(eid)).first::(database_connection) { Err(diesel::result::Error::NotFound) => Ok(None), Err(e) => Err(Error::from(e)), diff --git a/src/db/models/envvar.rs b/src/db/models/envvar.rs index 2d7e78d7..a276b193 100644 --- a/src/db/models/envvar.rs +++ b/src/db/models/envvar.rs @@ -18,7 +18,7 @@ use crate::schema::envvars::*; use crate::util::EnvironmentVariableName; #[derive(Debug, Identifiable, Queryable)] -#[table_name = "envvars"] +#[diesel(table_name = envvars)] pub struct EnvVar { pub id: i32, pub name: String, @@ -26,7 +26,7 @@ pub struct EnvVar { } #[derive(Insertable)] -#[table_name = "envvars"] +#[diesel(table_name = envvars)] struct NewEnvVar<'a> { pub name: &'a str, pub value: &'a str, @@ -34,7 +34,7 @@ struct NewEnvVar<'a> { impl EnvVar { pub fn create_or_fetch( - database_connection: &PgConnection, + database_connection: &mut PgConnection, k: &EnvironmentVariableName, v: &str, ) -> Result { @@ -43,15 +43,15 @@ impl EnvVar { value: v, }; - database_connection.transaction::<_, Error, _>(|| { + database_connection.transaction::<_, Error, _>(|conn| { diesel::insert_into(envvars::table) .values(&new_envvar) .on_conflict_do_nothing() - .execute(database_connection)?; + .execute(conn)?; dsl::envvars .filter(name.eq(k.as_ref()).and(value.eq(v))) - .first::(database_connection) + .first::(conn) .map_err(Error::from) }) } diff --git a/src/db/models/githash.rs b/src/db/models/githash.rs index 98c0a418..77b9dfd0 100644 --- a/src/db/models/githash.rs +++ b/src/db/models/githash.rs @@ -24,29 +24,29 @@ pub struct GitHash { } #[derive(Insertable)] -#[table_name = "githashes"] +#[diesel(table_name = githashes)] struct NewGitHash<'a> { pub hash: &'a str, } impl GitHash { - pub fn create_or_fetch(database_connection: &PgConnection, githash: &str) -> Result { + pub fn create_or_fetch(database_connection: &mut PgConnection, githash: &str) -> Result { let new_hash = NewGitHash { hash: githash }; - database_connection.transaction::<_, Error, _>(|| { + database_connection.transaction::<_, Error, _>(|conn| { diesel::insert_into(githashes::table) .values(&new_hash) .on_conflict_do_nothing() - .execute(database_connection)?; + .execute(conn)?; dsl::githashes .filter(hash.eq(githash)) - .first::(database_connection) + .first::(conn) .map_err(Error::from) }) } - pub fn with_id(database_connection: &PgConnection, git_hash_id: i32) -> Result { + pub fn with_id(database_connection: &mut PgConnection, git_hash_id: i32) -> Result { dsl::githashes .find(git_hash_id) .first::<_>(database_connection) diff --git a/src/db/models/image.rs b/src/db/models/image.rs index bcd4aa8f..35d4bce3 100644 --- a/src/db/models/image.rs +++ b/src/db/models/image.rs @@ -24,38 +24,38 @@ pub struct Image { } #[derive(Insertable)] -#[table_name = "images"] +#[diesel(table_name = images)] struct NewImage<'a> { pub name: &'a str, } impl Image { pub fn create_or_fetch( - database_connection: &PgConnection, + database_connection: &mut PgConnection, image_name: &ImageName, ) -> Result { let new_image = NewImage { name: image_name.as_ref(), }; - database_connection.transaction::<_, Error, _>(|| { + database_connection.transaction::<_, Error, _>(|conn| { diesel::insert_into(images::table) .values(&new_image) .on_conflict_do_nothing() - .execute(database_connection)?; + .execute(conn)?; dsl::images .filter(name.eq(image_name.as_ref())) - .first::(database_connection) + .first::(conn) .map_err(Error::from) }) } - pub fn fetch_for_job(database_connection: &PgConnection, j: &crate::db::models::Job) -> Result> { + pub fn 
fetch_for_job(database_connection: &mut PgConnection, j: &crate::db::models::Job) -> Result> { Self::fetch_by_id(database_connection, j.image_id) } - pub fn fetch_by_id(database_connection: &PgConnection, iid: i32) -> Result> { + pub fn fetch_by_id(database_connection: &mut PgConnection, iid: i32) -> Result> { match dsl::images.filter(id.eq(iid)).first::(database_connection) { Err(diesel::result::Error::NotFound) => Ok(None), Err(e) => Err(Error::from(e)), diff --git a/src/db/models/job.rs b/src/db/models/job.rs index 1977d1e6..1f7f15bf 100644 --- a/src/db/models/job.rs +++ b/src/db/models/job.rs @@ -22,11 +22,11 @@ use crate::schema::jobs::*; use crate::util::docker::ContainerHash; #[derive(Debug, Eq, PartialEq, Identifiable, Queryable, Associations)] -#[belongs_to(Submit)] -#[belongs_to(Endpoint)] -#[belongs_to(Package)] -#[belongs_to(Image)] -#[table_name = "jobs"] +#[diesel(belongs_to(Submit))] +#[diesel(belongs_to(Endpoint))] +#[diesel(belongs_to(Package))] +#[diesel(belongs_to(Image))] +#[diesel(table_name = jobs)] pub struct Job { pub id: i32, pub submit_id: i32, @@ -40,7 +40,7 @@ pub struct Job { } #[derive(Debug, Insertable)] -#[table_name = "jobs"] +#[diesel(table_name = jobs)] struct NewJob<'a> { pub submit_id: i32, pub endpoint_id: i32, @@ -55,7 +55,7 @@ struct NewJob<'a> { impl Job { #[allow(clippy::too_many_arguments)] pub fn create( - database_connection: &PgConnection, + database_connection: &mut PgConnection, job_uuid: &::uuid::Uuid, submit: &Submit, endpoint: &Endpoint, @@ -83,20 +83,20 @@ impl Job { trace!("Query = {}", diesel::debug_query::(&query)); - database_connection.transaction::<_, Error, _>(|| { + database_connection.transaction::<_, Error, _>(|conn| { query - .execute(database_connection) + .execute(conn) .context("Creating job in database")?; dsl::jobs .filter(uuid.eq(job_uuid)) - .first::(database_connection) + .first::(conn) .with_context(|| format!("Finding created job in database: {job_uuid}")) .map_err(Error::from) }) } - pub fn env(&self, database_connection: &PgConnection) -> Result> { + pub fn env(&self, database_connection: &mut PgConnection) -> Result> { use crate::schema; schema::job_envs::table diff --git a/src/db/models/job_env.rs b/src/db/models/job_env.rs index 36493abe..eee7ea12 100644 --- a/src/db/models/job_env.rs +++ b/src/db/models/job_env.rs @@ -17,9 +17,9 @@ use crate::db::models::Job; use crate::schema::job_envs; #[derive(Identifiable, Queryable, Associations)] -#[belongs_to(Job)] -#[belongs_to(EnvVar, foreign_key = "env_id")] -#[table_name = "job_envs"] +#[diesel(belongs_to(Job))] +#[diesel(belongs_to(EnvVar, foreign_key = env_id))] +#[diesel(table_name = job_envs)] pub struct JobEnv { pub id: i32, pub job_id: i32, @@ -27,14 +27,14 @@ pub struct JobEnv { } #[derive(Insertable)] -#[table_name = "job_envs"] +#[diesel(table_name = job_envs)] struct NewJobEnv { pub job_id: i32, pub env_id: i32, } impl JobEnv { - pub fn create(database_connection: &PgConnection, job: &Job, env: &EnvVar) -> Result<()> { + pub fn create(database_connection: &mut PgConnection, job: &Job, env: &EnvVar) -> Result<()> { let new_jobenv = NewJobEnv { job_id: job.id, env_id: env.id, diff --git a/src/db/models/package.rs b/src/db/models/package.rs index b30dadd3..b029200d 100644 --- a/src/db/models/package.rs +++ b/src/db/models/package.rs @@ -26,7 +26,7 @@ pub struct Package { } #[derive(Insertable)] -#[table_name = "packages"] +#[diesel(table_name = packages)] struct NewPackage<'a> { pub name: &'a str, pub version: &'a str, @@ -34,7 +34,7 @@ struct NewPackage<'a> { 
impl Package { pub fn create_or_fetch( - database_connection: &PgConnection, + database_connection: &mut PgConnection, p: &crate::package::Package, ) -> Result { let new_package = NewPackage { @@ -42,11 +42,11 @@ impl Package { version: p.version().deref(), }; - database_connection.transaction::<_, Error, _>(|| { + database_connection.transaction::<_, Error, _>(|conn| { diesel::insert_into(packages::table) .values(&new_package) .on_conflict_do_nothing() - .execute(database_connection)?; + .execute(conn)?; dsl::packages .filter({ @@ -55,16 +55,16 @@ impl Package { name.eq(p_name).and(version.eq(p_vers)) }) - .first::(database_connection) + .first::(conn) .map_err(Error::from) }) } - pub fn fetch_for_job(database_connection: &PgConnection, j: &crate::db::models::Job) -> Result> { + pub fn fetch_for_job(database_connection: &mut PgConnection, j: &crate::db::models::Job) -> Result> { Self::fetch_by_id(database_connection, j.package_id) } - pub fn fetch_by_id(database_connection: &PgConnection, pid: i32) -> Result> { + pub fn fetch_by_id(database_connection: &mut PgConnection, pid: i32) -> Result> { match dsl::packages.filter(id.eq(pid)).first::(database_connection) { Err(diesel::result::Error::NotFound) => Ok(None), Err(e) => Err(Error::from(e)), diff --git a/src/db/models/release_store.rs b/src/db/models/release_store.rs index 89ee4928..3df33a2c 100644 --- a/src/db/models/release_store.rs +++ b/src/db/models/release_store.rs @@ -20,33 +20,33 @@ use crate::schema::release_stores; use crate::schema; #[derive(Debug, Identifiable, Queryable)] -#[table_name = "release_stores"] +#[diesel(table_name = release_stores)] pub struct ReleaseStore { pub id: i32, pub store_name: String, } #[derive(Insertable)] -#[table_name = "release_stores"] +#[diesel(table_name = release_stores)] struct NewReleaseStore<'a> { pub store_name : &'a str, } impl ReleaseStore { - pub fn create(database_connection: &PgConnection, name: &str) -> Result { + pub fn create(database_connection: &mut PgConnection, name: &str) -> Result { let new_relstore = NewReleaseStore { store_name: name, }; - database_connection.transaction::<_, Error, _>(|| { + database_connection.transaction::<_, Error, _>(|conn| { diesel::insert_into(schema::release_stores::table) .values(&new_relstore) .on_conflict_do_nothing() - .execute(database_connection)?; + .execute(conn)?; schema::release_stores::table .filter(schema::release_stores::store_name.eq(name)) - .first::(database_connection) + .first::(conn) .map_err(Error::from) }) } diff --git a/src/db/models/releases.rs b/src/db/models/releases.rs index 306cbaf9..507ac232 100644 --- a/src/db/models/releases.rs +++ b/src/db/models/releases.rs @@ -20,8 +20,8 @@ use crate::schema::releases; use crate::schema::releases::*; #[derive(Debug, Identifiable, Queryable, Associations)] -#[belongs_to(Artifact)] -#[belongs_to(ReleaseStore)] +#[diesel(belongs_to(Artifact))] +#[diesel(belongs_to(ReleaseStore))] pub struct Release { pub id: i32, pub artifact_id: i32, @@ -30,7 +30,7 @@ pub struct Release { } #[derive(Insertable)] -#[table_name = "releases"] +#[diesel(table_name = releases)] struct NewRelease<'a> { pub artifact_id: i32, pub release_date: &'a NaiveDateTime, @@ -39,7 +39,7 @@ struct NewRelease<'a> { impl Release { pub fn create<'a>( - database_connection: &PgConnection, + database_connection: &mut PgConnection, art: &Artifact, date: &'a NaiveDateTime, store: &'a ReleaseStore, @@ -50,14 +50,14 @@ impl Release { release_store_id: store.id, }; - database_connection.transaction::<_, Error, _>(|| { + 
database_connection.transaction::<_, Error, _>(|conn| { diesel::insert_into(releases::table) .values(&new_rel) - .execute(database_connection)?; + .execute(conn)?; dsl::releases .filter(artifact_id.eq(art.id).and(release_date.eq(date))) - .first::(database_connection) + .first::(conn) .map_err(Error::from) }) } diff --git a/src/db/models/submit.rs b/src/db/models/submit.rs index 5c92011e..4d5aff63 100644 --- a/src/db/models/submit.rs +++ b/src/db/models/submit.rs @@ -22,9 +22,9 @@ use crate::schema::submits; use crate::schema::submits::*; #[derive(Clone, Debug, Eq, PartialEq, Identifiable, Queryable, Associations)] -#[belongs_to(Package, foreign_key = "requested_package_id")] -#[belongs_to(Image, foreign_key = "requested_image_id")] -#[table_name = "submits"] +#[diesel(belongs_to(Package, foreign_key = requested_package_id))] +#[diesel(belongs_to(Image, foreign_key = requested_image_id))] +#[diesel(table_name = submits)] pub struct Submit { pub id: i32, pub uuid: ::uuid::Uuid, @@ -35,7 +35,7 @@ pub struct Submit { } #[derive(Insertable)] -#[table_name = "submits"] +#[diesel(table_name = submits)] struct NewSubmit<'a> { pub uuid: &'a ::uuid::Uuid, pub submit_time: &'a NaiveDateTime, @@ -46,7 +46,7 @@ struct NewSubmit<'a> { impl Submit { pub fn create( - database_connection: &PgConnection, + database_connection: &mut PgConnection, submit_datetime: &NaiveDateTime, submit_id: &::uuid::Uuid, requested_image: &Image, @@ -61,21 +61,21 @@ impl Submit { repo_hash_id: repo_hash.id, }; - database_connection.transaction::<_, Error, _>(|| { + database_connection.transaction::<_, Error, _>(|conn| { diesel::insert_into(submits::table) .values(&new_submit) // required because if we re-use the staging store, we do not create a new UUID but re-use the old one .on_conflict_do_nothing() - .execute(database_connection) + .execute(conn) .context("Inserting new submit into submits table")?; - Self::with_id(database_connection, submit_id) + Self::with_id(conn, submit_id) }) } - pub fn with_id(database_connection: &PgConnection, submit_id: &::uuid::Uuid) -> Result { + pub fn with_id(database_connection: &mut PgConnection, submit_id: &::uuid::Uuid) -> Result { dsl::submits .filter(submits::uuid.eq(submit_id)) .first::(database_connection) diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs index 4e0ecd3c..6c7af3bf 100644 --- a/src/endpoint/scheduler.rs +++ b/src/endpoint/scheduler.rs @@ -10,6 +10,7 @@ use std::path::PathBuf; use std::sync::Arc; +use std::sync::Mutex; use anyhow::anyhow; use anyhow::Context; @@ -42,7 +43,7 @@ pub struct EndpointScheduler { staging_store: Arc>, release_stores: Vec>, - db: Arc, + db: Arc>, submit: crate::db::models::Submit, } @@ -51,7 +52,7 @@ impl EndpointScheduler { endpoints: Vec, staging_store: Arc>, release_stores: Vec>, - db: Arc, + db: Arc>, submit: crate::db::models::Submit, log_dir: Option, ) -> Result { @@ -117,7 +118,7 @@ pub struct JobHandle { endpoint: EndpointHandle, job: RunnableJob, bar: ProgressBar, - db: Arc, + db: Arc>, staging_store: Arc>, release_stores: Vec>, submit: crate::db::models::Submit, @@ -134,9 +135,9 @@ impl JobHandle { let (log_sender, log_receiver) = tokio::sync::mpsc::unbounded_channel::(); let endpoint_uri = self.endpoint.uri().clone(); let endpoint_name = self.endpoint.name().clone(); - let endpoint = dbmodels::Endpoint::create_or_fetch(&self.db, self.endpoint.name())?; - let package = dbmodels::Package::create_or_fetch(&self.db, self.job.package())?; - let image = dbmodels::Image::create_or_fetch(&self.db, self.job.image())?; + let 
endpoint = dbmodels::Endpoint::create_or_fetch(&mut self.db.as_ref().lock().unwrap(), self.endpoint.name())?; + let package = dbmodels::Package::create_or_fetch(&mut self.db.as_ref().lock().unwrap(), self.job.package())?; + let image = dbmodels::Image::create_or_fetch(&mut self.db.as_ref().lock().unwrap(), self.job.image())?; let envs = self.create_env_in_db()?; let job_id = *self.job.uuid(); trace!("Running on Job {} on Endpoint {}", job_id, self.endpoint.name()); @@ -186,7 +187,7 @@ impl JobHandle { })?; let job = dbmodels::Job::create( - &self.db, + &mut self.db.as_ref().lock().unwrap(), &job_id, &self.submit, &endpoint, @@ -200,7 +201,7 @@ impl JobHandle { trace!("DB: Job entry for job {} created: {}", job.uuid, job.id); for env in envs { - dbmodels::JobEnv::create(&self.db, &job, &env) + dbmodels::JobEnv::create(&mut self.db.as_ref().lock().unwrap(), &job, &env) .with_context(|| format!("Creating Environment Variable mapping for Job: {}", job.uuid))?; } @@ -245,7 +246,7 @@ impl JobHandle { let staging_read = self.staging_store.read().await; for p in paths.iter() { trace!("DB: Creating artifact entry for path: {}", p.display()); - let _ = dbmodels::Artifact::create(&self.db, p, &job)?; + let _ = dbmodels::Artifact::create(&mut self.db.as_ref().lock().unwrap(), p, &job)?; r.push({ staging_read .get(p) @@ -293,7 +294,7 @@ impl JobHandle { .inspect(|(k, v)| { trace!("Creating environment variable in database: {} = {}", k, v) }) - .map(|(k, v)| dbmodels::EnvVar::create_or_fetch(&self.db, k, v)) + .map(|(k, v)| dbmodels::EnvVar::create_or_fetch(&mut self.db.as_ref().lock().unwrap(), k, v)) .collect::>>() }) .transpose()? @@ -308,7 +309,7 @@ impl JobHandle { .inspect(|(k, v)| { trace!("Creating environment variable in database: {} = {}", k, v) }) - .map(|(k, v)| dbmodels::EnvVar::create_or_fetch(&self.db, k, v)) + .map(|(k, v)| dbmodels::EnvVar::create_or_fetch(&mut self.db.as_ref().lock().unwrap(), k, v)) }) .collect() } diff --git a/src/main.rs b/src/main.rs index 343934e5..a18178f4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -47,8 +47,6 @@ extern crate log as logcrate; #[macro_use] extern crate diesel; -#[macro_use] -extern crate diesel_migrations; use std::path::PathBuf; diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs index c5c4b071..2aa9e13f 100644 --- a/src/orchestrator/orchestrator.rs +++ b/src/orchestrator/orchestrator.rs @@ -14,6 +14,7 @@ use std::borrow::Borrow; use std::collections::HashMap; use std::path::PathBuf; use std::sync::Arc; +use std::sync::Mutex; use anyhow::Error; use anyhow::Context; @@ -163,7 +164,7 @@ pub struct Orchestrator<'a> { jobdag: Dag, config: &'a Configuration, repository: Repository, - database: Arc, + database: Arc>, } #[derive(TypedBuilder)] @@ -174,7 +175,7 @@ pub struct OrchestratorSetup<'a> { release_stores: Vec>, source_cache: SourceCache, jobdag: Dag, - database: Arc, + database: Arc>, submit: dbmodels::Submit, log_dir: Option, config: &'a Configuration, @@ -454,7 +455,7 @@ struct TaskPreparation<'a> { scheduler: &'a EndpointScheduler, staging_store: Arc>, release_stores: Vec>, - database: Arc, + database: Arc>, } /// Helper type for executing one job task @@ -472,7 +473,7 @@ struct JobTask<'a> { scheduler: &'a EndpointScheduler, staging_store: Arc>, release_stores: Vec>, - database: Arc, + database: Arc>, /// Channel where the dependencies arrive receiver: Receiver,