Skip to content
This repository has been archived by the owner on Apr 25, 2023. It is now read-only.

Commit

Permalink
Pool builder
Browse files Browse the repository at this point in the history
Allows starting a pool builder from an uri (that parses some of the
settings), then changing them easily afterwards. Also some new settings
allowed for the user:

- `max_idle_lifetime`
- `test_on_check_out`
- `health_check_interval`
  • Loading branch information
Julius de Bruijn committed Mar 26, 2020
1 parent 83a309c commit 37bcb11
Show file tree
Hide file tree
Showing 8 changed files with 306 additions and 153 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ log = { version = "0.4", features = ["release_max_level_trace"] }
tracing = { version = "0.1", optional = true }
tracing-core = { version = "0.1", optional = true }

mobc = { git = "https://github.com/pimeys/mobc", branch = "health-check-timeout", optional = true }
mobc = { version = "0.5.6", optional = true }
bytes = { version = "0.5", optional = true }
tokio = { version = "0.2", features = ["rt-threaded", "macros", "sync"], optional = true}
serde = { version = "1.0", optional = true }
Expand Down
37 changes: 20 additions & 17 deletions src/connector/mysql.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
mod conversion;
mod error;

use mysql_async::{self as my, prelude::Queryable as _};
use mysql_async::{self as my, prelude::Queryable as _, Conn};
use percent_encoding::percent_decode;
use std::{borrow::Cow, future::Future, path::Path, time::Duration};
use tokio::time::timeout;
Expand All @@ -20,7 +20,7 @@ pub struct Mysql {
pub(crate) pool: my::Pool,
pub(crate) url: MysqlUrl,
socket_timeout: Option<Duration>,
connect_timeout: Duration,
connect_timeout: Option<Duration>,
}

/// Wraps a connection url and exposes the parsing logic used by quaint, including default values.
Expand Down Expand Up @@ -94,21 +94,17 @@ impl MysqlUrl {
self.url.port().unwrap_or(3306)
}

pub(crate) fn connect_timeout(&self) -> Duration {
pub(crate) fn connect_timeout(&self) -> Option<Duration> {
self.query_params.connect_timeout
}

fn default_connection_limit() -> usize {
num_cpus::get_physical() * 2 + 1
}

fn parse_query_params(url: &Url) -> Result<MysqlUrlQueryParams, Error> {
let mut connection_limit = Self::default_connection_limit();
let mut connection_limit = None;
let mut ssl_opts = my::SslOpts::default();
let mut use_ssl = false;
let mut socket = None;
let mut socket_timeout = None;
let mut connect_timeout = Duration::from_secs(5);
let mut connect_timeout = None;

for (k, v) in url.query_pairs() {
match k.as_ref() {
Expand All @@ -117,7 +113,7 @@ impl MysqlUrl {
.parse()
.map_err(|_| Error::builder(ErrorKind::InvalidConnectionArguments).build())?;

connection_limit = as_int;
connection_limit = Some(as_int);
}
"sslcert" => {
use_ssl = true;
Expand All @@ -144,7 +140,7 @@ impl MysqlUrl {
let as_int = v
.parse()
.map_err(|_| Error::builder(ErrorKind::InvalidConnectionArguments).build())?;
connect_timeout = Duration::from_secs(as_int);
connect_timeout = Some(Duration::from_secs(as_int));
}
"sslaccept" => {
match v.as_ref() {
Expand Down Expand Up @@ -183,7 +179,7 @@ impl MysqlUrl {
}

#[cfg(feature = "pooled")]
pub(crate) fn connection_limit(&self) -> usize {
pub(crate) fn connection_limit(&self) -> Option<usize> {
self.query_params.connection_limit
}

Expand Down Expand Up @@ -218,11 +214,11 @@ impl MysqlUrl {
#[derive(Debug, Clone)]
pub(crate) struct MysqlUrlQueryParams {
ssl_opts: my::SslOpts,
connection_limit: usize,
connection_limit: Option<usize>,
use_ssl: bool,
socket: Option<String>,
socket_timeout: Option<Duration>,
connect_timeout: Duration,
connect_timeout: Option<Duration>,
}

impl Mysql {
Expand Down Expand Up @@ -257,6 +253,13 @@ impl Mysql {
},
}
}

async fn get_conn(&self) -> crate::Result<Conn> {
match self.connect_timeout {
Some(duration) => Ok(timeout(duration, self.pool.get_conn()).await??),
None => Ok(self.pool.get_conn().await?),
}
}
}

impl TransactionCapable for Mysql {}
Expand All @@ -274,7 +277,7 @@ impl Queryable for Mysql {

fn query_raw<'a>(&'a self, sql: &'a str, params: &'a [ParameterizedValue]) -> DBIO<'a, ResultSet> {
metrics::query("mysql.query_raw", sql, params, move || async move {
let conn = timeout(self.connect_timeout, self.pool.get_conn()).await??;
let conn = self.get_conn().await?;
let results = self
.timeout(conn.prep_exec(sql, conversion::conv_params(params)))
.await?;
Expand Down Expand Up @@ -306,7 +309,7 @@ impl Queryable for Mysql {

fn execute_raw<'a>(&'a self, sql: &'a str, params: &'a [ParameterizedValue<'a>]) -> DBIO<'a, u64> {
metrics::query("mysql.execute_raw", sql, params, move || async move {
let conn = timeout(self.connect_timeout, self.pool.get_conn()).await??;
let conn = self.get_conn().await?;
let results = self
.timeout(conn.prep_exec(sql, conversion::conv_params(params)))
.await?;
Expand All @@ -316,7 +319,7 @@ impl Queryable for Mysql {

fn raw_cmd<'a>(&'a self, cmd: &'a str) -> DBIO<'a, ()> {
metrics::query("mysql.raw_cmd", cmd, &[], move || async move {
let conn = timeout(self.connect_timeout, self.pool.get_conn()).await??;
let conn = self.get_conn().await?;
self.timeout(conn.query(cmd)).await?;

Ok(())
Expand Down
25 changes: 12 additions & 13 deletions src/connector/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,16 +208,12 @@ impl PostgresUrl {
&self.query_params.schema
}

pub(crate) fn connect_timeout(&self) -> Duration {
pub(crate) fn connect_timeout(&self) -> Option<Duration> {
self.query_params.connect_timeout
}

fn default_connection_limit() -> usize {
num_cpus::get_physical() * 2 + 1
}

fn parse_query_params(url: &Url) -> Result<PostgresUrlQueryParams, Error> {
let mut connection_limit = Self::default_connection_limit();
let mut connection_limit = None;
let mut schema = String::from(DEFAULT_SCHEMA);
let mut certificate_file = None;
let mut identity_file = None;
Expand All @@ -226,7 +222,7 @@ impl PostgresUrl {
let mut ssl_mode = SslMode::Prefer;
let mut host = None;
let mut socket_timeout = None;
let mut connect_timeout = Duration::from_secs(5);
let mut connect_timeout = None;

for (k, v) in url.query_pairs() {
match k.as_ref() {
Expand Down Expand Up @@ -280,7 +276,7 @@ impl PostgresUrl {
let as_int: usize = v
.parse()
.map_err(|_| Error::builder(ErrorKind::InvalidConnectionArguments).build())?;
connection_limit = as_int;
connection_limit = Some(as_int);
}
"host" => {
host = Some(v.to_string());
Expand All @@ -295,7 +291,7 @@ impl PostgresUrl {
let as_int = v
.parse()
.map_err(|_| Error::builder(ErrorKind::InvalidConnectionArguments).build())?;
connect_timeout = Duration::from_secs(as_int);
connect_timeout = Some(Duration::from_secs(as_int));
}
_ => {
#[cfg(not(feature = "tracing-log"))]
Expand Down Expand Up @@ -327,7 +323,7 @@ impl PostgresUrl {
}

#[cfg(feature = "pooled")]
pub(crate) fn connection_limit(&self) -> usize {
pub(crate) fn connection_limit(&self) -> Option<usize> {
self.query_params.connection_limit
}

Expand All @@ -339,7 +335,10 @@ impl PostgresUrl {
config.host(self.host());
config.port(self.port());
config.dbname(self.dbname());
config.connect_timeout(self.query_params.connect_timeout);

if let Some(connect_timeout) = self.query_params.connect_timeout {
config.connect_timeout(connect_timeout);
};

config.ssl_mode(self.query_params.ssl_mode);

Expand All @@ -350,12 +349,12 @@ impl PostgresUrl {
#[derive(Debug, Clone)]
pub(crate) struct PostgresUrlQueryParams {
ssl_params: SslParams,
connection_limit: usize,
connection_limit: Option<usize>,
schema: String,
ssl_mode: SslMode,
host: Option<String>,
socket_timeout: Option<Duration>,
connect_timeout: Duration,
connect_timeout: Option<Duration>,
}

impl PostgreSql {
Expand Down
8 changes: 4 additions & 4 deletions src/connector/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub struct Sqlite {
}

pub struct SqliteParams {
pub connection_limit: u32,
pub connection_limit: Option<usize>,
/// This is not a `PathBuf` because we need to `ATTACH` the database to the path, and this can
/// only be done with UTF-8 paths.
pub file_path: String,
Expand Down Expand Up @@ -50,7 +50,7 @@ impl TryFrom<&str> for SqliteParams {
Err(Error::builder(ErrorKind::DatabaseUrlIsInvalid(path.to_str().unwrap().to_string())).build())
} else {
let official = vec![];
let mut connection_limit = num_cpus::get_physical() * 2 + 1;
let mut connection_limit = None;
let mut db_name = None;
let mut socket_timeout = None;

Expand All @@ -74,7 +74,7 @@ impl TryFrom<&str> for SqliteParams {
.parse()
.map_err(|_| Error::builder(ErrorKind::InvalidConnectionArguments).build())?;

connection_limit = as_int;
connection_limit = Some(as_int);
}
"db_name" => {
db_name = Some(v.to_string());
Expand All @@ -97,7 +97,7 @@ impl TryFrom<&str> for SqliteParams {
}

Ok(Self {
connection_limit: u32::try_from(connection_limit).unwrap(),
connection_limit,
file_path: path_str.to_owned(),
db_name: db_name.unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_owned()),
socket_timeout,
Expand Down
7 changes: 2 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
//! # quaint
//!
//! A database client abstraction for reading and writing to a SQL database in a
//! A database client abstraction for reading and writing to an SQL database in a
//! safe manner.
//!
//! Under construction and will go through several rounds of changes. Not meant
//! for production use in the current form.
//!
//! ### Goals
//!
//! - Query generation when the database and conditions are not known at compile
Expand Down Expand Up @@ -55,7 +52,7 @@
//!
//! #[tokio::main]
//! async fn main() -> Result<(), quaint::error::Error> {
//! let pool = Quaint::new("file:///tmp/example.db").await?;
//! let pool = Quaint::builder("file:///tmp/example.db")?.build();
//! let conn = pool.check_out().await?;
//! let result = conn.select(Select::default().value(1)).await?;
//!
Expand Down
Loading

0 comments on commit 37bcb11

Please sign in to comment.