Skip to content

Commit

Permalink
Implement PGRXSharedMemory for Deque
Browse files Browse the repository at this point in the history
Having a double ended queue allows for more efficient FIFO workloads
when using shared memory

Sample usage:

    shmem=# select * from deque_count();
     deque_count
    -------------
               0

    shmem=# select deque_push_front('{"value1": 2, "value2": 2}');
     deque_push_front
    ------------------

    shmem=# select deque_push_front('{"value1": 1, "value2": 1}');
     deque_push_front
    ------------------

    shmem=# select deque_push_back('{"value1": 3, "value2": 3}');
     deque_push_back
    -----------------

    shmem=# select deque_push_back('{"value1": 4, "value2": 4}');
     deque_push_back
    -----------------

    shmem=# select * from deque_select();
          deque_select
    -------------------------
     {"value1":1,"value2":1}
     {"value1":2,"value2":2}
     {"value1":3,"value2":3}
     {"value1":4,"value2":4}
    (4 rows)

    shmem=# select deque_pop_front();
         deque_pop_front
    -------------------------
     {"value1":1,"value2":1}

    shmem=# select deque_pop_back();
         deque_pop_back
    -------------------------
     {"value1":4,"value2":4}
  • Loading branch information
feikesteenbergen committed Jun 16, 2023
1 parent ecbf925 commit 5126a1f
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 0 deletions.
46 changes: 46 additions & 0 deletions pgrx-examples/shmem/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ impl Default for Pgtest {
}
unsafe impl PGRXSharedMemory for Pgtest {}

static DEQUE: PgLwLock<heapless::Deque<Pgtest, 400>> = PgLwLock::new();
static VEC: PgLwLock<heapless::Vec<Pgtest, 400>> = PgLwLock::new();
static HASH: PgLwLock<heapless::FnvIndexMap<i32, i32, 4>> = PgLwLock::new();
static STRUCT: PgLwLock<Pgtest> = PgLwLock::new();
Expand All @@ -40,6 +41,7 @@ static ATOMIC: PgAtomic<std::sync::atomic::AtomicBool> = PgAtomic::new();

#[pg_guard]
pub extern "C" fn _PG_init() {
pg_shmem_init!(DEQUE);
pg_shmem_init!(VEC);
pg_shmem_init!(HASH);
pg_shmem_init!(STRUCT);
Expand Down Expand Up @@ -75,6 +77,50 @@ fn vec_pop() -> Option<Pgtest> {
VEC.exclusive().pop()
}

#[pg_extern]
fn deque_select() -> SetOfIterator<'static, Pgtest> {
SetOfIterator::new(DEQUE.share().iter().map(|i| *i).collect::<Vec<Pgtest>>().into_iter())
}

#[pg_extern]
fn deque_count() -> i32 {
DEQUE.share().len() as i32
}

#[pg_extern]
fn deque_drain() -> SetOfIterator<'static, Pgtest> {
let mut vec = DEQUE.exclusive();
let r = vec.iter().map(|i| *i).collect::<Vec<Pgtest>>();
vec.clear();
SetOfIterator::new(r.into_iter())
}

#[pg_extern]
fn deque_push_back(value: Pgtest) {
DEQUE
.exclusive()
.push_back(value)
.unwrap_or_else(|_| warning!("Deque is full, discarding update"));
}

#[pg_extern]
fn deque_push_front(value: Pgtest) {
DEQUE
.exclusive()
.push_front(value)
.unwrap_or_else(|_| warning!("Deque is full, discarding update"));
}

#[pg_extern]
fn deque_pop_back() -> Option<Pgtest> {
DEQUE.exclusive().pop_back()
}

#[pg_extern]
fn deque_pop_front() -> Option<Pgtest> {
DEQUE.exclusive().pop_front()
}

#[pg_extern]
fn hash_insert(key: i32, value: i32) {
HASH.exclusive().insert(key, value).unwrap();
Expand Down
1 change: 1 addition & 0 deletions pgrx/src/shmem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ where
{
}
unsafe impl<T, const N: usize> PGRXSharedMemory for heapless::Vec<T, N> {}
unsafe impl<T, const N: usize> PGRXSharedMemory for heapless::Deque<T, N> {}
unsafe impl<K: Eq + Hash, V: Default, S, const N: usize> PGRXSharedMemory
for heapless::IndexMap<K, V, S, N>
{
Expand Down

0 comments on commit 5126a1f

Please sign in to comment.