Skip to content

Commit

Permalink
Fixed missing callbacks for pending events
Browse files Browse the repository at this point in the history
When setting a listener, any pending (i.e. unhandled) event received before would not result in an immediate callback.
Now the set_listener call also checks for pending events, and invokes the registered callbacks when appropriate.
This fix is a prerequisite for #eclipse-cyclonedds/cyclonedds-cxx#410

Signed-off-by: Erik Hendriks <[email protected]>
  • Loading branch information
e-hndrks committed Jun 2, 2023
1 parent a10ced3 commit 3e85ff9
Show file tree
Hide file tree
Showing 15 changed files with 102 additions and 11 deletions.
3 changes: 3 additions & 0 deletions src/core/ddsc/src/dds__reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ struct ddsi_status_cb_data;
/** @component reader */
void dds_reader_status_cb (void *entity, const struct ddsi_status_cb_data * data);

/** @component reader */
void dds_reader_invoke_cbs_for_pending_events(const struct dds_entity *e, uint32_t status);

/** @component reader */
dds_return_t dds_return_reader_loan (dds_reader *rd, void **buf, int32_t bufsz);

Expand Down
9 changes: 9 additions & 0 deletions src/core/ddsc/src/dds__types.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ typedef struct dds_entity_deriver {
dds_return_t (*validate_status) (uint32_t mask);
struct dds_statistics * (*create_statistics) (const struct dds_entity *e);
void (*refresh_statistics) (const struct dds_entity *e, struct dds_statistics *s);
void (*invoke_cbs_for_pending_events) (const struct dds_entity *e, uint32_t status);
} dds_entity_deriver;

struct dds_waitset;
Expand Down Expand Up @@ -207,6 +208,9 @@ struct dds_statistics *dds_entity_deriver_dummy_create_statistics (const struct
/** @notincomponent */
void dds_entity_deriver_dummy_refresh_statistics (const struct dds_entity *e, struct dds_statistics *s);

/** @notincomponent */
void dds_entity_deriver_dummy_invoke_cbs_for_pending_events(const struct dds_entity *e, uint32_t status);

/** @component generic_entity */
inline void dds_entity_deriver_interrupt (struct dds_entity *e) {
(dds_entity_deriver_table[e->m_kind]->interrupt) (e);
Expand Down Expand Up @@ -252,6 +256,11 @@ inline void dds_entity_deriver_refresh_statistics (const struct dds_entity *e, s
dds_entity_deriver_table[e->m_kind]->refresh_statistics (e, s);
}

/** @component statistics */
inline void dds_entity_deriver_invoke_cbs_for_pending_events (const struct dds_entity *e, uint32_t mask) {
dds_entity_deriver_table[e->m_kind]->invoke_cbs_for_pending_events (e, mask);
}

typedef struct dds_cyclonedds_entity {
struct dds_entity m_entity;

Expand Down
3 changes: 3 additions & 0 deletions src/core/ddsc/src/dds__writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ struct ddsi_status_cb_data;
/** @component writer */
void dds_writer_status_cb (void *entity, const struct ddsi_status_cb_data * data);

/** @component writer */
void dds_writer_invoke_cbs_for_pending_events(const struct dds_entity *e, uint32_t status);

/** @component writer */
dds_return_t dds_return_writer_loan(dds_writer *writer, void **buf, int32_t bufsz) ddsrt_nonnull_all;

Expand Down
3 changes: 2 additions & 1 deletion src/core/ddsc/src/dds_domain.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ const struct dds_entity_deriver dds_entity_deriver_domain = {
.set_qos = dds_entity_deriver_dummy_set_qos,
.validate_status = dds_entity_deriver_dummy_validate_status,
.create_statistics = dds_entity_deriver_dummy_create_statistics,
.refresh_statistics = dds_entity_deriver_dummy_refresh_statistics
.refresh_statistics = dds_entity_deriver_dummy_refresh_statistics,
.invoke_cbs_for_pending_events = dds_entity_deriver_dummy_invoke_cbs_for_pending_events
};

static int dds_domain_compare (const void *va, const void *vb)
Expand Down
23 changes: 23 additions & 0 deletions src/core/ddsc/src/dds_entity.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ void dds_entity_deriver_dummy_refresh_statistics (const struct dds_entity *e, st
(void) e; (void) s;
}

void dds_entity_deriver_dummy_invoke_cbs_for_pending_events(const struct dds_entity *e, uint32_t status) {
(void) e; (void) status;
}

extern inline void dds_entity_deriver_interrupt (struct dds_entity *e);
extern inline void dds_entity_deriver_close (struct dds_entity *e);
extern inline dds_return_t dds_entity_deriver_delete (struct dds_entity *e);
Expand All @@ -82,6 +86,7 @@ extern inline bool dds_entity_supports_set_qos (struct dds_entity *e);
extern inline bool dds_entity_supports_validate_status (struct dds_entity *e);
extern inline struct dds_statistics *dds_entity_deriver_create_statistics (const struct dds_entity *e);
extern inline void dds_entity_deriver_refresh_statistics (const struct dds_entity *e, struct dds_statistics *s);
extern inline void dds_entity_deriver_invoke_cbs_for_pending_events (const struct dds_entity *e, uint32_t status);

static int compare_instance_handle (const void *va, const void *vb)
{
Expand Down Expand Up @@ -1020,10 +1025,12 @@ static void pushdown_listener (dds_entity *e)
ddsrt_mutex_unlock (&e->m_mutex);
}


dds_return_t dds_set_listener (dds_entity_t entity, const dds_listener_t *listener)
{
dds_entity *e, *x;
dds_return_t rc;
uint32_t status;

if ((rc = dds_entity_pin (entity, &e)) != DDS_RETCODE_OK)
return rc;
Expand Down Expand Up @@ -1054,7 +1061,23 @@ dds_return_t dds_set_listener (dds_entity_t entity, const dds_listener_t *listen

ddsrt_mutex_unlock (&e->m_observers_lock);
pushdown_listener (e);
/* Check for pending events, and when needed notify their listeners. */
ddsrt_mutex_lock (&e->m_observers_lock);
e->m_cb_pending_count++;
while (e->m_cb_count > 0)
ddsrt_cond_wait (&e->m_observers_cond, &e->m_observers_lock);
e->m_cb_count++;
/* TODO: dds_get_status_changes may not be invoked for DATA_ON_READERS on Subscriber, check if that is the case!! */
status = ddsrt_atomic_ld32 (&e->m_status.m_status_and_mask) & SAM_STATUS_MASK;
if (status) {
dds_entity_deriver_invoke_cbs_for_pending_events(e, status);
}
e->m_cb_count--;
e->m_cb_pending_count--;
ddsrt_cond_broadcast (&e->m_observers_cond);
ddsrt_mutex_unlock (&e->m_observers_lock);
dds_entity_unpin (e);

return DDS_RETCODE_OK;
}

Expand Down
3 changes: 2 additions & 1 deletion src/core/ddsc/src/dds_guardcond.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ const struct dds_entity_deriver dds_entity_deriver_guardcondition = {
.set_qos = dds_entity_deriver_dummy_set_qos,
.validate_status = dds_entity_deriver_dummy_validate_status,
.create_statistics = dds_entity_deriver_dummy_create_statistics,
.refresh_statistics = dds_entity_deriver_dummy_refresh_statistics
.refresh_statistics = dds_entity_deriver_dummy_refresh_statistics,
.invoke_cbs_for_pending_events = dds_entity_deriver_dummy_invoke_cbs_for_pending_events
};

dds_entity_t dds_create_guardcondition (dds_entity_t owner)
Expand Down
3 changes: 2 additions & 1 deletion src/core/ddsc/src/dds_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ const struct dds_entity_deriver dds_entity_deriver_cyclonedds = {
.set_qos = dds_entity_deriver_dummy_set_qos,
.validate_status = dds_entity_deriver_dummy_validate_status,
.create_statistics = dds_entity_deriver_dummy_create_statistics,
.refresh_statistics = dds_entity_deriver_dummy_refresh_statistics
.refresh_statistics = dds_entity_deriver_dummy_refresh_statistics,
.invoke_cbs_for_pending_events = dds_entity_deriver_dummy_invoke_cbs_for_pending_events
};

dds_cyclonedds_entity dds_global;
Expand Down
3 changes: 2 additions & 1 deletion src/core/ddsc/src/dds_participant.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ const struct dds_entity_deriver dds_entity_deriver_participant = {
.set_qos = dds_participant_qos_set,
.validate_status = dds_participant_status_validate,
.create_statistics = dds_entity_deriver_dummy_create_statistics,
.refresh_statistics = dds_entity_deriver_dummy_refresh_statistics
.refresh_statistics = dds_entity_deriver_dummy_refresh_statistics,
.invoke_cbs_for_pending_events = dds_entity_deriver_dummy_invoke_cbs_for_pending_events
};

dds_entity_t dds_create_participant (const dds_domainid_t domain, const dds_qos_t *qos, const dds_listener_t *listener)
Expand Down
3 changes: 2 additions & 1 deletion src/core/ddsc/src/dds_publisher.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ const struct dds_entity_deriver dds_entity_deriver_publisher = {
.set_qos = dds_publisher_qos_set,
.validate_status = dds_publisher_status_validate,
.create_statistics = dds_entity_deriver_dummy_create_statistics,
.refresh_statistics = dds_entity_deriver_dummy_refresh_statistics
.refresh_statistics = dds_entity_deriver_dummy_refresh_statistics,
.invoke_cbs_for_pending_events = dds_entity_deriver_dummy_invoke_cbs_for_pending_events
};

dds_entity_t dds__create_publisher_l (dds_participant *par, bool implicit, const dds_qos_t *qos, const dds_listener_t *listener)
Expand Down
3 changes: 2 additions & 1 deletion src/core/ddsc/src/dds_readcond.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ const struct dds_entity_deriver dds_entity_deriver_readcondition = {
.set_qos = dds_entity_deriver_dummy_set_qos,
.validate_status = dds_entity_deriver_dummy_validate_status,
.create_statistics = dds_entity_deriver_dummy_create_statistics,
.refresh_statistics = dds_entity_deriver_dummy_refresh_statistics
.refresh_statistics = dds_entity_deriver_dummy_refresh_statistics,
.invoke_cbs_for_pending_events = dds_entity_deriver_dummy_invoke_cbs_for_pending_events
};

dds_readcond *dds_create_readcond_impl (dds_reader *rd, dds_entity_kind_t kind, uint32_t mask, dds_querycondition_filter_fn filter)
Expand Down
27 changes: 26 additions & 1 deletion src/core/ddsc/src/dds_reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,30 @@ void dds_reader_status_cb (void *ventity, const ddsi_status_cb_data_t *data)
ddsrt_mutex_unlock (&rd->m_entity.m_observers_lock);
}

void dds_reader_invoke_cbs_for_pending_events(const struct dds_entity *e, uint32_t status)
{
dds_reader * const rdr = (dds_reader *) e;

if (status & DDS_REQUESTED_DEADLINE_MISSED_STATUS) {
status_cb_requested_deadline_missed_invoke(rdr);
}
if (status & DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS) {
status_cb_requested_incompatible_qos_invoke(rdr);
}
if (status & DDS_SAMPLE_LOST_STATUS) {
status_cb_sample_lost_invoke(rdr);
}
if (status & DDS_SAMPLE_REJECTED_STATUS) {
status_cb_sample_rejected_invoke(rdr);
}
if (status & DDS_LIVELINESS_CHANGED_STATUS) {
status_cb_liveliness_changed_invoke(rdr);
}
if (status & DDS_SUBSCRIPTION_MATCHED_STATUS) {
status_cb_subscription_matched_invoke(rdr);
}
}

static const struct dds_stat_keyvalue_descriptor dds_reader_statistics_kv[] = {
{ "discarded_bytes", DDS_STAT_KIND_UINT64 }
};
Expand Down Expand Up @@ -468,7 +492,8 @@ const struct dds_entity_deriver dds_entity_deriver_reader = {
.set_qos = dds_reader_qos_set,
.validate_status = dds_reader_status_validate,
.create_statistics = dds_reader_create_statistics,
.refresh_statistics = dds_reader_refresh_statistics
.refresh_statistics = dds_reader_refresh_statistics,
.invoke_cbs_for_pending_events = dds_reader_invoke_cbs_for_pending_events
};

#ifdef DDS_HAS_SHM
Expand Down
3 changes: 2 additions & 1 deletion src/core/ddsc/src/dds_subscriber.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ const struct dds_entity_deriver dds_entity_deriver_subscriber = {
.set_qos = dds_subscriber_qos_set,
.validate_status = dds_subscriber_status_validate,
.create_statistics = dds_entity_deriver_dummy_create_statistics,
.refresh_statistics = dds_entity_deriver_dummy_refresh_statistics
.refresh_statistics = dds_entity_deriver_dummy_refresh_statistics,
.invoke_cbs_for_pending_events = dds_entity_deriver_dummy_invoke_cbs_for_pending_events
};

dds_entity_t dds__create_subscriber_l (dds_participant *participant, bool implicit, const dds_qos_t *qos, const dds_listener_t *listener)
Expand Down
3 changes: 2 additions & 1 deletion src/core/ddsc/src/dds_topic.c
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,8 @@ const struct dds_entity_deriver dds_entity_deriver_topic = {
.set_qos = dds_topic_qos_set,
.validate_status = dds_topic_status_validate,
.create_statistics = dds_entity_deriver_dummy_create_statistics,
.refresh_statistics = dds_entity_deriver_dummy_refresh_statistics
.refresh_statistics = dds_entity_deriver_dummy_refresh_statistics,
.invoke_cbs_for_pending_events = dds_entity_deriver_dummy_invoke_cbs_for_pending_events
};

/**
Expand Down
3 changes: 2 additions & 1 deletion src/core/ddsc/src/dds_waitset.c
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ const struct dds_entity_deriver dds_entity_deriver_waitset = {
.set_qos = dds_entity_deriver_dummy_set_qos,
.validate_status = dds_entity_deriver_dummy_validate_status,
.create_statistics = dds_entity_deriver_dummy_create_statistics,
.refresh_statistics = dds_entity_deriver_dummy_refresh_statistics
.refresh_statistics = dds_entity_deriver_dummy_refresh_statistics,
.invoke_cbs_for_pending_events = dds_entity_deriver_dummy_invoke_cbs_for_pending_events
};

dds_entity_t dds_create_waitset (dds_entity_t owner)
Expand Down
21 changes: 20 additions & 1 deletion src/core/ddsc/src/dds_writer.c
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,24 @@ void dds_writer_status_cb (void *entity, const struct ddsi_status_cb_data *data)
ddsrt_mutex_unlock (&wr->m_entity.m_observers_lock);
}

void dds_writer_invoke_cbs_for_pending_events(const struct dds_entity *e, uint32_t status)
{
dds_writer * const wr = (dds_writer *) e;

if (status & DDS_PUBLICATION_MATCHED_STATUS) {
status_cb_publication_matched_invoke(wr);
}
if (status & DDS_LIVELINESS_LOST_STATUS) {
status_cb_liveliness_lost_invoke(wr);
}
if (status & DDS_OFFERED_INCOMPATIBLE_QOS_STATUS) {
status_cb_offered_incompatible_qos_invoke(wr);
}
if (status & DDS_OFFERED_DEADLINE_MISSED_STATUS) {
status_cb_offered_deadline_missed_invoke(wr);
}
}

static void dds_writer_interrupt (dds_entity *e) ddsrt_nonnull_all;

static void dds_writer_interrupt (dds_entity *e)
Expand Down Expand Up @@ -275,7 +293,8 @@ const struct dds_entity_deriver dds_entity_deriver_writer = {
.set_qos = dds_writer_qos_set,
.validate_status = dds_writer_status_validate,
.create_statistics = dds_writer_create_statistics,
.refresh_statistics = dds_writer_refresh_statistics
.refresh_statistics = dds_writer_refresh_statistics,
.invoke_cbs_for_pending_events = dds_writer_invoke_cbs_for_pending_events
};

#ifdef DDS_HAS_SHM
Expand Down

0 comments on commit 3e85ff9

Please sign in to comment.