Skip to content

Commit

Permalink
Simplified tx id generator.
Browse files Browse the repository at this point in the history
  • Loading branch information
smyrgeorge committed Jun 23, 2024
1 parent 4af46ae commit 3394308
Showing 1 changed file with 15 additions and 20 deletions.
35 changes: 15 additions & 20 deletions rust_lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use sqlx::{Row, TypeInfo, ValueRef};
use std::collections::HashMap;
use std::ffi::{c_long, c_void};
use std::ptr::null_mut;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::RwLock;
use std::{
ffi::{c_char, c_int, CStr, CString},
Expand All @@ -22,9 +21,9 @@ static SQLX4K: OnceLock<Sqlx4k> = OnceLock::new();
pub struct Sqlx4k<'a> {
runtime: Runtime,
pool: PgPool,
tx: RwLock<HashMap<i64, sqlx::Transaction<'a, sqlx::Postgres>>>,
tx_id_generator: AtomicI64,
tx: RwLock<(i64, HashMap<i64, sqlx::Transaction<'a, sqlx::Postgres>>)>,
}
// tx: Vec<sqlx::Transaction<'a, sqlx::Postgres>>,

#[repr(C)]
pub struct Sqlx4kResult {
Expand Down Expand Up @@ -109,15 +108,11 @@ pub extern "C" fn sqlx4k_of(

// Create the pool here.
let pool: PgPool = runtime.block_on(pool).unwrap();
// Create the transaction keeper here.
let tx = RwLock::new(HashMap::with_capacity(max_connections as usize));
let tx_id_generator = AtomicI64::new(0);
let sqlx4k = Sqlx4k {
runtime,
pool,
tx,
tx_id_generator,
};
// Create the transaction holder here.
let tx: RwLock<(i64, HashMap<i64, sqlx::Transaction<sqlx::Postgres>>)> =
RwLock::new((0, HashMap::with_capacity(max_connections as usize)));
// let tx: Vec<sqlx::Transaction<sqlx::Postgres>> = Vec::with_capacity(max_connections as usize);
let sqlx4k = Sqlx4k { runtime, pool, tx };
SQLX4K.set(sqlx4k).unwrap();
ok()
}
Expand Down Expand Up @@ -149,15 +144,15 @@ pub extern "C" fn sqlx4k_tx_begin() -> c_long {
let tx = sqlx4k.runtime.block_on(sqlx4k.pool.begin()).unwrap();
let id = {
let mut lock = sqlx4k.tx.write().unwrap();
let id = sqlx4k.tx_id_generator.fetch_add(1, Ordering::SeqCst);

// Reset the tx id generator to zero to prevent overflow.
let id = lock.0 + 1;
if id > 999_999_999_999 {
sqlx4k.tx_id_generator.store(0, Ordering::SeqCst)
// Reset the tx id generator to zero to prevent overflow.
lock.0 = 0;
}

// If tx id exists, the driver will panic.
if let Some(id) = lock.insert(id, tx) {
if let Some(id) = lock.1.insert(id, tx) {
panic!("Encountered dublicate tx id={:?}.", id);
}
id
Expand All @@ -170,7 +165,7 @@ pub extern "C" fn sqlx4k_tx_commit(tx: c_long) {
let sqlx4k = SQLX4K.get().unwrap();
{
let mut lock = sqlx4k.tx.write().unwrap();
let tx = lock.remove(&tx).unwrap();
let tx = lock.1.remove(&tx).unwrap();
sqlx4k.runtime.block_on(tx.commit()).unwrap();
}
}
Expand All @@ -180,7 +175,7 @@ pub extern "C" fn sqlx4k_tx_rollback(tx: c_long) {
let sqlx4k = SQLX4K.get().unwrap();
{
let mut lock = sqlx4k.tx.write().unwrap();
let tx = lock.remove(&tx).unwrap();
let tx = lock.1.remove(&tx).unwrap();
sqlx4k.runtime.block_on(tx.rollback()).unwrap();
}
}
Expand All @@ -191,7 +186,7 @@ pub extern "C" fn sqlx4k_tx_query(tx: c_long, sql: *const c_char) -> *mut Sqlx4k
let sql = c_chars_to_str(sql).unwrap();
{
let mut lock = sqlx4k.tx.write().unwrap();
let tx = lock.get_mut(&tx).unwrap();
let tx = lock.1.get_mut(&tx).unwrap();
let query = tx.fetch_optional(sql);
let _result = sqlx4k.runtime.block_on(query).unwrap();
};
Expand All @@ -204,7 +199,7 @@ pub extern "C" fn sqlx4k_tx_fetch_all(tx: c_long, sql: *const c_char) -> *mut Sq
let sql = c_chars_to_str(sql).unwrap();
let result = {
let mut lock = sqlx4k.tx.write().unwrap();
let tx = lock.get_mut(&tx).unwrap();
let tx = lock.1.get_mut(&tx).unwrap();
let query = tx.fetch_all(sql);
let result: Result<Vec<PgRow>, sqlx::Error> = sqlx4k.runtime.block_on(query);
sqlx4k_result_of(result)
Expand Down

0 comments on commit 3394308

Please sign in to comment.