From 5126a1f00c1c2eccf59763cdc80ea427fe5d732b Mon Sep 17 00:00:00 2001 From: Feike Steenbergen Date: Fri, 16 Jun 2023 08:49:25 +0200 Subject: [PATCH] Implement PGRXSharedMemory for Deque Having a double ended queue allows for more efficient FIFO workloads when using shared memory Sample usage: shmem=# select * from deque_count(); deque_count ------------- 0 shmem=# select deque_push_front('{"value1": 2, "value2": 2}'); deque_push_front ------------------ shmem=# select deque_push_front('{"value1": 1, "value2": 1}'); deque_push_front ------------------ shmem=# select deque_push_back('{"value1": 3, "value2": 3}'); deque_push_back ----------------- shmem=# select deque_push_back('{"value1": 4, "value2": 4}'); deque_push_back ----------------- shmem=# select * from deque_select(); deque_select ------------------------- {"value1":1,"value2":1} {"value1":2,"value2":2} {"value1":3,"value2":3} {"value1":4,"value2":4} (4 rows) shmem=# select deque_pop_front(); deque_pop_front ------------------------- {"value1":1,"value2":1} shmem=# select deque_pop_back(); deque_pop_back ------------------------- {"value1":4,"value2":4} --- pgrx-examples/shmem/src/lib.rs | 46 ++++++++++++++++++++++++++++++++++ pgrx/src/shmem.rs | 1 + 2 files changed, 47 insertions(+) diff --git a/pgrx-examples/shmem/src/lib.rs b/pgrx-examples/shmem/src/lib.rs index 00c4fe7607..bbf14ace21 100644 --- a/pgrx-examples/shmem/src/lib.rs +++ b/pgrx-examples/shmem/src/lib.rs @@ -32,6 +32,7 @@ impl Default for Pgtest { } unsafe impl PGRXSharedMemory for Pgtest {} +static DEQUE: PgLwLock> = PgLwLock::new(); static VEC: PgLwLock> = PgLwLock::new(); static HASH: PgLwLock> = PgLwLock::new(); static STRUCT: PgLwLock = PgLwLock::new(); @@ -40,6 +41,7 @@ static ATOMIC: PgAtomic = PgAtomic::new(); #[pg_guard] pub extern "C" fn _PG_init() { + pg_shmem_init!(DEQUE); pg_shmem_init!(VEC); pg_shmem_init!(HASH); pg_shmem_init!(STRUCT); @@ -75,6 +77,50 @@ fn vec_pop() -> Option { VEC.exclusive().pop() } +#[pg_extern] +fn deque_select() -> SetOfIterator<'static, Pgtest> { + SetOfIterator::new(DEQUE.share().iter().map(|i| *i).collect::>().into_iter()) +} + +#[pg_extern] +fn deque_count() -> i32 { + DEQUE.share().len() as i32 +} + +#[pg_extern] +fn deque_drain() -> SetOfIterator<'static, Pgtest> { + let mut vec = DEQUE.exclusive(); + let r = vec.iter().map(|i| *i).collect::>(); + vec.clear(); + SetOfIterator::new(r.into_iter()) +} + +#[pg_extern] +fn deque_push_back(value: Pgtest) { + DEQUE + .exclusive() + .push_back(value) + .unwrap_or_else(|_| warning!("Deque is full, discarding update")); +} + +#[pg_extern] +fn deque_push_front(value: Pgtest) { + DEQUE + .exclusive() + .push_front(value) + .unwrap_or_else(|_| warning!("Deque is full, discarding update")); +} + +#[pg_extern] +fn deque_pop_back() -> Option { + DEQUE.exclusive().pop_back() +} + +#[pg_extern] +fn deque_pop_front() -> Option { + DEQUE.exclusive().pop_front() +} + #[pg_extern] fn hash_insert(key: i32, value: i32) { HASH.exclusive().insert(key, value).unwrap(); diff --git a/pgrx/src/shmem.rs b/pgrx/src/shmem.rs index 0a898d0e03..c3e612408a 100644 --- a/pgrx/src/shmem.rs +++ b/pgrx/src/shmem.rs @@ -259,6 +259,7 @@ where { } unsafe impl PGRXSharedMemory for heapless::Vec {} +unsafe impl PGRXSharedMemory for heapless::Deque {} unsafe impl PGRXSharedMemory for heapless::IndexMap {