Skip to content

Commit

Permalink
Fixed missing callbacks for pending events
Browse files Browse the repository at this point in the history
When setting a listener, any pending (i.e. unhandled) event received before would not result in an immediate callback.
Now the set_listener call also checks for pending events, and invokes the registered callbacks when appropriate.
This fix is a prerequisite for #eclipse-cyclonedds/cyclonedds-cxx#410

Signed-off-by: Erik Hendriks <[email protected]>
  • Loading branch information
e-hndrks committed Jun 15, 2023
1 parent 0a2ff5e commit 28092e8
Show file tree
Hide file tree
Showing 16 changed files with 291 additions and 137 deletions.
3 changes: 3 additions & 0 deletions src/core/ddsc/src/dds__reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ struct ddsi_status_cb_data;
/** @component reader */
void dds_reader_status_cb (void *entity, const struct ddsi_status_cb_data * data);

/** @component reader */
void dds_reader_invoke_cbs_for_pending_events(struct dds_entity *e, uint32_t status);

/** @component reader */
dds_return_t dds_return_reader_loan (dds_reader *rd, void **buf, int32_t bufsz);

Expand Down
9 changes: 9 additions & 0 deletions src/core/ddsc/src/dds__types.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ typedef struct dds_entity_deriver {
dds_return_t (*validate_status) (uint32_t mask);
struct dds_statistics * (*create_statistics) (const struct dds_entity *e);
void (*refresh_statistics) (const struct dds_entity *e, struct dds_statistics *s);
void (*invoke_cbs_for_pending_events) (struct dds_entity *e, uint32_t status);
} dds_entity_deriver;

struct dds_waitset;
Expand Down Expand Up @@ -207,6 +208,9 @@ struct dds_statistics *dds_entity_deriver_dummy_create_statistics (const struct
/** @notincomponent */
void dds_entity_deriver_dummy_refresh_statistics (const struct dds_entity *e, struct dds_statistics *s);

/** @notincomponent */
void dds_entity_deriver_dummy_invoke_cbs_for_pending_events(struct dds_entity *e, uint32_t status);

/** @component generic_entity */
inline void dds_entity_deriver_interrupt (struct dds_entity *e) {
(dds_entity_deriver_table[e->m_kind]->interrupt) (e);
Expand Down Expand Up @@ -252,6 +256,11 @@ inline void dds_entity_deriver_refresh_statistics (const struct dds_entity *e, s
dds_entity_deriver_table[e->m_kind]->refresh_statistics (e, s);
}

/** @component entity_listener */
inline void dds_entity_deriver_invoke_cbs_for_pending_events (struct dds_entity *e, uint32_t mask) {
dds_entity_deriver_table[e->m_kind]->invoke_cbs_for_pending_events (e, mask);
}

typedef struct dds_cyclonedds_entity {
struct dds_entity m_entity;

Expand Down
3 changes: 3 additions & 0 deletions src/core/ddsc/src/dds__writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ struct ddsi_status_cb_data;
/** @component writer */
void dds_writer_status_cb (void *entity, const struct ddsi_status_cb_data * data);

/** @component writer */
void dds_writer_invoke_cbs_for_pending_events(struct dds_entity *e, uint32_t status);

/** @component writer */
dds_return_t dds_return_writer_loan(dds_writer *writer, void **buf, int32_t bufsz) ddsrt_nonnull_all;

Expand Down
3 changes: 2 additions & 1 deletion src/core/ddsc/src/dds_domain.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ const struct dds_entity_deriver dds_entity_deriver_domain = {
.set_qos = dds_entity_deriver_dummy_set_qos,
.validate_status = dds_entity_deriver_dummy_validate_status,
.create_statistics = dds_entity_deriver_dummy_create_statistics,
.refresh_statistics = dds_entity_deriver_dummy_refresh_statistics
.refresh_statistics = dds_entity_deriver_dummy_refresh_statistics,
.invoke_cbs_for_pending_events = dds_entity_deriver_dummy_invoke_cbs_for_pending_events
};

static int dds_domain_compare (const void *va, const void *vb)
Expand Down
28 changes: 28 additions & 0 deletions src/core/ddsc/src/dds_entity.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ void dds_entity_deriver_dummy_refresh_statistics (const struct dds_entity *e, st
(void) e; (void) s;
}

void dds_entity_deriver_dummy_invoke_cbs_for_pending_events(struct dds_entity *e, uint32_t status) {
(void) e; (void) status;
}

extern inline void dds_entity_deriver_interrupt (struct dds_entity *e);
extern inline void dds_entity_deriver_close (struct dds_entity *e);
extern inline dds_return_t dds_entity_deriver_delete (struct dds_entity *e);
Expand All @@ -82,6 +86,7 @@ extern inline bool dds_entity_supports_set_qos (struct dds_entity *e);
extern inline bool dds_entity_supports_validate_status (struct dds_entity *e);
extern inline struct dds_statistics *dds_entity_deriver_create_statistics (const struct dds_entity *e);
extern inline void dds_entity_deriver_refresh_statistics (const struct dds_entity *e, struct dds_statistics *s);
extern inline void dds_entity_deriver_invoke_cbs_for_pending_events (struct dds_entity *e, uint32_t status);

static int compare_instance_handle (const void *va, const void *vb)
{
Expand Down Expand Up @@ -1007,6 +1012,12 @@ static void pushdown_listener (dds_entity *e)

ddsrt_mutex_lock (&e->m_observers_lock);
dds_override_inherited_listener (&c->m_listener, &e->m_listener);
if (ddsrt_avl_is_empty(&c->m_children)) {
uint32_t status = ddsrt_atomic_ld32 (&c->m_status.m_status_and_mask) & SAM_STATUS_MASK;
if (status) {
dds_entity_deriver_invoke_cbs_for_pending_events(c, status);
}
}
ddsrt_mutex_unlock (&e->m_observers_lock);

ddsrt_mutex_unlock (&c->m_observers_lock);
Expand All @@ -1020,10 +1031,12 @@ static void pushdown_listener (dds_entity *e)
ddsrt_mutex_unlock (&e->m_mutex);
}


dds_return_t dds_set_listener (dds_entity_t entity, const dds_listener_t *listener)
{
dds_entity *e, *x;
dds_return_t rc;
uint32_t status;

if ((rc = dds_entity_pin (entity, &e)) != DDS_RETCODE_OK)
return rc;
Expand Down Expand Up @@ -1054,7 +1067,22 @@ dds_return_t dds_set_listener (dds_entity_t entity, const dds_listener_t *listen

ddsrt_mutex_unlock (&e->m_observers_lock);
pushdown_listener (e);
/* Check for pending events, and when needed notify their listeners. */
ddsrt_mutex_lock (&e->m_observers_lock);
e->m_cb_pending_count++;
while (e->m_cb_count > 0)
ddsrt_cond_wait (&e->m_observers_cond, &e->m_observers_lock);
e->m_cb_count++;
status = ddsrt_atomic_ld32 (&e->m_status.m_status_and_mask) & SAM_STATUS_MASK;
if (listener && status) {
dds_entity_deriver_invoke_cbs_for_pending_events(e, status);
}
e->m_cb_count--;
e->m_cb_pending_count--;
ddsrt_cond_broadcast (&e->m_observers_cond);
ddsrt_mutex_unlock (&e->m_observers_lock);
dds_entity_unpin (e);

return DDS_RETCODE_OK;
}

Expand Down
3 changes: 2 additions & 1 deletion src/core/ddsc/src/dds_guardcond.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ const struct dds_entity_deriver dds_entity_deriver_guardcondition = {
.set_qos = dds_entity_deriver_dummy_set_qos,
.validate_status = dds_entity_deriver_dummy_validate_status,
.create_statistics = dds_entity_deriver_dummy_create_statistics,
.refresh_statistics = dds_entity_deriver_dummy_refresh_statistics
.refresh_statistics = dds_entity_deriver_dummy_refresh_statistics,
.invoke_cbs_for_pending_events = dds_entity_deriver_dummy_invoke_cbs_for_pending_events
};

dds_entity_t dds_create_guardcondition (dds_entity_t owner)
Expand Down
3 changes: 2 additions & 1 deletion src/core/ddsc/src/dds_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ const struct dds_entity_deriver dds_entity_deriver_cyclonedds = {
.set_qos = dds_entity_deriver_dummy_set_qos,
.validate_status = dds_entity_deriver_dummy_validate_status,
.create_statistics = dds_entity_deriver_dummy_create_statistics,
.refresh_statistics = dds_entity_deriver_dummy_refresh_statistics
.refresh_statistics = dds_entity_deriver_dummy_refresh_statistics,
.invoke_cbs_for_pending_events = dds_entity_deriver_dummy_invoke_cbs_for_pending_events
};

dds_cyclonedds_entity dds_global;
Expand Down
3 changes: 2 additions & 1 deletion src/core/ddsc/src/dds_participant.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ const struct dds_entity_deriver dds_entity_deriver_participant = {
.set_qos = dds_participant_qos_set,
.validate_status = dds_participant_status_validate,
.create_statistics = dds_entity_deriver_dummy_create_statistics,
.refresh_statistics = dds_entity_deriver_dummy_refresh_statistics
.refresh_statistics = dds_entity_deriver_dummy_refresh_statistics,
.invoke_cbs_for_pending_events = dds_entity_deriver_dummy_invoke_cbs_for_pending_events
};

dds_entity_t dds_create_participant (const dds_domainid_t domain, const dds_qos_t *qos, const dds_listener_t *listener)
Expand Down
3 changes: 2 additions & 1 deletion src/core/ddsc/src/dds_publisher.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ const struct dds_entity_deriver dds_entity_deriver_publisher = {
.set_qos = dds_publisher_qos_set,
.validate_status = dds_publisher_status_validate,
.create_statistics = dds_entity_deriver_dummy_create_statistics,
.refresh_statistics = dds_entity_deriver_dummy_refresh_statistics
.refresh_statistics = dds_entity_deriver_dummy_refresh_statistics,
.invoke_cbs_for_pending_events = dds_entity_deriver_dummy_invoke_cbs_for_pending_events
};

dds_entity_t dds__create_publisher_l (dds_participant *par, bool implicit, const dds_qos_t *qos, const dds_listener_t *listener)
Expand Down
3 changes: 2 additions & 1 deletion src/core/ddsc/src/dds_readcond.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ const struct dds_entity_deriver dds_entity_deriver_readcondition = {
.set_qos = dds_entity_deriver_dummy_set_qos,
.validate_status = dds_entity_deriver_dummy_validate_status,
.create_statistics = dds_entity_deriver_dummy_create_statistics,
.refresh_statistics = dds_entity_deriver_dummy_refresh_statistics
.refresh_statistics = dds_entity_deriver_dummy_refresh_statistics,
.invoke_cbs_for_pending_events = dds_entity_deriver_dummy_invoke_cbs_for_pending_events
};

dds_readcond *dds_create_readcond_impl (dds_reader *rd, dds_entity_kind_t kind, uint32_t mask, dds_querycondition_filter_fn filter)
Expand Down
87 changes: 62 additions & 25 deletions src/core/ddsc/src/dds_reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -171,15 +171,15 @@ static void data_avail_cb_leave_listener_exclusive_access (dds_entity *e)
ddsrt_cond_broadcast (&e->m_observers_cond);
}

static void data_avail_cb_invoke_dor (dds_entity *sub, const struct dds_listener *lst)
static void data_avail_cb_invoke_dor (dds_entity *sub, const struct dds_listener *lst, bool async)
{
// assumes sub->m_observers_lock held on entry
// unlocks and relocks sub->m_observers_lock
data_avail_cb_enter_listener_exclusive_access (sub);
if (async) data_avail_cb_enter_listener_exclusive_access (sub);
ddsrt_mutex_unlock (&sub->m_observers_lock);
lst->on_data_on_readers (sub->m_hdllink.hdl, lst->on_data_on_readers_arg);
ddsrt_mutex_lock (&sub->m_observers_lock);
data_avail_cb_leave_listener_exclusive_access (sub);
if (async) data_avail_cb_leave_listener_exclusive_access (sub);
}

static uint32_t data_avail_cb_set_status (dds_entity *rd, uint32_t status_and_mask)
Expand Down Expand Up @@ -217,15 +217,41 @@ static void data_avail_cb_trigger_waitsets (dds_entity *rd, uint32_t signal)
}
}

static uint32_t da_or_dor_cb_invoke(struct dds_reader *rd, struct dds_listener const * const lst, uint32_t status_and_mask, bool async)
{
uint32_t signal = 0;

if (lst->on_data_on_readers)
{
dds_entity * const sub = rd->m_entity.m_parent;
ddsrt_mutex_unlock (&rd->m_entity.m_observers_lock);
ddsrt_mutex_lock (&sub->m_observers_lock);
if (!(lst->reset_on_invoke & DDS_DATA_ON_READERS_STATUS))
signal = data_avail_cb_set_status (&rd->m_entity, status_and_mask);
data_avail_cb_invoke_dor (sub, lst, async);
ddsrt_mutex_unlock (&sub->m_observers_lock);
ddsrt_mutex_lock (&rd->m_entity.m_observers_lock);
}
else if(rd->m_entity.m_listener.on_data_available)
{
if (!(lst->reset_on_invoke & DDS_DATA_AVAILABLE_STATUS))
signal = data_avail_cb_set_status (&rd->m_entity, status_and_mask);
ddsrt_mutex_unlock (&rd->m_entity.m_observers_lock);
lst->on_data_available (rd->m_entity.m_hdllink.hdl, lst->on_data_available_arg);
ddsrt_mutex_lock (&rd->m_entity.m_observers_lock);
}
return signal;
}

void dds_reader_data_available_cb (struct dds_reader *rd)
{
/* DATA_AVAILABLE is special in two ways: firstly, it should first try
DATA_ON_READERS on the line of ancestors, and if not consumed set the
status on the subscriber; secondly it is the only one for which
overhead really matters. Otherwise, it is pretty much like
dds_reader_status_cb. */
uint32_t signal;
struct dds_listener const * const lst = &rd->m_entity.m_listener;
uint32_t signal = 0;

ddsrt_mutex_lock (&rd->m_entity.m_observers_lock);
const uint32_t status_and_mask = ddsrt_atomic_ld32 (&rd->m_entity.m_status.m_status_and_mask);
Expand All @@ -235,26 +261,7 @@ void dds_reader_data_available_cb (struct dds_reader *rd)
{
// "lock" listener object so we can look at "lst" without holding m_observers_lock
data_avail_cb_enter_listener_exclusive_access (&rd->m_entity);
if (lst->on_data_on_readers)
{
dds_entity * const sub = rd->m_entity.m_parent;
ddsrt_mutex_unlock (&rd->m_entity.m_observers_lock);
ddsrt_mutex_lock (&sub->m_observers_lock);
if (!(lst->reset_on_invoke & DDS_DATA_ON_READERS_STATUS))
signal = data_avail_cb_set_status (&rd->m_entity, status_and_mask);
data_avail_cb_invoke_dor (sub, lst);
ddsrt_mutex_unlock (&sub->m_observers_lock);
ddsrt_mutex_lock (&rd->m_entity.m_observers_lock);
}
else
{
assert (rd->m_entity.m_listener.on_data_available);
if (!(lst->reset_on_invoke & DDS_DATA_AVAILABLE_STATUS))
signal = data_avail_cb_set_status (&rd->m_entity, status_and_mask);
ddsrt_mutex_unlock (&rd->m_entity.m_observers_lock);
lst->on_data_available (rd->m_entity.m_hdllink.hdl, lst->on_data_available_arg);
ddsrt_mutex_lock (&rd->m_entity.m_observers_lock);
}
signal = da_or_dor_cb_invoke(rd, lst, status_and_mask, true);
data_avail_cb_leave_listener_exclusive_access (&rd->m_entity);
}
data_avail_cb_trigger_waitsets (&rd->m_entity, signal);
Expand Down Expand Up @@ -440,6 +447,35 @@ void dds_reader_status_cb (void *ventity, const ddsi_status_cb_data_t *data)
ddsrt_mutex_unlock (&rd->m_entity.m_observers_lock);
}

void dds_reader_invoke_cbs_for_pending_events(struct dds_entity *e, uint32_t status)
{
dds_reader * const rdr = (dds_reader *) e;
struct dds_listener const * const lst = &e->m_listener;

if (lst->on_requested_deadline_missed && (status & DDS_REQUESTED_DEADLINE_MISSED_STATUS)) {
status_cb_requested_deadline_missed_invoke(rdr);
}
if (lst->on_requested_incompatible_qos && (status & DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS)) {
status_cb_requested_incompatible_qos_invoke(rdr);
}
if (lst->on_sample_lost && (status & DDS_SAMPLE_LOST_STATUS)) {
status_cb_sample_lost_invoke(rdr);
}
if (lst->on_sample_rejected && (status & DDS_SAMPLE_REJECTED_STATUS)) {
status_cb_sample_rejected_invoke(rdr);
}
if (lst->on_liveliness_changed && (status & DDS_LIVELINESS_CHANGED_STATUS)) {
status_cb_liveliness_changed_invoke(rdr);
}
if (lst->on_subscription_matched && (status & DDS_SUBSCRIPTION_MATCHED_STATUS)) {
status_cb_subscription_matched_invoke(rdr);
}
if ((status & DDS_DATA_AVAILABLE_STATUS)) {
const uint32_t status_and_mask = ddsrt_atomic_ld32 (&e->m_status.m_status_and_mask);
(void) da_or_dor_cb_invoke(rdr, lst, status_and_mask, false);
}
}

static const struct dds_stat_keyvalue_descriptor dds_reader_statistics_kv[] = {
{ "discarded_bytes", DDS_STAT_KIND_UINT64 }
};
Expand Down Expand Up @@ -468,7 +504,8 @@ const struct dds_entity_deriver dds_entity_deriver_reader = {
.set_qos = dds_reader_qos_set,
.validate_status = dds_reader_status_validate,
.create_statistics = dds_reader_create_statistics,
.refresh_statistics = dds_reader_refresh_statistics
.refresh_statistics = dds_reader_refresh_statistics,
.invoke_cbs_for_pending_events = dds_reader_invoke_cbs_for_pending_events
};

#ifdef DDS_HAS_SHM
Expand Down
3 changes: 2 additions & 1 deletion src/core/ddsc/src/dds_subscriber.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ const struct dds_entity_deriver dds_entity_deriver_subscriber = {
.set_qos = dds_subscriber_qos_set,
.validate_status = dds_subscriber_status_validate,
.create_statistics = dds_entity_deriver_dummy_create_statistics,
.refresh_statistics = dds_entity_deriver_dummy_refresh_statistics
.refresh_statistics = dds_entity_deriver_dummy_refresh_statistics,
.invoke_cbs_for_pending_events = dds_entity_deriver_dummy_invoke_cbs_for_pending_events
};

dds_entity_t dds__create_subscriber_l (dds_participant *participant, bool implicit, const dds_qos_t *qos, const dds_listener_t *listener)
Expand Down
3 changes: 2 additions & 1 deletion src/core/ddsc/src/dds_topic.c
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,8 @@ const struct dds_entity_deriver dds_entity_deriver_topic = {
.set_qos = dds_topic_qos_set,
.validate_status = dds_topic_status_validate,
.create_statistics = dds_entity_deriver_dummy_create_statistics,
.refresh_statistics = dds_entity_deriver_dummy_refresh_statistics
.refresh_statistics = dds_entity_deriver_dummy_refresh_statistics,
.invoke_cbs_for_pending_events = dds_entity_deriver_dummy_invoke_cbs_for_pending_events
};

/**
Expand Down
3 changes: 2 additions & 1 deletion src/core/ddsc/src/dds_waitset.c
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ const struct dds_entity_deriver dds_entity_deriver_waitset = {
.set_qos = dds_entity_deriver_dummy_set_qos,
.validate_status = dds_entity_deriver_dummy_validate_status,
.create_statistics = dds_entity_deriver_dummy_create_statistics,
.refresh_statistics = dds_entity_deriver_dummy_refresh_statistics
.refresh_statistics = dds_entity_deriver_dummy_refresh_statistics,
.invoke_cbs_for_pending_events = dds_entity_deriver_dummy_invoke_cbs_for_pending_events
};

dds_entity_t dds_create_waitset (dds_entity_t owner)
Expand Down
22 changes: 21 additions & 1 deletion src/core/ddsc/src/dds_writer.c
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,25 @@ void dds_writer_status_cb (void *entity, const struct ddsi_status_cb_data *data)
ddsrt_mutex_unlock (&wr->m_entity.m_observers_lock);
}

void dds_writer_invoke_cbs_for_pending_events(struct dds_entity *e, uint32_t status)
{
dds_writer * const wr = (dds_writer *) e;
struct dds_listener const * const lst = &e->m_listener;

if (lst->on_publication_matched && (status & DDS_PUBLICATION_MATCHED_STATUS)) {
status_cb_publication_matched_invoke(wr);
}
if (lst->on_liveliness_lost && (status & DDS_LIVELINESS_LOST_STATUS)) {
status_cb_liveliness_lost_invoke(wr);
}
if (lst->on_offered_incompatible_qos && (status & DDS_OFFERED_INCOMPATIBLE_QOS_STATUS)) {
status_cb_offered_incompatible_qos_invoke(wr);
}
if (lst->on_offered_deadline_missed && (status & DDS_OFFERED_DEADLINE_MISSED_STATUS)) {
status_cb_offered_deadline_missed_invoke(wr);
}
}

static void dds_writer_interrupt (dds_entity *e) ddsrt_nonnull_all;

static void dds_writer_interrupt (dds_entity *e)
Expand Down Expand Up @@ -275,7 +294,8 @@ const struct dds_entity_deriver dds_entity_deriver_writer = {
.set_qos = dds_writer_qos_set,
.validate_status = dds_writer_status_validate,
.create_statistics = dds_writer_create_statistics,
.refresh_statistics = dds_writer_refresh_statistics
.refresh_statistics = dds_writer_refresh_statistics,
.invoke_cbs_for_pending_events = dds_writer_invoke_cbs_for_pending_events
};

#ifdef DDS_HAS_SHM
Expand Down
Loading

0 comments on commit 28092e8

Please sign in to comment.