Skip to content

Commit

Permalink
session: use msg queue for events
Browse files Browse the repository at this point in the history
Change-Id: I3c58367eec2243fe19b75be78a175c5261863e9e
Signed-off-by: Florin Coras <[email protected]>
  • Loading branch information
florincoras authored and dmarion committed Jul 17, 2018
1 parent 5da96a7 commit 3c2fed5
Show file tree
Hide file tree
Showing 27 changed files with 773 additions and 400 deletions.
102 changes: 84 additions & 18 deletions src/svm/message_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,25 @@
#include <svm/message_queue.h>
#include <vppinfra/mem.h>

/**
 * Return a pointer to the ring at @a ring_index.
 *
 * Thin wrapper over vec_elt_at_index (vppinfra vector accessor, which
 * presumably validates the index in debug builds — per vppinfra convention).
 */
static inline svm_msg_q_ring_t *
svm_msg_q_ring_inline (svm_msg_q_t * mq, u32 ring_index)
{
  return vec_elt_at_index (mq->rings, ring_index);
}

/**
 * Exported (non-inline) accessor for a message queue ring.
 *
 * External callers use this instead of the file-local inline helper.
 */
svm_msg_q_ring_t *
svm_msg_q_ring (svm_msg_q_t * mq, u32 ring_index)
{
  svm_msg_q_ring_t *ring;

  ring = svm_msg_q_ring_inline (mq, ring_index);
  return ring;
}

/**
 * Return a pointer to the data for element @a elt_index on @a ring.
 *
 * Elements are stored contiguously in ring->data, each ring->elsize bytes
 * long; bounds are checked only via ASSERT (debug builds).
 */
static inline void *
svm_msg_q_ring_data (svm_msg_q_ring_t * ring, u32 elt_index)
{
  ASSERT (elt_index < ring->nitems);
  return (ring->data + elt_index * ring->elsize);
}

svm_msg_q_t *
svm_msg_q_alloc (svm_msg_q_cfg_t * cfg)
{
Expand Down Expand Up @@ -62,6 +81,53 @@ svm_msg_q_free (svm_msg_q_t * mq)
clib_mem_free (mq);
}

/**
 * Allocate a message buffer on the requested ring.
 *
 * Assumes the caller serializes producers (e.g. holds the queue mutex).
 *
 * @param mq		message queue
 * @param ring_index	ring on which to allocate
 * @return message pointing to the allocated ring slot, or the invalid
 *	   message (all ones, see svm_msg_q_msg_is_invalid) if the ring
 *	   is full.
 *
 * Returning the invalid message instead of only ASSERTing matters:
 * ASSERT is compiled out in release builds, and the blocking path of
 * svm_msg_q_lock_and_alloc_msg_w_ring spins on svm_msg_q_msg_is_invalid
 * to decide whether to wait — that loop can only work if a full ring
 * actually yields an invalid message here.
 */
svm_msg_q_msg_t
svm_msg_q_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index)
{
  svm_msg_q_msg_t msg = {.as_u64 = ~0 };
  svm_msg_q_ring_t *ring = svm_msg_q_ring_inline (mq, ring_index);

  /* Ring full: hand back the invalid message so callers can detect it. */
  if (PREDICT_FALSE (ring->cursize == ring->nitems))
    return msg;

  msg.ring_index = ring - mq->rings;
  msg.elt_index = ring->tail;
  ring->tail = (ring->tail + 1) % ring->nitems;
  /* cursize is read unlocked by consumers; bump it atomically. */
  __sync_fetch_and_add (&ring->cursize, 1);
  return msg;
}

/**
 * Lock the queue and allocate a message buffer on @a ring_index.
 *
 * On success (return 0) the queue mutex is left HELD; the caller must
 * enqueue with svm_msg_q_add_w_lock and then unlock.  On failure the
 * mutex is released before returning: -1 if the lock could not be taken
 * (noblock only), -2 if the ring is full.
 */
int
svm_msg_q_lock_and_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index,
u8 noblock, svm_msg_q_msg_t * msg)
{
  if (noblock)
    {
      /* Best effort: fail immediately rather than block on the mutex. */
      if (svm_msg_q_try_lock (mq))
	return -1;
      if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, ring_index)))
	{
	  svm_msg_q_unlock (mq);
	  return -2;
	}
      *msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index);
      /* Re-check: the allocator signals a full ring via an invalid msg. */
      if (PREDICT_FALSE (svm_msg_q_msg_is_invalid (msg)))
	{
	  svm_msg_q_unlock (mq);
	  return -2;
	}
    }
  else
    {
      svm_msg_q_lock (mq);
      *msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index);
      /* NOTE(review): this loop only terminates usefully if
       * svm_msg_q_alloc_msg_w_ring can return an invalid message when
       * the ring is full; as written above it always returns a slot.
       * Verify the allocator's full-ring behavior. */
      while (svm_msg_q_msg_is_invalid (msg))
	{
	  /* Drops the mutex while waiting; holds it again on wakeup. */
	  svm_msg_q_wait (mq);
	  *msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index);
	}
    }
  return 0;
}

svm_msg_q_msg_t
svm_msg_q_alloc_msg (svm_msg_q_t * mq, u32 nbytes)
{
Expand All @@ -81,23 +147,10 @@ svm_msg_q_alloc_msg (svm_msg_q_t * mq, u32 nbytes)
return msg;
}

static inline svm_msg_q_ring_t *
svm_msg_q_get_ring (svm_msg_q_t * mq, u32 ring_index)
{
return vec_elt_at_index (mq->rings, ring_index);
}

static inline void *
svm_msg_q_ring_data (svm_msg_q_ring_t * ring, u32 elt_index)
{
ASSERT (elt_index < ring->nitems);
return (ring->data + elt_index * ring->elsize);
}

void *
svm_msg_q_msg_data (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
{
svm_msg_q_ring_t *ring = svm_msg_q_get_ring (mq, msg->ring_index);
svm_msg_q_ring_t *ring = svm_msg_q_ring_inline (mq, msg->ring_index);
return svm_msg_q_ring_data (ring, msg->elt_index);
}

Expand Down Expand Up @@ -131,7 +184,7 @@ svm_msq_q_msg_is_valid (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
return 0;
ring = &mq->rings[msg->ring_index];

dist1 = ((ring->nitems + msg->ring_index) - ring->head) % ring->nitems;
dist1 = ((ring->nitems + msg->elt_index) - ring->head) % ring->nitems;
if (ring->tail == ring->head)
dist2 = (ring->cursize == 0) ? 0 : ring->nitems;
else
Expand All @@ -140,10 +193,17 @@ svm_msq_q_msg_is_valid (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
}

int
svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t msg, int nowait)
svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, int nowait)
{
ASSERT (svm_msq_q_msg_is_valid (mq, msg));
return svm_queue_add (mq->q, (u8 *) msg, nowait);
}

void
svm_msg_q_add_w_lock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
{
ASSERT (svm_msq_q_msg_is_valid (mq, &msg));
return svm_queue_add (mq->q, (u8 *) & msg, nowait);
ASSERT (svm_msq_q_msg_is_valid (mq, msg));
svm_queue_add_raw (mq->q, (u8 *) msg);
}

int
Expand All @@ -153,6 +213,12 @@ svm_msg_q_sub (svm_msg_q_t * mq, svm_msg_q_msg_t * msg,
return svm_queue_sub (mq->q, (u8 *) msg, cond, time);
}

/**
 * Dequeue one message with the queue mutex already held.
 *
 * Thin wrapper over svm_queue_sub_raw; performs no locking or blocking
 * of its own — the caller owns the mutex and must release it.
 */
void
svm_msg_q_sub_w_lock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
{
  svm_queue_sub_raw (mq->q, (u8 *) msg);
}

/*
* fd.io coding-style-patch-verification: ON
*
Expand Down
157 changes: 153 additions & 4 deletions src/svm/message_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@
#define SRC_SVM_MESSAGE_QUEUE_H_

#include <vppinfra/clib.h>
#include <vppinfra/error.h>
#include <svm/queue.h>

typedef struct svm_msg_q_ring_
{
volatile u32 cursize; /**< current size of the ring */
u32 nitems; /**< max size of the ring */
u32 head; /**< current head (for dequeue) */
u32 tail; /**< current tail (for enqueue) */
volatile u32 head; /**< current head (for dequeue) */
volatile u32 tail; /**< current tail (for enqueue) */
u32 elsize; /**< size of an element */
u8 *data; /**< chunk of memory for msg data */
} svm_msg_q_ring_t;
Expand Down Expand Up @@ -64,6 +65,7 @@ typedef union
u64 as_u64;
} svm_msg_q_msg_t;

#define SVM_MQ_INVALID_MSG { .as_u64 = ~0 }
/**
* Allocate message queue
*
Expand Down Expand Up @@ -97,6 +99,36 @@ void svm_msg_q_free (svm_msg_q_t * mq);
*/
svm_msg_q_msg_t svm_msg_q_alloc_msg (svm_msg_q_t * mq, u32 nbytes);

/**
* Allocate message buffer on ring
*
The message is allocated on the requested ring. The caller MUST check that
the ring is not full before calling.
*
* @param mq message queue
* @param ring_index ring on which the allocation should occur
* @return message structure pointing to the ring and position
* allocated
*/
svm_msg_q_msg_t svm_msg_q_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index);

/**
* Lock message queue and allocate message buffer on ring
*
* This should be used when multiple writers/readers are expected to
* compete for the rings/queue. Message should be enqueued by calling
* @ref svm_msg_q_add_w_lock and the caller MUST unlock the queue once
the message is enqueued.
*
* @param mq message queue
* @param ring_index ring on which the allocation should occur
* @param noblock flag that indicates if request should block
* @param msg pointer to message to be filled in
* @return 0 on success, negative number otherwise
*/
int svm_msg_q_lock_and_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index,
u8 noblock, svm_msg_q_msg_t * msg);

/**
* Free message buffer
*
Expand All @@ -106,6 +138,7 @@ svm_msg_q_msg_t svm_msg_q_alloc_msg (svm_msg_q_t * mq, u32 nbytes);
* @param msg message to be freed
*/
void svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);

/**
* Producer enqueue one message to queue
*
Expand All @@ -117,7 +150,20 @@ void svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
* @param nowait flag to indicate if request is blocking or not
* @return success status
*/
int svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t msg, int nowait);
int svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, int nowait);

/**
* Producer enqueue one message to queue with mutex held
*
* Prior to calling this, the producer should've obtained a message buffer
from one of the rings by calling @ref svm_msg_q_lock_and_alloc_msg_w_ring. It assumes
* the queue mutex is held.
*
* @param mq message queue
* @param msg message (pointer to ring position) to be enqueued
* @return success status
*/
void svm_msg_q_add_w_lock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);

/**
* Consumer dequeue one message from queue
Expand All @@ -129,20 +175,123 @@ int svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t msg, int nowait);
* @param mq message queue
* @param msg pointer to structure where message is to be received
* @param cond flag that indicates if request should block or not
* @param time time to wait if condition is SVM_Q_TIMEDWAIT
* @return success status
*/
int svm_msg_q_sub (svm_msg_q_t * mq, svm_msg_q_msg_t * msg,
svm_q_conditional_wait_t cond, u32 time);

/**
* Get data for message in queu
* Consumer dequeue one message from queue with mutex held
*
* Returns the message pointing to the data in the message rings under the
* assumption that the message queue lock is already held. The consumer is
* expected to call @ref svm_msg_q_free_msg once it finishes
* processing/copies the message data.
*
* @param mq message queue
* @param msg pointer to structure where message is to be received
* @return success status
*/
void svm_msg_q_sub_w_lock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);

/**
* Get data for message in queue
*
* @param mq message queue
* @param msg message for which the data is requested
* @return pointer to data
*/
void *svm_msg_q_msg_data (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);

/**
* Get message queue ring
*
* @param mq message queue
* @param ring_index index of ring
* @return pointer to ring
*/
svm_msg_q_ring_t *svm_msg_q_ring (svm_msg_q_t * mq, u32 ring_index);

/**
 * Check if the underlying event queue is full.
 *
 * Unlocked read of cursize: result is a snapshot and may be stale by the
 * time the caller acts on it.
 */
static inline u8
svm_msg_q_is_full (svm_msg_q_t * mq)
{
  return (mq->q->cursize == mq->q->maxsize);
}

/**
 * Check if the ring at @a ring_index has no free elements.
 *
 * Unlocked read of the ring's cursize; snapshot only.
 */
static inline u8
svm_msg_q_ring_is_full (svm_msg_q_t * mq, u32 ring_index)
{
  ASSERT (ring_index < vec_len (mq->rings));
  return (mq->rings[ring_index].cursize == mq->rings[ring_index].nitems);
}

/**
 * Check if the underlying event queue is empty.
 *
 * Unlocked read; snapshot only.
 */
static inline u8
svm_msg_q_is_empty (svm_msg_q_t * mq)
{
  return (mq->q->cursize == 0);
}

/**
 * Current number of messages in the underlying event queue.
 *
 * Unlocked read; snapshot only.
 */
static inline u32
svm_msg_q_size (svm_msg_q_t * mq)
{
  return mq->q->cursize;
}

/**
 * Check if @a msg is the invalid (all-ones) sentinel, i.e. does not
 * reference a ring element (see SVM_MQ_INVALID_MSG).
 */
static inline u8
svm_msg_q_msg_is_invalid (svm_msg_q_msg_t * msg)
{
  return (msg->as_u64 == (u64) ~ 0);
}

/**
 * Try to lock the message queue without blocking.
 *
 * @return 0 if the lock was acquired, non-zero otherwise
 *	   (pthread_mutex_trylock semantics, e.g. EBUSY).
 */
static inline int
svm_msg_q_try_lock (svm_msg_q_t * mq)
{
  return pthread_mutex_trylock (&mq->q->mutex);
}

/**
 * Lock the message queue, blocking until the mutex is acquired.
 *
 * @return 0 on success, a pthread_mutex_lock error code otherwise.
 */
static inline int
svm_msg_q_lock (svm_msg_q_t * mq)
{
  return pthread_mutex_lock (&mq->q->mutex);
}

/**
 * Wait on the queue's condition variable.
 *
 * Caller must hold the queue mutex; the mutex is atomically released
 * while waiting and re-acquired before returning.  The
 * pthread_cond_wait return value is ignored (spurious wakeups are
 * expected to be handled by the caller's re-check loop).
 */
static inline void
svm_msg_q_wait (svm_msg_q_t * mq)
{
  pthread_cond_wait (&mq->q->condvar, &mq->q->mutex);
}

/**
 * Unlock the message queue.
 *
 * Before releasing the mutex, wake any waiters — but only when the queue
 * is less than 1/8 full.  Presumably the heuristic is: a mostly-empty
 * queue means the consumer is not busy polling and may be blocked in
 * svm_msg_q_wait, so it needs an explicit wakeup — TODO confirm against
 * the consumer-side logic.
 */
static inline void
svm_msg_q_unlock (svm_msg_q_t * mq)
{
  /* The other side of the connection is not polling */
  if (mq->q->cursize < (mq->q->maxsize / 8))
    (void) pthread_cond_broadcast (&mq->q->condvar);
  pthread_mutex_unlock (&mq->q->mutex);
}

#endif /* SRC_SVM_MESSAGE_QUEUE_H_ */

/*
Expand Down
Loading

1 comment on commit 3c2fed5

@fjccc
Copy link

@fjccc fjccc commented on 3c2fed5 Jul 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello, I recently discovered that using app_send_stream_raw and app_send_dgram_raw can lead to deadlock issues. Are these functions not supposed to be used in different threads? Here is the specific stack information:
Thread 3 (Thread 0x7fd055e98700 (LWP 43427)):
#0 0x00007fd15d23c5dc in pthread_cond_wait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1 0x00007fd15e90e9a1 in svm_msg_q_wait_prod (mq=) at /usr/src/debug/vpp-21.06/src/svm/message_queue.c:549
#2 svm_msg_q_wait_prod (mq=mq@entry=0x7fd05d462f30) at /usr/src/debug/vpp-21.06/src/svm/message_queue.c:544
#3 0x00007fd05c688938 in app_send_io_evt_to_vpp (noblock=0 '\000', evt_type=, session_index=827, mq=0x7fd05d462f30) at /usr/src/debug/vpp-21.06/src/vnet/session/application_interface.h:656
#4 app_send_stream_raw (noblock=0 '\000', do_evt=1 '\001', evt_type=, len=, data=, vpp_evt_q=0x7fd05d462f30, f=0x7fd0626f4340) at /usr/src/debug/vpp-21.06/src/vnet/session/application_interface.h:761

Thread 1 (Thread 0x7fd15e8ee7c0 (LWP 43424)):
#0 0x00007fd15d23c5dc in pthread_cond_wait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
--Type for more, q to quit, c to continue without paging--
#1 0x00007fd15e90e9a1 in svm_msg_q_wait_prod (mq=) at /usr/src/debug/vpp-21.06/src/svm/message_queue.c:549
#2 svm_msg_q_wait_prod (mq=mq@entry=0x7fd05d462f30) at /usr/src/debug/vpp-21.06/src/svm/message_queue.c:544
#3 0x00007fd05c696c50 in app_send_io_evt_to_vpp (evt_type=1 '\001', noblock=0 '\000', session_index=18596, mq=0x7fd05d462f30) at /usr/src/debug/vpp-21.06/src/vnet/session/application_interface.h:656
#4 app_send_dgram_raw (noblock=0 '\000', do_evt=1 '\001', evt_type=1 '\001', len=57, data=, vpp_evt_q=0x7fd05d462f30, at=, f=) at /usr/src/debug/vpp-21.06/src/vnet/session/application_interface.h:737

Please sign in to comment.