From 3e85ff9ddeaa7ff407fec8266073e7b03a90530e Mon Sep 17 00:00:00 2001 From: Erik Hendriks Date: Fri, 2 Jun 2023 19:40:21 +0200 Subject: [PATCH] Fixed missing callbacks for pending events When setting a listener, any pending (i.e. unhandled) event received before would not result in an immediate callback. Now the set_listener call also checks for pending events, and invokes the registered callbacks when appropriate. This fix is a prerequisite for #https://github.com/eclipse-cyclonedds/cyclonedds-cxx/issues/410 Signed-off-by: Erik Hendriks --- src/core/ddsc/src/dds__reader.h | 3 +++ src/core/ddsc/src/dds__types.h | 9 +++++++++ src/core/ddsc/src/dds__writer.h | 3 +++ src/core/ddsc/src/dds_domain.c | 3 ++- src/core/ddsc/src/dds_entity.c | 23 +++++++++++++++++++++++ src/core/ddsc/src/dds_guardcond.c | 3 ++- src/core/ddsc/src/dds_init.c | 3 ++- src/core/ddsc/src/dds_participant.c | 3 ++- src/core/ddsc/src/dds_publisher.c | 3 ++- src/core/ddsc/src/dds_readcond.c | 3 ++- src/core/ddsc/src/dds_reader.c | 27 ++++++++++++++++++++++++++- src/core/ddsc/src/dds_subscriber.c | 3 ++- src/core/ddsc/src/dds_topic.c | 3 ++- src/core/ddsc/src/dds_waitset.c | 3 ++- src/core/ddsc/src/dds_writer.c | 21 ++++++++++++++++++++- 15 files changed, 102 insertions(+), 11 deletions(-) diff --git a/src/core/ddsc/src/dds__reader.h b/src/core/ddsc/src/dds__reader.h index 575786b559..77cc2b2b9c 100644 --- a/src/core/ddsc/src/dds__reader.h +++ b/src/core/ddsc/src/dds__reader.h @@ -22,6 +22,9 @@ struct ddsi_status_cb_data; /** @component reader */ void dds_reader_status_cb (void *entity, const struct ddsi_status_cb_data * data); +/** @component reader */ +void dds_reader_invoke_cbs_for_pending_events(const struct dds_entity *e, uint32_t status); + /** @component reader */ dds_return_t dds_return_reader_loan (dds_reader *rd, void **buf, int32_t bufsz); diff --git a/src/core/ddsc/src/dds__types.h b/src/core/ddsc/src/dds__types.h index 607153a76e..21ab67bdc5 100644 --- a/src/core/ddsc/src/dds__types.h +++ b/src/core/ddsc/src/dds__types.h @@ -119,6 +119,7 @@ typedef struct dds_entity_deriver { dds_return_t (*validate_status) (uint32_t mask); struct dds_statistics * (*create_statistics) (const struct dds_entity *e); void (*refresh_statistics) (const struct dds_entity *e, struct dds_statistics *s); + void (*invoke_cbs_for_pending_events) (const struct dds_entity *e, uint32_t status); } dds_entity_deriver; struct dds_waitset; @@ -207,6 +208,9 @@ struct dds_statistics *dds_entity_deriver_dummy_create_statistics (const struct /** @notincomponent */ void dds_entity_deriver_dummy_refresh_statistics (const struct dds_entity *e, struct dds_statistics *s); +/** @notincomponent */ +void dds_entity_deriver_dummy_invoke_cbs_for_pending_events(const struct dds_entity *e, uint32_t status); + /** @component generic_entity */ inline void dds_entity_deriver_interrupt (struct dds_entity *e) { (dds_entity_deriver_table[e->m_kind]->interrupt) (e); @@ -252,6 +256,11 @@ inline void dds_entity_deriver_refresh_statistics (const struct dds_entity *e, s dds_entity_deriver_table[e->m_kind]->refresh_statistics (e, s); } +/** @component statistics */ +inline void dds_entity_deriver_invoke_cbs_for_pending_events (const struct dds_entity *e, uint32_t mask) { + dds_entity_deriver_table[e->m_kind]->invoke_cbs_for_pending_events (e, mask); +} + typedef struct dds_cyclonedds_entity { struct dds_entity m_entity; diff --git a/src/core/ddsc/src/dds__writer.h b/src/core/ddsc/src/dds__writer.h index a93d826967..8862cb890c 100644 --- a/src/core/ddsc/src/dds__writer.h +++ b/src/core/ddsc/src/dds__writer.h @@ -24,6 +24,9 @@ struct ddsi_status_cb_data; /** @component writer */ void dds_writer_status_cb (void *entity, const struct ddsi_status_cb_data * data); +/** @component writer */ +void dds_writer_invoke_cbs_for_pending_events(const struct dds_entity *e, uint32_t status); + /** @component writer */ dds_return_t dds_return_writer_loan(dds_writer *writer, void **buf, int32_t bufsz) ddsrt_nonnull_all; diff --git a/src/core/ddsc/src/dds_domain.c b/src/core/ddsc/src/dds_domain.c index 4d0df9cf4e..41964803d6 100644 --- a/src/core/ddsc/src/dds_domain.c +++ b/src/core/ddsc/src/dds_domain.c @@ -44,7 +44,8 @@ const struct dds_entity_deriver dds_entity_deriver_domain = { .set_qos = dds_entity_deriver_dummy_set_qos, .validate_status = dds_entity_deriver_dummy_validate_status, .create_statistics = dds_entity_deriver_dummy_create_statistics, - .refresh_statistics = dds_entity_deriver_dummy_refresh_statistics + .refresh_statistics = dds_entity_deriver_dummy_refresh_statistics, + .invoke_cbs_for_pending_events = dds_entity_deriver_dummy_invoke_cbs_for_pending_events }; static int dds_domain_compare (const void *va, const void *vb) diff --git a/src/core/ddsc/src/dds_entity.c b/src/core/ddsc/src/dds_entity.c index 91f0150d5a..0d76213dfd 100644 --- a/src/core/ddsc/src/dds_entity.c +++ b/src/core/ddsc/src/dds_entity.c @@ -73,6 +73,10 @@ void dds_entity_deriver_dummy_refresh_statistics (const struct dds_entity *e, st (void) e; (void) s; } +void dds_entity_deriver_dummy_invoke_cbs_for_pending_events(const struct dds_entity *e, uint32_t status) { + (void) e; (void) status; +} + extern inline void dds_entity_deriver_interrupt (struct dds_entity *e); extern inline void dds_entity_deriver_close (struct dds_entity *e); extern inline dds_return_t dds_entity_deriver_delete (struct dds_entity *e); @@ -82,6 +86,7 @@ extern inline bool dds_entity_supports_set_qos (struct dds_entity *e); extern inline bool dds_entity_supports_validate_status (struct dds_entity *e); extern inline struct dds_statistics *dds_entity_deriver_create_statistics (const struct dds_entity *e); extern inline void dds_entity_deriver_refresh_statistics (const struct dds_entity *e, struct dds_statistics *s); +extern inline void dds_entity_deriver_invoke_cbs_for_pending_events (const struct dds_entity *e, uint32_t status); static int compare_instance_handle (const void *va, const void *vb) { @@ -1020,10 +1025,12 @@ static void pushdown_listener (dds_entity *e) ddsrt_mutex_unlock (&e->m_mutex); } + dds_return_t dds_set_listener (dds_entity_t entity, const dds_listener_t *listener) { dds_entity *e, *x; dds_return_t rc; + uint32_t status; if ((rc = dds_entity_pin (entity, &e)) != DDS_RETCODE_OK) return rc; @@ -1054,7 +1061,23 @@ dds_return_t dds_set_listener (dds_entity_t entity, const dds_listener_t *listen ddsrt_mutex_unlock (&e->m_observers_lock); pushdown_listener (e); + /* Check for pending events, and when needed notify their listeners. */ + ddsrt_mutex_lock (&e->m_observers_lock); + e->m_cb_pending_count++; + while (e->m_cb_count > 0) + ddsrt_cond_wait (&e->m_observers_cond, &e->m_observers_lock); + e->m_cb_count++; + /* TODO: dds_get_status_changes may not be invoked for DATA_ON_READERS on Subscriber, check if that is the case!! */ + status = ddsrt_atomic_ld32 (&e->m_status.m_status_and_mask) & SAM_STATUS_MASK; + if (status) { + dds_entity_deriver_invoke_cbs_for_pending_events(e, status); + } + e->m_cb_count--; + e->m_cb_pending_count--; + ddsrt_cond_broadcast (&e->m_observers_cond); + ddsrt_mutex_unlock (&e->m_observers_lock); dds_entity_unpin (e); + return DDS_RETCODE_OK; } diff --git a/src/core/ddsc/src/dds_guardcond.c b/src/core/ddsc/src/dds_guardcond.c index 46ceb4902f..23119ae6ba 100644 --- a/src/core/ddsc/src/dds_guardcond.c +++ b/src/core/ddsc/src/dds_guardcond.c @@ -28,7 +28,8 @@ const struct dds_entity_deriver dds_entity_deriver_guardcondition = { .set_qos = dds_entity_deriver_dummy_set_qos, .validate_status = dds_entity_deriver_dummy_validate_status, .create_statistics = dds_entity_deriver_dummy_create_statistics, - .refresh_statistics = dds_entity_deriver_dummy_refresh_statistics + .refresh_statistics = dds_entity_deriver_dummy_refresh_statistics, + .invoke_cbs_for_pending_events = dds_entity_deriver_dummy_invoke_cbs_for_pending_events }; dds_entity_t dds_create_guardcondition (dds_entity_t owner) diff --git a/src/core/ddsc/src/dds_init.c b/src/core/ddsc/src/dds_init.c index cdcb144bc1..8770a2c33f 100644 --- a/src/core/ddsc/src/dds_init.c +++ b/src/core/ddsc/src/dds_init.c @@ -41,7 +41,8 @@ const struct dds_entity_deriver dds_entity_deriver_cyclonedds = { .set_qos = dds_entity_deriver_dummy_set_qos, .validate_status = dds_entity_deriver_dummy_validate_status, .create_statistics = dds_entity_deriver_dummy_create_statistics, - .refresh_statistics = dds_entity_deriver_dummy_refresh_statistics + .refresh_statistics = dds_entity_deriver_dummy_refresh_statistics, + .invoke_cbs_for_pending_events = dds_entity_deriver_dummy_invoke_cbs_for_pending_events }; dds_cyclonedds_entity dds_global; diff --git a/src/core/ddsc/src/dds_participant.c b/src/core/ddsc/src/dds_participant.c index d44cd21d0a..847d3a62cc 100644 --- a/src/core/ddsc/src/dds_participant.c +++ b/src/core/ddsc/src/dds_participant.c @@ -87,7 +87,8 @@ const struct dds_entity_deriver dds_entity_deriver_participant = { .set_qos = dds_participant_qos_set, .validate_status = dds_participant_status_validate, .create_statistics = dds_entity_deriver_dummy_create_statistics, - .refresh_statistics = dds_entity_deriver_dummy_refresh_statistics + .refresh_statistics = dds_entity_deriver_dummy_refresh_statistics, + .invoke_cbs_for_pending_events = dds_entity_deriver_dummy_invoke_cbs_for_pending_events }; dds_entity_t dds_create_participant (const dds_domainid_t domain, const dds_qos_t *qos, const dds_listener_t *listener) diff --git a/src/core/ddsc/src/dds_publisher.c b/src/core/ddsc/src/dds_publisher.c index 8c1e14dc8a..a00fbedd67 100644 --- a/src/core/ddsc/src/dds_publisher.c +++ b/src/core/ddsc/src/dds_publisher.c @@ -44,7 +44,8 @@ const struct dds_entity_deriver dds_entity_deriver_publisher = { .set_qos = dds_publisher_qos_set, .validate_status = dds_publisher_status_validate, .create_statistics = dds_entity_deriver_dummy_create_statistics, - .refresh_statistics = dds_entity_deriver_dummy_refresh_statistics + .refresh_statistics = dds_entity_deriver_dummy_refresh_statistics, + .invoke_cbs_for_pending_events = dds_entity_deriver_dummy_invoke_cbs_for_pending_events }; dds_entity_t dds__create_publisher_l (dds_participant *par, bool implicit, const dds_qos_t *qos, const dds_listener_t *listener) diff --git a/src/core/ddsc/src/dds_readcond.c b/src/core/ddsc/src/dds_readcond.c index 93e29c1d07..2bf1423193 100644 --- a/src/core/ddsc/src/dds_readcond.c +++ b/src/core/ddsc/src/dds_readcond.c @@ -37,7 +37,8 @@ const struct dds_entity_deriver dds_entity_deriver_readcondition = { .set_qos = dds_entity_deriver_dummy_set_qos, .validate_status = dds_entity_deriver_dummy_validate_status, .create_statistics = dds_entity_deriver_dummy_create_statistics, - .refresh_statistics = dds_entity_deriver_dummy_refresh_statistics + .refresh_statistics = dds_entity_deriver_dummy_refresh_statistics, + .invoke_cbs_for_pending_events = dds_entity_deriver_dummy_invoke_cbs_for_pending_events }; dds_readcond *dds_create_readcond_impl (dds_reader *rd, dds_entity_kind_t kind, uint32_t mask, dds_querycondition_filter_fn filter) diff --git a/src/core/ddsc/src/dds_reader.c b/src/core/ddsc/src/dds_reader.c index b22c66294b..44d0ca204d 100644 --- a/src/core/ddsc/src/dds_reader.c +++ b/src/core/ddsc/src/dds_reader.c @@ -440,6 +440,30 @@ void dds_reader_status_cb (void *ventity, const ddsi_status_cb_data_t *data) ddsrt_mutex_unlock (&rd->m_entity.m_observers_lock); } +void dds_reader_invoke_cbs_for_pending_events(const struct dds_entity *e, uint32_t status) +{ + dds_reader * const rdr = (dds_reader *) e; + + if (status & DDS_REQUESTED_DEADLINE_MISSED_STATUS) { + status_cb_requested_deadline_missed_invoke(rdr); + } + if (status & DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS) { + status_cb_requested_incompatible_qos_invoke(rdr); + } + if (status & DDS_SAMPLE_LOST_STATUS) { + status_cb_sample_lost_invoke(rdr); + } + if (status & DDS_SAMPLE_REJECTED_STATUS) { + status_cb_sample_rejected_invoke(rdr); + } + if (status & DDS_LIVELINESS_CHANGED_STATUS) { + status_cb_liveliness_changed_invoke(rdr); + } + if (status & DDS_SUBSCRIPTION_MATCHED_STATUS) { + status_cb_subscription_matched_invoke(rdr); + } +} + static const struct dds_stat_keyvalue_descriptor dds_reader_statistics_kv[] = { { "discarded_bytes", DDS_STAT_KIND_UINT64 } }; @@ -468,7 +492,8 @@ const struct dds_entity_deriver dds_entity_deriver_reader = { .set_qos = dds_reader_qos_set, .validate_status = dds_reader_status_validate, .create_statistics = dds_reader_create_statistics, - .refresh_statistics = dds_reader_refresh_statistics + .refresh_statistics = dds_reader_refresh_statistics, + .invoke_cbs_for_pending_events = dds_reader_invoke_cbs_for_pending_events }; #ifdef DDS_HAS_SHM diff --git a/src/core/ddsc/src/dds_subscriber.c b/src/core/ddsc/src/dds_subscriber.c index e2f58b7624..5805312b11 100644 --- a/src/core/ddsc/src/dds_subscriber.c +++ b/src/core/ddsc/src/dds_subscriber.c @@ -43,7 +43,8 @@ const struct dds_entity_deriver dds_entity_deriver_subscriber = { .set_qos = dds_subscriber_qos_set, .validate_status = dds_subscriber_status_validate, .create_statistics = dds_entity_deriver_dummy_create_statistics, - .refresh_statistics = dds_entity_deriver_dummy_refresh_statistics + .refresh_statistics = dds_entity_deriver_dummy_refresh_statistics, + .invoke_cbs_for_pending_events = dds_entity_deriver_dummy_invoke_cbs_for_pending_events }; dds_entity_t dds__create_subscriber_l (dds_participant *participant, bool implicit, const dds_qos_t *qos, const dds_listener_t *listener) diff --git a/src/core/ddsc/src/dds_topic.c b/src/core/ddsc/src/dds_topic.c index 6156eced7c..6e4d0880e3 100644 --- a/src/core/ddsc/src/dds_topic.c +++ b/src/core/ddsc/src/dds_topic.c @@ -285,7 +285,8 @@ const struct dds_entity_deriver dds_entity_deriver_topic = { .set_qos = dds_topic_qos_set, .validate_status = dds_topic_status_validate, .create_statistics = dds_entity_deriver_dummy_create_statistics, - .refresh_statistics = dds_entity_deriver_dummy_refresh_statistics + .refresh_statistics = dds_entity_deriver_dummy_refresh_statistics, + .invoke_cbs_for_pending_events = dds_entity_deriver_dummy_invoke_cbs_for_pending_events }; /** diff --git a/src/core/ddsc/src/dds_waitset.c b/src/core/ddsc/src/dds_waitset.c index c35acb457f..e1978d16eb 100644 --- a/src/core/ddsc/src/dds_waitset.c +++ b/src/core/ddsc/src/dds_waitset.c @@ -139,7 +139,8 @@ const struct dds_entity_deriver dds_entity_deriver_waitset = { .set_qos = dds_entity_deriver_dummy_set_qos, .validate_status = dds_entity_deriver_dummy_validate_status, .create_statistics = dds_entity_deriver_dummy_create_statistics, - .refresh_statistics = dds_entity_deriver_dummy_refresh_statistics + .refresh_statistics = dds_entity_deriver_dummy_refresh_statistics, + .invoke_cbs_for_pending_events = dds_entity_deriver_dummy_invoke_cbs_for_pending_events }; dds_entity_t dds_create_waitset (dds_entity_t owner) diff --git a/src/core/ddsc/src/dds_writer.c b/src/core/ddsc/src/dds_writer.c index ed9a0ba644..5a0f3fc6cd 100644 --- a/src/core/ddsc/src/dds_writer.c +++ b/src/core/ddsc/src/dds_writer.c @@ -162,6 +162,24 @@ void dds_writer_status_cb (void *entity, const struct ddsi_status_cb_data *data) ddsrt_mutex_unlock (&wr->m_entity.m_observers_lock); } +void dds_writer_invoke_cbs_for_pending_events(const struct dds_entity *e, uint32_t status) +{ + dds_writer * const wr = (dds_writer *) e; + + if (status & DDS_PUBLICATION_MATCHED_STATUS) { + status_cb_publication_matched_invoke(wr); + } + if (status & DDS_LIVELINESS_LOST_STATUS) { + status_cb_liveliness_lost_invoke(wr); + } + if (status & DDS_OFFERED_INCOMPATIBLE_QOS_STATUS) { + status_cb_offered_incompatible_qos_invoke(wr); + } + if (status & DDS_OFFERED_DEADLINE_MISSED_STATUS) { + status_cb_offered_deadline_missed_invoke(wr); + } +} + static void dds_writer_interrupt (dds_entity *e) ddsrt_nonnull_all; static void dds_writer_interrupt (dds_entity *e) @@ -275,7 +293,8 @@ const struct dds_entity_deriver dds_entity_deriver_writer = { .set_qos = dds_writer_qos_set, .validate_status = dds_writer_status_validate, .create_statistics = dds_writer_create_statistics, - .refresh_statistics = dds_writer_refresh_statistics + .refresh_statistics = dds_writer_refresh_statistics, + .invoke_cbs_for_pending_events = dds_writer_invoke_cbs_for_pending_events }; #ifdef DDS_HAS_SHM