From 6756cd8aba26312cb1cc3f5842f89d187fdbfa44 Mon Sep 17 00:00:00 2001 From: DenisBiryukov91 <155981813+DenisBiryukov91@users.noreply.github.com> Date: Thu, 16 Jan 2025 16:48:25 +0100 Subject: [PATCH] support for default recovery option without last sample miss detection (#373) --- examples/zenohc/z_advanced_sub.cxx | 7 ++- include/zenoh/api/ext/session_ext.hxx | 62 +++++++++++++++++++-------- include/zenoh/api/session.hxx | 8 +--- zenoh-c | 2 +- zenoh-pico | 2 +- 5 files changed, 53 insertions(+), 28 deletions(-) diff --git a/examples/zenohc/z_advanced_sub.cxx b/examples/zenohc/z_advanced_sub.cxx index 11a5a14a..723e1f23 100644 --- a/examples/zenohc/z_advanced_sub.cxx +++ b/examples/zenohc/z_advanced_sub.cxx @@ -48,9 +48,12 @@ int _main(int argc, char **argv) { ext::SessionExt::AdvancedSubscriberOptions opts; opts.history.emplace().detect_late_publishers = true; opts.history->detect_late_publishers = true; - opts.recovery.emplace(); // enable recovery based on received heartbeats from ext::AdvancedPublisher + // enable recovery based on received heartbeats from ext::AdvancedPublisher + opts.recovery.emplace().last_sample_miss_detection = + ext::SessionExt::AdvancedSubscriberOptions::RecoveryOptions::Heartbeat{}; // alternatively recovery can be triggered based on missed sample detection via periodic queries: - // opts.recovery.emplace().periodic_queries_period_ms = 1000; + // opts.recovery.emplace().last_sample_miss_detection = + // ext::SessionExt::AdvancedSubscriberOptions::RecoveryOptions::PeriodicQueriesOptions{1000}; opts.subscriber_detection = true; auto data_handler = [](const Sample &sample) { diff --git a/include/zenoh/api/ext/session_ext.hxx b/include/zenoh/api/ext/session_ext.hxx index ac406241..0bd2d2bb 100644 --- a/include/zenoh/api/ext/session_ext.hxx +++ b/include/zenoh/api/ext/session_ext.hxx @@ -286,12 +286,14 @@ class SessionExt { /// @brief Create default option settings. static CacheOptions create_default() { return {}; } }; - // @brief Settings allowing matching Subscribers to detect lost samples and optionally ask for retransimission. + + /// @brief Settings allowing matching Subscribers to detect lost samples and optionally ask for retransimission. struct SampleMissDetectionOptions { - // The period of publisher heartbeats in ms, which can be used by ``AdvancedSubscriber`` for missed sample - // detection (if heartbeat-based recovery is enabled). If this value is unset, the subscribers will only be - // notified about missed samples if they opt to send periodic queries. + /// The period of publisher heartbeats in ms, which can be used by ``AdvancedSubscriber`` for missed sample + /// detection (if heartbeat-based recovery is enabled). + /// Otherwise, missed samples will be retransmitted based on Advanced Subscriber queries. std::optional heartbeat_period_ms = {}; + /// @brief Create default option settings. static SampleMissDetectionOptions create_default() { return {}; } }; @@ -392,14 +394,32 @@ class SessionExt { struct RecoveryOptions { /// @name Fields - /// @brief Period for queries for not yet received Samples. - /// - /// These queries allow to retrieve the last Sample(s) if the last Sample(s) is/are lost. - /// So it is useful for sporadic publications but useless for periodic publications - /// with a period smaller or equal to this period. - /// If unset, the subscriber will be instead notified about missed samples through ``AdvancedPublisher`` - /// heartbeats (if enabled on publisher side). - std::optional periodic_queries_period_ms = {}; + /// @brief Option tag for Heartbeat-based last sample detection. + struct Heartbeat {}; + + /// @brief Settings for periodic queries-based last sample detection. + struct PeriodicQueriesOptions { + /// @name Fields + + /// @brief Period for queries for not yet received Samples. + /// + /// These queries allow to retrieve the last Sample(s) if the last Sample(s) is/are lost. + /// So it is useful for sporadic publications but useless for periodic publications + /// with a period smaller or equal to this period. + uint64_t period_ms = 1000; + + /// @name Methods + + /// @brief Create default option settings. + static PeriodicQueriesOptions create_default() { return {}; }; + }; + + /// @brief Setting for detecting last sample(s) miss. + /// Note that it does not affect intermediate sample miss detection/retrieval (which is performed + /// automatically as long as recovery is enabled). If this option is disabled, subscriber will be unable to + /// detect/request retransmission of missed sample until it receives a more recent one from the same + /// publisher. + std::optional> last_sample_miss_detection = {}; /// @name Methods @@ -443,12 +463,18 @@ class SessionExt { } if (this->recovery.has_value()) { opts.recovery.is_enabled = true; - if (this->recovery->periodic_queries_period_ms.has_value()) { - // treat 0 as very small delay - opts.recovery.periodic_queries_period_ms = - std::max(1, this->recovery->periodic_queries_period_ms.value()); - } else { - opts.recovery.periodic_queries_period_ms = 0; + if (this->recovery->last_sample_miss_detection.has_value()) { + opts.recovery.last_sample_miss_detection.is_enabled = true; + if (std::holds_alternative( + this->recovery->last_sample_miss_detection.value())) { + opts.recovery.last_sample_miss_detection.periodic_queries_period_ms = 0; + } else { + // treat 0 as very small delay + opts.recovery.last_sample_miss_detection.periodic_queries_period_ms = + std::max(1, std::get( + this->recovery->last_sample_miss_detection.value()) + .period_ms); + } } } opts.query_timeout_ms = this->query_timeout_ms; diff --git a/include/zenoh/api/session.hxx b/include/zenoh/api/session.hxx index fc85993e..3a842db4 100644 --- a/include/zenoh/api/session.hxx +++ b/include/zenoh/api/session.hxx @@ -965,9 +965,7 @@ class Session : public Owned<::z_owned_session_t> { __ZENOH_RESULT_CHECK(res, err, "Failed to declare Liveliness Token Subscriber"); return s; } -#endif -#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API) /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future /// release. /// @brief Declares a background subscriber on liveliness tokens that intersect `key_expr`. The subscriber callback @@ -996,13 +994,11 @@ class Session : public Owned<::z_owned_session_t> { auto closure = ClosureType::into_context(std::forward(on_sample), std::forward(on_drop)); ::z_closure(&c_closure, detail::closures::_zenoh_on_sample_call, detail::closures::_zenoh_on_drop, closure); ::z_liveliness_subscriber_options_t opts = interop::detail::Converter::to_c_opts(options); - ZResult res = ::zc_liveliness_declare_background_subscriber( + ZResult res = ::z_liveliness_declare_background_subscriber( interop::as_loaned_c_ptr(*this), interop::as_loaned_c_ptr(key_expr), ::z_move(c_closure), &opts); __ZENOH_RESULT_CHECK(res, err, "Failed to declare Background Liveliness Token Subscriber"); } -#endif -#if defined(ZENOHCXX_ZENOHC) || Z_FEATURE_LIVELINESS == 1 /// @brief Declare a subscriber on liveliness tokens that intersect `key_expr`. /// @tparam Channel the type of channel used to create stream of data (see ``zenoh::channels::FifoChannel`` or /// ``zenoh::channels::RingChannel``). @@ -1097,6 +1093,7 @@ class Session : public Owned<::z_owned_session_t> { if (res != Z_OK) ::z_drop(interop::as_moved_c_ptr(cb_handler_pair.second)); return std::move(cb_handler_pair.second); } +#endif /// @brief Create Timestamp from session id. /// @param err if not null, the result code will be written to this location, otherwise ZException exception will be @@ -1118,7 +1115,6 @@ class Session : public Owned<::z_owned_session_t> { (void)options; __ZENOH_RESULT_CHECK(::z_close(interop::as_loaned_c_ptr(*this), nullptr), err, "Failed to close the session"); } -#endif /// @brief Check if session is closed. /// @return ``true`` if session is closed, ``false`` otherwise. diff --git a/zenoh-c b/zenoh-c index 67b5dceb..bcfe8073 160000 --- a/zenoh-c +++ b/zenoh-c @@ -1 +1 @@ -Subproject commit 67b5dceb010fbf2323d8e0b6595710cadd6e8ddd +Subproject commit bcfe8073a6446d2d23e654641cb075374d987067 diff --git a/zenoh-pico b/zenoh-pico index 39d9a46c..3e4527e9 160000 --- a/zenoh-pico +++ b/zenoh-pico @@ -1 +1 @@ -Subproject commit 39d9a46c2a2a7b48640157a782959eec63a40e6c +Subproject commit 3e4527e945037ca8b232c7ba970e8669709ccc2f