Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
weiznich committed Jun 2, 2021
1 parent 12fd769 commit b80abaf
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 63 deletions.
2 changes: 1 addition & 1 deletion diesel/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ ipnetwork = ">=0.12.2, <0.19.0"
quickcheck = "0.9"

[features]
default = ["postgres"]
default = ["postgres", "mysql"]
extras = ["chrono", "serde_json", "uuid", "network-address", "numeric", "r2d2"]
unstable = ["diesel_derives/nightly"]
large-tables = ["32-column-tables"]
Expand Down
14 changes: 14 additions & 0 deletions diesel/src/expression/array_comparison.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,17 @@ where
}
}

// impl<T, ST, const N: usize> AsInExpression<ST> for [T; N]
// where T: AsExpression<ST>,
// ST: SqlType + TypedExpressionType
// {
// type InExpression = StaticMany<T::Expressions, N>;

// fn as_in_expression(self) -> Self::InExpression {
// todo!()
// }
// }

pub trait MaybeEmpty {
fn is_empty(&self) -> bool;
}
Expand Down Expand Up @@ -145,6 +156,9 @@ where
}
}

// #[derive(Debug, Clone, ValidGrouping)]
// pub struct StaticMany<T, const N:usize>([T; N]);

#[derive(Debug, Clone, ValidGrouping)]
pub struct Many<T>(Vec<T>);

Expand Down
13 changes: 9 additions & 4 deletions diesel/src/mysql/connection/bind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,16 @@ impl Binds {
Ok(Binds { data })
}

pub fn from_output_types(types: Vec<Option<MysqlType>>, metadata: &StatementMetadata) -> Self {
pub fn from_output_types(types: &[Option<MysqlType>], metadata: &StatementMetadata) -> Self {
let data = metadata
.fields()
.iter()
.zip(types.into_iter().chain(std::iter::repeat(None)))
.zip(
types
.into_iter()
.map(|o| o.as_ref())
.chain(std::iter::repeat(None)),
)
.map(|(field, tpe)| BindData::for_output(tpe, field))
.collect();

Expand Down Expand Up @@ -147,7 +152,7 @@ impl BindData {
}
}

fn for_output(tpe: Option<MysqlType>, metadata: &MysqlFieldMetadata) -> Self {
fn for_output(tpe: Option<&MysqlType>, metadata: &MysqlFieldMetadata) -> Self {
let (tpe, flags) = if let Some(tpe) = tpe {
match (tpe, metadata.field_type()) {
// Those are types where we handle the conversion in diesel itself
Expand Down Expand Up @@ -274,7 +279,7 @@ impl BindData {
(metadata.field_type(), metadata.flags())
}

(tpe, _) => tpe.into(),
(tpe, _) => (*tpe).into(),
}
} else {
(metadata.field_type(), metadata.flags())
Expand Down
30 changes: 10 additions & 20 deletions diesel/src/mysql/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,10 @@ use self::stmt::Statement;
use self::url::ConnectionOptions;
use super::backend::Mysql;
use crate::connection::*;
use crate::deserialize::FromSqlRow;
use crate::expression::QueryMetadata;
use crate::query_builder::bind_collector::RawBytesBindCollector;
use crate::query_builder::*;
use crate::query_dsl::load_dsl::CompatibleType;
use crate::result::*;
use crate::row::Row;

#[allow(missing_debug_implementations, missing_copy_implementations)]
/// A connection to a MySQL database. Connection URLs should be in the form
Expand All @@ -34,12 +31,12 @@ impl SimpleConnection for MysqlConnection {
}
}

impl<'a> IterableConnection<'a> for MysqlConnection {
impl<'a> IterableConnection<'a, Mysql> for MysqlConnection {
type Cursor = self::stmt::iterator::StatementIterator<'a>;
type Row = self::stmt::iterator::MysqlRow<'a>;
}

/*impl Connection for MysqlConnection {
impl Connection for MysqlConnection {
type Backend = Mysql;
type TransactionManager = AnsiTransactionManager;

Expand Down Expand Up @@ -67,27 +64,20 @@ impl<'a> IterableConnection<'a> for MysqlConnection {
}

#[doc(hidden)]
fn load<'a, T, ST>(
fn load<'a, T>(
&'a mut self,
source: T,
) -> QueryResult<<Self as IterableConnection<'a>>::Cursor>
) -> QueryResult<<Self as IterableConnection<'a, Self::Backend>>::Cursor>
where
T: AsQuery,
T::Query: QueryFragment<Self::Backend> + QueryId,
Self::Backend: QueryMetadata<T::SqlType>,
Self: IterableConnection<'a>,
<Self as IterableConnection<'a>>::Cursor:
Iterator<Item = QueryResult<&'a <Self as IterableConnection<'a>>::Row>>,
for<'b> <Self as IterableConnection<'a>>::Row: Row<'b, Self::Backend>,
{
todo!()
// use crate::result::Error::DeserializationError;
// let mut stmt = self.prepare_query(&source.as_query())?;
// let mut metadata = Vec::new();
// Mysql::row_metadata(&mut (), &mut metadata);
// let results = unsafe { stmt.results(metadata)? };
// results.map(|row| U::build_from_row(&row).map_err(DeserializationError))
let mut stmt = self.prepare_query(&source.as_query())?;
let mut metadata = Vec::new();
Mysql::row_metadata(&mut (), &mut metadata);
let results = unsafe { stmt.results(metadata)? };
Ok(results)
}

#[doc(hidden)]
Expand Down Expand Up @@ -135,7 +125,7 @@ impl MysqlConnection {
self.execute("SET character_set_results = 'utf8mb4'")?;
Ok(())
}
}*/
}

#[cfg(test)]
mod tests {
Expand Down
120 changes: 84 additions & 36 deletions diesel/src/mysql/connection/stmt/iterator.rs
Original file line number Diff line number Diff line change
@@ -1,60 +1,104 @@
use super::{metadata::MysqlFieldMetadata, BindData, Binds, Statement, StatementMetadata};
use std::marker::PhantomData;
use std::rc::Rc;

use super::{Binds, Statement, StatementMetadata};
use super::metadata::MysqlFieldMetadata;
use crate::mysql::{Mysql, MysqlType};
use crate::result::QueryResult;
use crate::row::*;

pub struct StatementIterator<'a> {
stmt: &'a mut Statement,
output_binds: Binds,
metadata: StatementMetadata,
output_binds: Rc<Binds>,
metadata: Rc<StatementMetadata>,
types: Vec<Option<MysqlType>>,
size: usize,
fetched_rows: usize,
}

#[allow(clippy::should_implement_trait)] // don't neet `Iterator` here
impl<'a> StatementIterator<'a> {
#[allow(clippy::new_ret_no_self)]
pub fn new(stmt: &'a mut Statement, types: Vec<Option<MysqlType>>) -> QueryResult<Self> {
let metadata = stmt.metadata()?;

let mut output_binds = Binds::from_output_types(types, &metadata);
let mut output_binds = Binds::from_output_types(&types, &metadata);

stmt.execute_statement(&mut output_binds)?;
let size = unsafe { stmt.result_size() }?;

Ok(StatementIterator {
metadata: Rc::new(metadata),
output_binds: Rc::new(output_binds),
fetched_rows: 0,
size,
stmt,
output_binds,
metadata,
types,
})
}
}

impl<'a> Iterator for StatementIterator<'a> {
type Item = QueryResult<MysqlRow<'a>>;

fn next(&mut self) -> Option<Self::Item> {
// check if we own the only instance of the bind buffer
// if that's the case we can reuse the underlying allocations
// if that's not the case, allocate a new buffer
let res = if let Some(binds) = Rc::get_mut(&mut self.output_binds) {
self.stmt
.populate_row_buffers(binds)
.map(|o| o.map(|()| self.output_binds.clone()))
} else {
// The shared bind buffer is in use by someone else,
// we allocate a new buffer here
let mut output_binds = Binds::from_output_types(&self.types, &self.metadata);
self.stmt
.populate_row_buffers(&mut output_binds)
.map(|o| o.map(|()| Rc::new(output_binds)))
};

match res {
Ok(Some(binds)) => {
self.fetched_rows += 1;
Some(Ok(MysqlRow {
col_idx: 0,
binds,
metadata: self.metadata.clone(),
_marker: Default::default(),
}))
}
Ok(None) => None,
Err(e) => {
self.fetched_rows += 1;
Some(Err(e))
}
}
}

pub fn map<F, T>(mut self, mut f: F) -> QueryResult<Vec<T>>
fn size_hint(&self) -> (usize, Option<usize>) {
(self.len(), Some(self.len()))
}

fn count(self) -> usize
where
F: FnMut(MysqlRow) -> QueryResult<T>,
Self: Sized,
{
let mut results = Vec::new();
while let Some(row) = self.next() {
results.push(f(row?)?);
}
Ok(results)
self.len()
}
}

fn next(&mut self) -> Option<QueryResult<MysqlRow>> {
match self.stmt.populate_row_buffers(&mut self.output_binds) {
Ok(Some(())) => Some(Ok(MysqlRow {
col_idx: 0,
binds: &mut self.output_binds,
metadata: &self.metadata,
})),
Ok(None) => None,
Err(e) => Some(Err(e)),
}
impl<'a> ExactSizeIterator for StatementIterator<'a> {
fn len(&self) -> usize {
self.size - self.fetched_rows
}
}

#[derive(Clone)]
pub struct MysqlRow<'a> {
col_idx: usize,
binds: &'a Binds,
metadata: &'a StatementMetadata,
binds: Rc<Binds>,
metadata: Rc<StatementMetadata>,
_marker: PhantomData<&'a mut (Binds, StatementMetadata)>,
}

impl<'a> Row<'a, Mysql> for MysqlRow<'a> {
Expand All @@ -71,8 +115,10 @@ impl<'a> Row<'a, Mysql> for MysqlRow<'a> {
{
let idx = self.idx(idx)?;
Some(MysqlField {
bind: &self.binds[idx],
metadata: &self.metadata.fields()[idx],
bind: self.binds.clone(),
metadata: self.metadata.clone(),
idx,
_marker: Default::default(),
})
}

Expand Down Expand Up @@ -103,20 +149,22 @@ impl<'a, 'b> RowIndex<&'a str> for MysqlRow<'b> {
}

pub struct MysqlField<'a> {
bind: &'a BindData,
metadata: &'a MysqlFieldMetadata<'a>,
bind: Rc<Binds>,
metadata: Rc<StatementMetadata>,
idx: usize,
_marker: PhantomData<&'a (Binds, StatementMetadata)>
}

impl<'a> Field<'a, Mysql> for MysqlField<'a> {
fn field_name(&self) -> Option<&'a str> {
self.metadata.field_name()
impl<'a> Field<Mysql> for MysqlField<'a> {
fn field_name(&self) -> Option<&str> {
self.metadata.fields()[self.idx].field_name()
}

fn is_null(&self) -> bool {
self.bind.is_null()
(*self.bind)[self.idx].is_null()
}

fn value(&self) -> Option<crate::backend::RawValue<'a, Mysql>> {
self.bind.value()
fn value<'b>(&'b self) -> Option<crate::backend::RawValue<'b, Mysql>> {
self.bind[self.idx].value()
}
}
12 changes: 10 additions & 2 deletions diesel/src/mysql/connection/stmt/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ extern crate mysqlclient_sys as ffi;
pub mod iterator;
mod metadata;

use std::convert::TryFrom;
use std::ffi::CStr;
use std::os::raw as libc;
use std::ptr::NonNull;

use self::iterator::*;
use super::bind::{BindData, Binds};
use crate::mysql::MysqlType;
use crate::result::{DatabaseErrorKind, QueryResult};
use crate::result::{DatabaseErrorKind, Error, QueryResult};

pub use self::metadata::{MysqlFieldMetadata, StatementMetadata};

Expand Down Expand Up @@ -80,12 +81,19 @@ impl Statement {
/// have a return value. After calling this function, `execute` can never
/// be called on this statement.
pub unsafe fn results(
&mut self,
self,
types: Vec<Option<MysqlType>>,
) -> QueryResult<StatementIterator> {
StatementIterator::new(self, types)
}

/// This function should be called after `execute` only
/// otherwise it's not guranteed to return a valid result
pub(in crate::mysql::connection) unsafe fn result_size(&mut self) -> QueryResult<usize> {
let size = ffi::mysql_stmt_num_rows(self.stmt.as_ptr());
usize::try_from(size).map_err(|e| Error::DeserializationError(Box::new(e)))
}

fn last_error_message(&self) -> String {
unsafe { CStr::from_ptr(ffi::mysql_stmt_error(self.stmt.as_ptr())) }
.to_string_lossy()
Expand Down

0 comments on commit b80abaf

Please sign in to comment.