Skip to content

Commit

Permalink
support for default recovery option without last sample miss detection (
Browse files Browse the repository at this point in the history
  • Loading branch information
DenisBiryukov91 authored and sashacmc committed Jan 17, 2025
1 parent b216284 commit 6756cd8
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 28 deletions.
7 changes: 5 additions & 2 deletions examples/zenohc/z_advanced_sub.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,12 @@ int _main(int argc, char **argv) {
ext::SessionExt::AdvancedSubscriberOptions opts;
opts.history.emplace().detect_late_publishers = true;
opts.history->detect_late_publishers = true;
opts.recovery.emplace(); // enable recovery based on received heartbeats from ext::AdvancedPublisher
// enable recovery based on received heartbeats from ext::AdvancedPublisher
opts.recovery.emplace().last_sample_miss_detection =
ext::SessionExt::AdvancedSubscriberOptions::RecoveryOptions::Heartbeat{};
// alternatively recovery can be triggered based on missed sample detection via periodic queries:
// opts.recovery.emplace().periodic_queries_period_ms = 1000;
// opts.recovery.emplace().last_sample_miss_detection =
// ext::SessionExt::AdvancedSubscriberOptions::RecoveryOptions::PeriodicQueriesOptions{1000};
opts.subscriber_detection = true;

auto data_handler = [](const Sample &sample) {
Expand Down
62 changes: 44 additions & 18 deletions include/zenoh/api/ext/session_ext.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -286,12 +286,14 @@ class SessionExt {
/// @brief Create default option settings.
static CacheOptions create_default() { return {}; }
};
// @brief Settings allowing matching Subscribers to detect lost samples and optionally ask for retransimission.

/// @brief Settings allowing matching Subscribers to detect lost samples and optionally ask for retransimission.
struct SampleMissDetectionOptions {
// The period of publisher heartbeats in ms, which can be used by ``AdvancedSubscriber`` for missed sample
// detection (if heartbeat-based recovery is enabled). If this value is unset, the subscribers will only be
// notified about missed samples if they opt to send periodic queries.
/// The period of publisher heartbeats in ms, which can be used by ``AdvancedSubscriber`` for missed sample
/// detection (if heartbeat-based recovery is enabled).
/// Otherwise, missed samples will be retransmitted based on Advanced Subscriber queries.
std::optional<uint64_t> heartbeat_period_ms = {};

/// @brief Create default option settings.
static SampleMissDetectionOptions create_default() { return {}; }
};
Expand Down Expand Up @@ -392,14 +394,32 @@ class SessionExt {
struct RecoveryOptions {
/// @name Fields

/// @brief Period for queries for not yet received Samples.
///
/// These queries allow to retrieve the last Sample(s) if the last Sample(s) is/are lost.
/// So it is useful for sporadic publications but useless for periodic publications
/// with a period smaller or equal to this period.
/// If unset, the subscriber will be instead notified about missed samples through ``AdvancedPublisher``
/// heartbeats (if enabled on publisher side).
std::optional<uint64_t> periodic_queries_period_ms = {};
/// @brief Option tag for Heartbeat-based last sample detection.
struct Heartbeat {};

/// @brief Settings for periodic queries-based last sample detection.
struct PeriodicQueriesOptions {
/// @name Fields

/// @brief Period for queries for not yet received Samples.
///
/// These queries allow to retrieve the last Sample(s) if the last Sample(s) is/are lost.
/// So it is useful for sporadic publications but useless for periodic publications
/// with a period smaller or equal to this period.
uint64_t period_ms = 1000;

/// @name Methods

/// @brief Create default option settings.
static PeriodicQueriesOptions create_default() { return {}; };
};

/// @brief Setting for detecting last sample(s) miss.
/// Note that it does not affect intermediate sample miss detection/retrieval (which is performed
/// automatically as long as recovery is enabled). If this option is disabled, subscriber will be unable to
/// detect/request retransmission of missed sample until it receives a more recent one from the same
/// publisher.
std::optional<std::variant<Heartbeat, PeriodicQueriesOptions>> last_sample_miss_detection = {};

/// @name Methods

Expand Down Expand Up @@ -443,12 +463,18 @@ class SessionExt {
}
if (this->recovery.has_value()) {
opts.recovery.is_enabled = true;
if (this->recovery->periodic_queries_period_ms.has_value()) {
// treat 0 as very small delay
opts.recovery.periodic_queries_period_ms =
std::max<uint64_t>(1, this->recovery->periodic_queries_period_ms.value());
} else {
opts.recovery.periodic_queries_period_ms = 0;
if (this->recovery->last_sample_miss_detection.has_value()) {
opts.recovery.last_sample_miss_detection.is_enabled = true;
if (std::holds_alternative<RecoveryOptions::Heartbeat>(
this->recovery->last_sample_miss_detection.value())) {
opts.recovery.last_sample_miss_detection.periodic_queries_period_ms = 0;
} else {
// treat 0 as very small delay
opts.recovery.last_sample_miss_detection.periodic_queries_period_ms =
std::max<uint64_t>(1, std::get<RecoveryOptions::PeriodicQueriesOptions>(
this->recovery->last_sample_miss_detection.value())
.period_ms);
}
}
}
opts.query_timeout_ms = this->query_timeout_ms;
Expand Down
8 changes: 2 additions & 6 deletions include/zenoh/api/session.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -965,9 +965,7 @@ class Session : public Owned<::z_owned_session_t> {
__ZENOH_RESULT_CHECK(res, err, "Failed to declare Liveliness Token Subscriber");
return s;
}
#endif

#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API)
/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future
/// release.
/// @brief Declares a background subscriber on liveliness tokens that intersect `key_expr`. The subscriber callback
Expand Down Expand Up @@ -996,13 +994,11 @@ class Session : public Owned<::z_owned_session_t> {
auto closure = ClosureType::into_context(std::forward<C>(on_sample), std::forward<D>(on_drop));
::z_closure(&c_closure, detail::closures::_zenoh_on_sample_call, detail::closures::_zenoh_on_drop, closure);
::z_liveliness_subscriber_options_t opts = interop::detail::Converter::to_c_opts(options);
ZResult res = ::zc_liveliness_declare_background_subscriber(
ZResult res = ::z_liveliness_declare_background_subscriber(
interop::as_loaned_c_ptr(*this), interop::as_loaned_c_ptr(key_expr), ::z_move(c_closure), &opts);
__ZENOH_RESULT_CHECK(res, err, "Failed to declare Background Liveliness Token Subscriber");
}
#endif

#if defined(ZENOHCXX_ZENOHC) || Z_FEATURE_LIVELINESS == 1
/// @brief Declare a subscriber on liveliness tokens that intersect `key_expr`.
/// @tparam Channel the type of channel used to create stream of data (see ``zenoh::channels::FifoChannel`` or
/// ``zenoh::channels::RingChannel``).
Expand Down Expand Up @@ -1097,6 +1093,7 @@ class Session : public Owned<::z_owned_session_t> {
if (res != Z_OK) ::z_drop(interop::as_moved_c_ptr(cb_handler_pair.second));
return std::move(cb_handler_pair.second);
}
#endif

/// @brief Create Timestamp from session id.
/// @param err if not null, the result code will be written to this location, otherwise ZException exception will be
Expand All @@ -1118,7 +1115,6 @@ class Session : public Owned<::z_owned_session_t> {
(void)options;
__ZENOH_RESULT_CHECK(::z_close(interop::as_loaned_c_ptr(*this), nullptr), err, "Failed to close the session");
}
#endif

/// @brief Check if session is closed.
/// @return ``true`` if session is closed, ``false`` otherwise.
Expand Down

0 comments on commit 6756cd8

Please sign in to comment.