Skip to content

Commit

Permalink
Merge branch 'feature/timer-heap' into staging
Browse files Browse the repository at this point in the history
* feature/timer-heap:
  - associate timer heap with transport instance (vs. a single global instance)
  • Loading branch information
bill-torpey committed Nov 9, 2021
2 parents 2efb647 + f96ea33 commit 33c9f4d
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 20 deletions.
31 changes: 21 additions & 10 deletions src/bridge.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
// local includes
#include "zmqbridgefunctions.h"
#include <mama/integration/mama.h>
#include "zmqdefs.h"
#include "util.h"

#include <zmq.h>
Expand All @@ -44,9 +45,6 @@
= Typedefs, structs, enums and globals =
=========================================================================*/

/* Global timer heap */
timerHeap gOmzmqTimerHeap;

/* Default payload names and IDs to be loaded when this bridge is loaded */
static char* PAYLOAD_NAMES[] = { "omnmmsg", NULL };
static char PAYLOAD_IDS[] = { 'O', '\0' };
Expand Down Expand Up @@ -91,6 +89,16 @@ zmqBridge_open(mamaBridge bridgeImpl)
return MAMA_STATUS_NULL_ARG;
}

/* Create the bridge impl container */
zmqBridgeClosure* closure = (zmqBridgeClosure*) calloc(1, sizeof(zmqBridgeClosure));
if (NULL == closure)
{
mama_log (MAMA_LOG_LEVEL_ERROR,
"baseBridge_open(): Could not allocate bridge structure.");
return MAMA_STATUS_NOMEM;
}
mamaBridgeImpl_setClosure(bridgeImpl, closure);

/* Create the default event queue */
status = mamaQueue_create(&defaultEventQueue, bridgeImpl);
if (MAMA_STATUS_OK != status) {
Expand All @@ -104,13 +112,13 @@ zmqBridge_open(mamaBridge bridgeImpl)
mamaQueue_setQueueName(defaultEventQueue, ZMQ_DEFAULT_QUEUE_NAME);

/* Create the timer heap */
if (0 != createTimerHeap(&gOmzmqTimerHeap)) {
if (0 != createTimerHeap (&closure->mTimerHeap)) {
MAMA_LOG(MAMA_LOG_LEVEL_ERROR, "Failed to initialize timers.");
return MAMA_STATUS_PLATFORM;
}

/* Start the dispatch timer heap which will create a new thread */
if (0 != startDispatchTimerHeap(gOmzmqTimerHeap)) {
if (0 != startDispatchTimerHeap(closure->mTimerHeap)) {
MAMA_LOG(MAMA_LOG_LEVEL_ERROR, "Failed to start timer thread.");
return MAMA_STATUS_PLATFORM;
}
Expand All @@ -128,18 +136,21 @@ zmqBridge_close(mamaBridge bridgeImpl)
return MAMA_STATUS_NULL_ARG;
}

zmqBridgeClosure* closure = NULL;
mamaBridgeImpl_getClosure(bridgeImpl, (void**)&closure);
wthread_t timerThread;
/* Remove the timer heap */
if (NULL != gOmzmqTimerHeap) {
/* The timer heap allows us to access it's thread ID for joining */
wthread_t timerThread = timerHeapGetTid(gOmzmqTimerHeap);
if (0 != destroyHeap(gOmzmqTimerHeap)) {
if (NULL != closure->mTimerHeap) {
/* The timer heap allows us to access it's thread ID for joining */
timerThread = timerHeapGetTid (closure->mTimerHeap);
if (0 != destroyHeap (closure->mTimerHeap)) {
MAMA_LOG(MAMA_LOG_LEVEL_ERROR, "Failed to destroy zmq timer heap.");
status = MAMA_STATUS_PLATFORM;
}
/* The timer thread expects us to be responsible for terminating it */
wthread_join(timerThread, NULL);
}
gOmzmqTimerHeap = NULL;
mamaBridgeImpl_setClosure(bridgeImpl, NULL);

/* Destroy once queue has been emptied */
mama_getDefaultEventQueue(bridgeImpl, &defaultEventQueue);
Expand Down
46 changes: 36 additions & 10 deletions src/timer.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@
= Typedefs, structs, enums and globals =
=========================================================================*/

extern timerHeap gOmzmqTimerHeap;

typedef struct zmqTimerImpl_ {
timerElement mTimerElement;
double mInterval;
Expand Down Expand Up @@ -103,6 +101,7 @@ static void zmqBridgeMamaTimerImpl_timerCallback(timerElement timer, void* closu
mama_status zmqBridgeMamaTimerImpl_reset(zmqTimerImpl* impl, mama_f64_t interval);


zmqBridgeClosure* zmqBridgeMamaTimerImpl_getBridgeClosure (zmqTimerImpl* impl);

/*=========================================================================
= Public interface implementation functions =
Expand Down Expand Up @@ -141,8 +140,11 @@ mama_status zmqBridgeMamaTimer_create(timerBridge* result, void* nativeQueueHand
timeout.tv_sec = (time_t) interval;
timeout.tv_usec = ((interval - timeout.tv_sec) * 1000000.0); // how is this ever not zero?

/* Get the timer heap from the bridge */
zmqBridgeClosure* bridgeClosure = zmqBridgeMamaTimerImpl_getBridgeClosure (impl);

/* Create the first single fire timer */
int timerResult = createTimer(&impl->mTimerElement, gOmzmqTimerHeap, zmqBridgeMamaTimerImpl_timerCallback, &timeout, impl);
int timerResult = createTimer(&impl->mTimerElement, bridgeClosure->mTimerHeap, zmqBridgeMamaTimerImpl_timerCallback, &timeout, impl);
if (0 != timerResult) {
MAMA_LOG(MAMA_LOG_LEVEL_ERROR, "Failed to create underlying timer [%d].", timerResult);
return MAMA_STATUS_PLATFORM;
Expand All @@ -166,18 +168,21 @@ mama_status zmqBridgeMamaTimer_destroy(timerBridge timer)
wInterlocked_set(1, &impl->mDestroying);
impl->mAction = NULL;

/* Get the timer heap from the bridge */
zmqBridgeClosure* bridgeClosure = zmqBridgeMamaTimerImpl_getBridgeClosure (impl);

// destroy must be syncrhonized w/reset
lockTimerHeap(gOmzmqTimerHeap);
lockTimerHeap(bridgeClosure->mTimerHeap);

/* Destroy the timer element */
int timerResult = destroyTimer(gOmzmqTimerHeap, impl->mTimerElement);
int timerResult = destroyTimer(bridgeClosure->mTimerHeap, impl->mTimerElement);
if (0 != timerResult) {
MAMA_LOG(MAMA_LOG_LEVEL_ERROR, "Failed to destroy underlying timer [%d].", timerResult);
returnStatus = MAMA_STATUS_PLATFORM;
}
impl->mTimerElement = NULL;

unlockTimerHeap(gOmzmqTimerHeap);
unlockTimerHeap(bridgeClosure->mTimerHeap);

// There may be timer events already queued, so we cannot destroy the timer until these have fired.
zmqBridgeMamaQueue_enqueueEvent((queueBridge) impl->mQueue, zmqBridgeMamaTimerImpl_destroyCallback, (void*) impl);
Expand Down Expand Up @@ -288,14 +293,17 @@ mama_status zmqBridgeMamaTimerImpl_reset(zmqTimerImpl* impl, mama_f64_t interval

mama_status status = MAMA_STATUS_OK;

/* Get the timer heap from the bridge */
zmqBridgeClosure* bridgeClosure = zmqBridgeMamaTimerImpl_getBridgeClosure (impl);

// destroyTimer/createTimer must be executed atomically!
lockTimerHeap(gOmzmqTimerHeap);
lockTimerHeap(bridgeClosure->mTimerHeap);

impl->mInterval = interval;

/* Destroy the existing timer element */
if (impl->mTimerElement != NULL) {
destroyTimer(gOmzmqTimerHeap, impl->mTimerElement);
destroyTimer(bridgeClosure->mTimerHeap, impl->mTimerElement);
}

/* Calculate next time interval */
Expand All @@ -304,14 +312,32 @@ mama_status zmqBridgeMamaTimerImpl_reset(zmqTimerImpl* impl, mama_f64_t interval
timeout.tv_usec = ((impl->mInterval - timeout.tv_sec) * 1000000.0);

/* Create the timer for the next firing */
int timerResult = createTimer(&impl->mTimerElement, gOmzmqTimerHeap, zmqBridgeMamaTimerImpl_timerCallback, &timeout, impl);
int timerResult = createTimer(&impl->mTimerElement, bridgeClosure->mTimerHeap, zmqBridgeMamaTimerImpl_timerCallback, &timeout, impl);
if (0 != timerResult) {
MAMA_LOG(MAMA_LOG_LEVEL_ERROR, "Failed to reset underlying timer [%d].", timerResult);
status = MAMA_STATUS_PLATFORM;
}

unlockTimerHeap(gOmzmqTimerHeap);
unlockTimerHeap(bridgeClosure->mTimerHeap);

return status;
}

zmqBridgeClosure* zmqBridgeMamaTimerImpl_getBridgeClosure (zmqTimerImpl* impl)
{
mamaQueue queue = NULL;
zmqBridgeClosure* bridgeClosure = NULL;
mamaBridge bridgeImpl = NULL;

/* Get the queue from the timer */
mamaTimer_getQueue(impl->mParent, &queue);

/* Get the bridge impl from the queue */
bridgeImpl = mamaQueueImpl_getBridgeImpl (queue);

/* Get the closure from the bridge */
mamaBridgeImpl_getClosure(bridgeImpl, (void**)&bridgeClosure);

return bridgeClosure;

}
9 changes: 9 additions & 0 deletions src/zmqdefs.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ int log_level_inbox;
#include <wombat/queue.h>
#include <wombat/mempool.h>
#include <mama/integration/types.h>
#include <timers.h>

// required for definition of ZMQ_CLIENT, ZMQ_SERVER
#define ZMQ_BUILD_DRAFT_API
Expand Down Expand Up @@ -213,6 +214,14 @@ typedef struct zmqTransportBridge_ {
} zmqTransportBridge;


typedef struct zmqBridgeClosure_
{
// Note that mClosure is first - contains implementation bridge's own closure
void* mImplClosure;
timerHeap mTimerHeap;
} zmqBridgeClosure;


// defines a subscriber (either "normal" or wildcard)
typedef struct zmqSubscription_ {
mamaMsgCallbacks mMamaCallback;
Expand Down

0 comments on commit 33c9f4d

Please sign in to comment.