Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return a iterator from Connection::load #2799

Merged
merged 32 commits into from
Sep 29, 2021
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
c69d22a
Return a iterator from `Connection::load`
weiznich Jun 1, 2021
ef2d5e7
WIP
weiznich Jun 2, 2021
1cd7bb3
Port the mysql backend to use iterators
weiznich Jun 11, 2021
91124aa
Enable CI for mysql again
weiznich Jun 11, 2021
6071d41
Adjust sqlite connection implementation to return an iterator
weiznich Jun 17, 2021
3ef47c5
Enable the CI and fix some generic code
weiznich Jun 17, 2021
2dfd5f4
Add missing file
weiznich Jun 17, 2021
dd0d13a
Inline field count for sqlite rows, as this seems to be the performance
weiznich Jun 18, 2021
6b4000a
Fix todo
weiznich Jun 18, 2021
569a9d7
Apply the iterator change for the r2d2 connection
weiznich Jun 18, 2021
b3b4e45
Remove experimentall code
weiznich Jun 18, 2021
e581285
Fix sqlite deserialization
weiznich Jun 24, 2021
da4447b
Fix the custom sql function implementation for Sqlite
weiznich Jun 28, 2021
df787a2
WIP
weiznich Jun 29, 2021
6e99fe0
Multiple small fixes
weiznich Jun 30, 2021
a2d9af0
Ci fixes
weiznich Jul 1, 2021
0b3eb55
Cleanup + use bindings from libsqlite3-sys
weiznich Jul 2, 2021
77cd7af
Simplify the handling of not cached prepared statements for sqlite and
weiznich Jul 5, 2021
b644647
Minor cleanups
weiznich Jul 5, 2021
2bfe6df
Skip an unnessesary clone in postgres hot path
weiznich Jul 5, 2021
1a1b071
Address Review comments
weiznich Jul 8, 2021
0c67c4e
Only receive the column type oid if requested
weiznich Jul 8, 2021
65410ce
Address review comments
weiznich Aug 26, 2021
83eee55
Post rebase format fixes
weiznich Aug 26, 2021
ac7609d
Fix r2d2
weiznich Aug 26, 2021
d7ae4bc
Rustdoc + clippy fixes
weiznich Aug 26, 2021
594b84f
Apply review commend regarding to `Row::get_value`
weiznich Aug 26, 2021
065816a
Fix trybuild errors
weiznich Aug 26, 2021
e1e7c0a
More ci fixes
weiznich Aug 26, 2021
2dedaa4
Address comment about `Row::get_value`
weiznich Aug 27, 2021
fa913fb
Apply suggestions from code review
weiznich Sep 15, 2021
69876a1
Return a boxed iterator from `RunQueryDsl::load_iter`
weiznich Sep 17, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions diesel/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ edition = "2018"
byteorder = "1.0"
chrono = { version = "0.4.19", optional = true, default-features = false, features = ["clock", "std"] }
libc = { version = "0.2.0", optional = true }
libsqlite3-sys = { version = ">=0.8.0, <0.23.0", optional = true, features = ["min_sqlite_version_3_7_16"] }
libsqlite3-sys = { version = ">=0.8.0, <0.23.0", optional = true, features = ["bundled_bindings"] }
mysqlclient-sys = { version = "0.2.0", optional = true }
pq-sys = { version = "0.4.0", optional = true }
quickcheck = { version = "0.9.0", optional = true }
Expand Down Expand Up @@ -44,7 +44,7 @@ ipnetwork = ">=0.12.2, <0.19.0"
quickcheck = "0.9"

[features]
default = ["with-deprecated", "32-column-tables"]
default = ["32-column-tables", "with-deprecated"]
extras = ["chrono", "serde_json", "uuid", "network-address", "numeric", "r2d2"]
unstable = ["diesel_derives/nightly"]
large-tables = ["32-column-tables"]
Expand Down
30 changes: 23 additions & 7 deletions diesel/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,12 @@ mod transaction_manager;
use std::fmt::Debug;

use crate::backend::Backend;
use crate::deserialize::FromSqlRow;
use crate::expression::QueryMetadata;
use crate::query_builder::{AsQuery, QueryFragment, QueryId};
use crate::query_dsl::load_dsl::CompatibleType;
use crate::result::*;

#[doc(hidden)]
pub use self::statement_cache::{MaybeCached, StatementCache, StatementCacheKey};
pub use self::statement_cache::{MaybeCached, PrepareForCache, StatementCache, StatementCacheKey};
pub use self::transaction_manager::{AnsiTransactionManager, TransactionManager};

/// Perform simple operations on a backend.
Expand All @@ -27,8 +25,25 @@ pub trait SimpleConnection {
fn batch_execute(&mut self, query: &str) -> QueryResult<()>;
}

/// This trait describes which cursor type is used by a given connection
/// implementation. This trait is only useful in combination with [`Connection`].
///
/// Implementation wise this is a workaround for GAT's
pub trait ConnectionGatWorkaround<'a, DB: Backend> {
/// The cursor type returned by [`Connection::load`]
///
/// Users should handle this as opaque type that implements [`Iterator`]
type Cursor: Iterator<Item = QueryResult<Self::Row>>;
/// The row type used as [`Iterator::Item`] for the iterator implementation
/// of [`ConnectionGatWorkaround::Cursor`]
type Row: crate::row::Row<'a, DB>;
}

/// A connection to a database
pub trait Connection: SimpleConnection + Sized + Send {
pub trait Connection: SimpleConnection + Sized + Send
where
Self: for<'a> ConnectionGatWorkaround<'a, <Self as Connection>::Backend>,
{
/// The backend this type connects to
type Backend: Backend;

Expand Down Expand Up @@ -177,12 +192,13 @@ pub trait Connection: SimpleConnection + Sized + Send {
fn execute(&mut self, query: &str) -> QueryResult<usize>;

#[doc(hidden)]
fn load<T, U, ST>(&mut self, source: T) -> QueryResult<Vec<U>>
fn load<T>(
&mut self,
source: T,
) -> QueryResult<<Self as ConnectionGatWorkaround<Self::Backend>>::Cursor>
where
T: AsQuery,
T::Query: QueryFragment<Self::Backend> + QueryId,
T::SqlType: CompatibleType<U, Self::Backend, SqlType = ST>,
U: FromSqlRow<ST, Self::Backend>,
Self::Backend: QueryMetadata<T::SqlType>;

#[doc(hidden)]
Expand Down
12 changes: 9 additions & 3 deletions diesel/src/connection/statement_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@ pub struct StatementCache<DB: Backend, Statement> {
pub cache: HashMap<StatementCacheKey<DB>, Statement>,
}

#[derive(Debug, Clone, Copy)]
pub enum PrepareForCache {
Yes,
No,
}

#[allow(clippy::len_without_is_empty, clippy::new_without_default)]
impl<DB, Statement> StatementCache<DB, Statement>
where
Expand All @@ -133,23 +139,23 @@ where
) -> QueryResult<MaybeCached<Statement>>
where
T: QueryFragment<DB> + QueryId,
F: FnOnce(&str) -> QueryResult<Statement>,
F: FnOnce(&str, PrepareForCache) -> QueryResult<Statement>,
{
use std::collections::hash_map::Entry::{Occupied, Vacant};

let cache_key = StatementCacheKey::for_source(source, bind_types)?;

if !source.is_safe_to_cache_prepared()? {
let sql = cache_key.sql(source)?;
return prepare_fn(&sql).map(MaybeCached::CannotCache);
return prepare_fn(&sql, PrepareForCache::No).map(MaybeCached::CannotCache);
}

let cached_result = match self.cache.entry(cache_key) {
Occupied(entry) => entry.into_mut(),
Vacant(entry) => {
let statement = {
let sql = entry.key().sql(source)?;
prepare_fn(&sql)
prepare_fn(&sql, PrepareForCache::Yes)
};

entry.insert(statement?)
Expand Down
110 changes: 63 additions & 47 deletions diesel/src/mysql/connection/bind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,23 @@ use crate::mysql::types::MYSQL_TIME;
use crate::mysql::{MysqlType, MysqlValue};
use crate::result::QueryResult;

pub struct Binds {
pub struct PreparedStatementBinds(Binds);

pub struct OutputBinds(Binds);

impl Clone for OutputBinds {
fn clone(&self) -> Self {
Self(Binds {
data: self.0.data.clone(),
})
}
}

struct Binds {
data: Vec<BindData>,
}

impl Binds {
impl PreparedStatementBinds {
pub fn from_input_data<Iter>(input: Iter) -> QueryResult<Self>
where
Iter: IntoIterator<Item = (MysqlType, Option<Vec<u8>>)>,
Expand All @@ -25,34 +37,31 @@ impl Binds {
.map(BindData::for_input)
.collect::<Vec<_>>();

Ok(Binds { data })
Ok(Self(Binds { data }))
}

pub fn from_output_types(types: Vec<Option<MysqlType>>, metadata: &StatementMetadata) -> Self {
pub fn with_mysql_binds<F, T>(&mut self, f: F) -> T
where
F: FnOnce(*mut ffi::MYSQL_BIND) -> T,
{
self.0.with_mysql_binds(f)
}
}

impl OutputBinds {
pub fn from_output_types(types: &[Option<MysqlType>], metadata: &StatementMetadata) -> Self {
let data = metadata
.fields()
.iter()
.zip(types.into_iter().chain(std::iter::repeat(None)))
.zip(types.iter().copied().chain(std::iter::repeat(None)))
.map(|(field, tpe)| BindData::for_output(tpe, field))
.collect();

Binds { data }
}

pub fn with_mysql_binds<F, T>(&mut self, f: F) -> T
where
F: FnOnce(*mut ffi::MYSQL_BIND) -> T,
{
let mut binds = self
.data
.iter_mut()
.map(|x| unsafe { x.mysql_bind() })
.collect::<Vec<_>>();
f(binds.as_mut_ptr())
Self(Binds { data })
}

pub fn populate_dynamic_buffers(&mut self, stmt: &Statement) -> QueryResult<()> {
for (i, data) in self.data.iter_mut().enumerate() {
for (i, data) in self.0.data.iter_mut().enumerate() {
data.did_numeric_overflow_occur()?;
// This is safe because we are re-binding the invalidated buffers
// at the end of this function
Expand All @@ -69,20 +78,37 @@ impl Binds {
}

pub fn update_buffer_lengths(&mut self) {
for data in &mut self.data {
for data in &mut self.0.data {
data.update_buffer_length();
}
}

pub fn len(&self) -> usize {
self.data.len()
pub fn with_mysql_binds<F, T>(&mut self, f: F) -> T
where
F: FnOnce(*mut ffi::MYSQL_BIND) -> T,
{
self.0.with_mysql_binds(f)
}
}

impl Binds {
fn with_mysql_binds<F, T>(&mut self, f: F) -> T
where
F: FnOnce(*mut ffi::MYSQL_BIND) -> T,
{
let mut binds = self
.data
.iter_mut()
.map(|x| unsafe { x.mysql_bind() })
.collect::<Vec<_>>();
f(binds.as_mut_ptr())
}
}

impl Index<usize> for Binds {
impl Index<usize> for OutputBinds {
type Output = BindData;
fn index(&self, index: usize) -> &Self::Output {
&self.data[index]
&self.0.data[index]
}
}

Expand Down Expand Up @@ -122,7 +148,7 @@ impl From<u32> for Flags {
}
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct BindData {
tpe: ffi::enum_field_types,
bytes: Vec<u8>,
Expand Down Expand Up @@ -713,9 +739,8 @@ mod tests {
)
.unwrap();

let mut stmt = conn
.prepare_query(&crate::sql_query(
"SELECT
let mut stmt = conn.prepared_query(&crate::sql_query(
"SELECT
tiny_int, small_int, medium_int, int_col,
big_int, unsigned_int, zero_fill_int,
numeric_col, decimal_col, float_col, double_col, bit_col,
Expand All @@ -725,30 +750,21 @@ mod tests {
ST_AsText(polygon_col), ST_AsText(multipoint_col), ST_AsText(multilinestring_col),
ST_AsText(multipolygon_col), ST_AsText(geometry_collection), json_col
FROM all_mysql_types",
))
.unwrap();
)).unwrap();

let metadata = stmt.metadata().unwrap();
let mut output_binds =
Binds::from_output_types(vec![None; metadata.fields().len()], &metadata);
OutputBinds::from_output_types(&vec![None; metadata.fields().len()], &metadata);
stmt.execute_statement(&mut output_binds).unwrap();
stmt.populate_row_buffers(&mut output_binds).unwrap();

let results: Vec<(BindData, &_)> = output_binds
.0
.data
.into_iter()
.zip(metadata.fields())
.collect::<Vec<_>>();

macro_rules! matches {
($expression:expr, $( $pattern:pat )|+ $( if $guard: expr )?) => {
match $expression {
$( $pattern )|+ $( if $guard )? => true,
_ => false
}
}
}

let tiny_int_col = &results[0].0;
assert_eq!(tiny_int_col.tpe, ffi::enum_field_types::MYSQL_TYPE_TINY);
assert!(tiny_int_col.flags.contains(Flags::NUM_FLAG));
Expand Down Expand Up @@ -1057,9 +1073,9 @@ mod tests {
assert!(!polygon_col.flags.contains(Flags::ENUM_FLAG));
assert!(!polygon_col.flags.contains(Flags::BINARY_FLAG));
assert_eq!(
to_value::<Text, String>(polygon_col).unwrap(),
"MULTIPOLYGON(((28 26,28 0,84 0,84 42,28 26),(52 18,66 23,73 9,48 6,52 18)),((59 18,67 18,67 13,59 13,59 18)))"
);
to_value::<Text, String>(polygon_col).unwrap(),
"MULTIPOLYGON(((28 26,28 0,84 0,84 42,28 26),(52 18,66 23,73 9,48 6,52 18)),((59 18,67 18,67 13,59 13,59 18)))"
);

let geometry_collection = &results[32].0;
assert_eq!(
Expand Down Expand Up @@ -1105,12 +1121,12 @@ mod tests {

let bind = BindData::for_test_output(bind_tpe.into());

let mut binds = Binds { data: vec![bind] };
let mut binds = OutputBinds(Binds { data: vec![bind] });

stmt.execute_statement(&mut binds).unwrap();
stmt.populate_row_buffers(&mut binds).unwrap();

binds.data.remove(0)
binds.0.data.remove(0)
}

fn input_bind(
Expand Down Expand Up @@ -1144,9 +1160,9 @@ mod tests {
is_truncated: None,
};

let binds = Binds {
let binds = PreparedStatementBinds(Binds {
data: vec![id_bind, field_bind],
};
});
stmt.input_bind(binds).unwrap();
stmt.did_an_error_occur().unwrap();
unsafe {
Expand Down
Loading