Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support for default recovery option without last sample miss detection #373

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions examples/zenohc/z_advanced_sub.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,12 @@ int _main(int argc, char **argv) {
ext::SessionExt::AdvancedSubscriberOptions opts;
opts.history.emplace().detect_late_publishers = true;
opts.history->detect_late_publishers = true;
opts.recovery.emplace(); // enable recovery based on received heartbeats from ext::AdvancedPublisher
// enable recovery based on received heartbeats from ext::AdvancedPublisher
opts.recovery.emplace().last_sample_miss_detection =
ext::SessionExt::AdvancedSubscriberOptions::RecoveryOptions::Heartbeat{};
// alternatively recovery can be triggered based on missed sample detection via periodic queries:
// opts.recovery.emplace().periodic_queries_period_ms = 1000;
// opts.recovery.emplace().last_sample_miss_detection =
// ext::SessionExt::AdvancedSubscriberOptions::RecoveryOptions::PeriodicQueriesOptions{1000};
opts.subscriber_detection = true;

auto data_handler = [](const Sample &sample) {
Expand Down
62 changes: 44 additions & 18 deletions include/zenoh/api/ext/session_ext.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -286,12 +286,14 @@ class SessionExt {
/// @brief Create default option settings.
static CacheOptions create_default() { return {}; }
};
// @brief Settings allowing matching Subscribers to detect lost samples and optionally ask for retransimission.

/// @brief Settings allowing matching Subscribers to detect lost samples and optionally ask for retransimission.
struct SampleMissDetectionOptions {
// The period of publisher heartbeats in ms, which can be used by ``AdvancedSubscriber`` for missed sample
// detection (if heartbeat-based recovery is enabled). If this value is unset, the subscribers will only be
// notified about missed samples if they opt to send periodic queries.
/// The period of publisher heartbeats in ms, which can be used by ``AdvancedSubscriber`` for missed sample
/// detection (if heartbeat-based recovery is enabled).
/// Otherwise, missed samples will be retransmitted based on Advanced Subscriber queries.
std::optional<uint64_t> heartbeat_period_ms = {};

/// @brief Create default option settings.
static SampleMissDetectionOptions create_default() { return {}; }
};
Expand Down Expand Up @@ -392,14 +394,32 @@ class SessionExt {
struct RecoveryOptions {
/// @name Fields

/// @brief Period for queries for not yet received Samples.
///
/// These queries allow to retrieve the last Sample(s) if the last Sample(s) is/are lost.
/// So it is useful for sporadic publications but useless for periodic publications
/// with a period smaller or equal to this period.
/// If unset, the subscriber will be instead notified about missed samples through ``AdvancedPublisher``
/// heartbeats (if enabled on publisher side).
std::optional<uint64_t> periodic_queries_period_ms = {};
/// @brief Option tag for Heartbeat-based last sample detection.
struct Heartbeat {};

/// @brief Settings for periodic queries-based last sample detection.
struct PeriodicQueriesOptions {
/// @name Fields

/// @brief Period for queries for not yet received Samples.
///
/// These queries allow to retrieve the last Sample(s) if the last Sample(s) is/are lost.
/// So it is useful for sporadic publications but useless for periodic publications
/// with a period smaller or equal to this period.
uint64_t period_ms = 1000;

/// @name Methods

/// @brief Create default option settings.
static PeriodicQueriesOptions create_default() { return {}; };
};

/// @brief Setting for detecting last sample(s) miss.
/// Note that it does not affect intermediate sample miss detection/retrieval (which is performed
/// automatically as long as recovery is enabled). If this option is disabled, subscriber will be unable to
/// detect/request retransmission of missed sample until it receives a more recent one from the same
/// publisher.
std::optional<std::variant<Heartbeat, PeriodicQueriesOptions>> last_sample_miss_detection = {};

/// @name Methods

Expand Down Expand Up @@ -443,12 +463,18 @@ class SessionExt {
}
if (this->recovery.has_value()) {
opts.recovery.is_enabled = true;
if (this->recovery->periodic_queries_period_ms.has_value()) {
// treat 0 as very small delay
opts.recovery.periodic_queries_period_ms =
std::max<uint64_t>(1, this->recovery->periodic_queries_period_ms.value());
} else {
opts.recovery.periodic_queries_period_ms = 0;
if (this->recovery->last_sample_miss_detection.has_value()) {
opts.recovery.last_sample_miss_detection.is_enabled = true;
if (std::holds_alternative<RecoveryOptions::Heartbeat>(
this->recovery->last_sample_miss_detection.value())) {
opts.recovery.last_sample_miss_detection.periodic_queries_period_ms = 0;
} else {
// treat 0 as very small delay
opts.recovery.last_sample_miss_detection.periodic_queries_period_ms =
std::max<uint64_t>(1, std::get<RecoveryOptions::PeriodicQueriesOptions>(
this->recovery->last_sample_miss_detection.value())
.period_ms);
}
}
}
opts.query_timeout_ms = this->query_timeout_ms;
Expand Down
Loading