Skip to content

Commit

Permalink
[time] Implement PullSource support for TimeSourceManager
Browse files Browse the repository at this point in the history
Bug: 110804
Change-Id: I9429a5d0caccedf521a6cadfc6329bc98dcc57fa
Reviewed-on: https://fuchsia-review.googlesource.com/c/fuchsia/+/762170
Commit-Queue: Viktar Donich <[email protected]>
Reviewed-by: Jody Sankey <[email protected]>
  • Loading branch information
vdonich authored and CQ Bot committed Dec 7, 2022
1 parent 1bc14dc commit 2f14460
Showing 1 changed file with 221 additions and 20 deletions.
241 changes: 221 additions & 20 deletions src/sys/time/timekeeper/src/time_source_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ use {
crate::diagnostics::{Diagnostics, Event},
crate::enums::{Role, SampleValidationError, TimeSourceError},
crate::time_source::{
BoxedPushSource, BoxedPushSourceEventStream, Event as TimeSourceEvent, Sample, TimeSource,
BoxedPullSource, BoxedPushSource, BoxedPushSourceEventStream, Event as TimeSourceEvent,
Sample, TimeSource,
},
fidl_fuchsia_time_external::Status,
fidl_fuchsia_time_external::{Status, Urgency},
fuchsia_async::{self as fasync, TimeoutExt},
fuchsia_zircon as zx,
futures::{FutureExt as _, StreamExt as _},
Expand Down Expand Up @@ -53,11 +54,9 @@ pub struct TimeSourceManager<D: Diagnostics, M: MonotonicProvider> {
manager: TimeManager<D, M>,
}

/// TODO(fxb/110804): Add PullSourceManager.
#[allow(dead_code)]
enum TimeManager<D: Diagnostics, M: MonotonicProvider> {
Push(PushSourceManager<D, M>),
Pull(),
Pull(PullSourceManager<D, M>),
}

/// A wrapper that launches a time source component, uses PushSource to obtain time samples,
Expand Down Expand Up @@ -237,7 +236,16 @@ impl<D: Diagnostics> TimeSourceManager<D, KernelMonotonicProvider> {
last_status: None,
last_accepted_sample_arrival: None,
}),
TimeSource::Pull(_) => unimplemented!(),
TimeSource::Pull(time_source) => TimeManager::Pull(PullSourceManager {
backstop,
delays_enabled: true,
role,
time_source,
diagnostics,
monotonic: KernelMonotonicProvider(),
last_sample_request_time: None,
received_sample: false,
}),
};
TimeSourceManager { manager }
}
Expand All @@ -254,7 +262,7 @@ impl<D: Diagnostics> TimeSourceManager<D, KernelMonotonicProvider> {
let mut manager = Self::new(backstop, role, time_source, diagnostics);
match &mut manager.manager {
TimeManager::Push(m) => m.delays_enabled = false,
TimeManager::Pull() => unimplemented!(),
TimeManager::Pull(m) => m.delays_enabled = false,
}
manager
}
Expand All @@ -269,26 +277,104 @@ impl<D: Diagnostics, M: MonotonicProvider> TimeSourceManager<D, M> {
pub fn role(&self) -> Role {
match &self.manager {
TimeManager::Push(m) => m.role,
TimeManager::Pull() => unimplemented!(),
TimeManager::Pull(m) => m.role,
}
}

/// Returns the next valid sample from the time source.
pub async fn next_sample(&mut self) -> Sample {
match &mut self.manager {
TimeManager::Push(m) => m.next_sample_from_push().await,
TimeManager::Pull() => unimplemented!(),
TimeManager::Pull(m) => m.next_sample_from_pull().await,
}
}
}

/// A wrapper that launches a time source component, uses PushSource to obtain time samples,
/// validates them from the source, and handles relaunching the source in the case of failures.
struct PullSourceManager<D: Diagnostics, M: MonotonicProvider> {
/// The role of the time source being managed.
role: Role,
/// The backstop time that samples must not come before.
backstop: zx::Time,
/// Whether the time source restart delay and minimum update delay should be enabled.
delays_enabled: bool,
/// A source of monotonic time.
monotonic: M,
/// The time source to be managed.
time_source: BoxedPullSource,

/// A diagnostics implementation for recording events of note.
diagnostics: Arc<D>,

/// The monotonic time at which the last sample was requested.
last_sample_request_time: Option<zx::Time>,

/// If the manager ever received a sample.
received_sample: bool,
}

impl<D: Diagnostics, M: MonotonicProvider> PullSourceManager<D, M> {
/// Returns the next valid sample from the Pull timesource.
async fn next_sample_from_pull(&mut self) -> Sample {
loop {
let sample = loop {
self.last_sample_request_time = Some(self.monotonic.now());
// TODO(fxb/116230): Adjust urgency.
match self.time_source.sample(&Urgency::Medium).await {
Ok(sample) => break sample,
Err(err) => {
error!("Error obtaining time sample on {:?}: {:?}", self.role, err);
self.record_time_source_failure(TimeSourceError::LaunchFailed);
if self.delays_enabled {
fasync::Timer::new(fasync::Time::after(RESTART_DELAY)).await;
}
continue;
}
};
};
match self.validate_sample(&sample) {
Ok(_) => {
info!("{:?} received valid sample", self.role);
self.received_sample = true;
return sample;
}
Err(error) => {
error!("Rejected invalid sample from {:?}: {:?}", self.role, error);
self.diagnostics.record(Event::SampleRejected { role: self.role, error });
}
}
}
}

/// Validates the supplied time sample against the current state.
fn validate_sample(&mut self, sample: &Sample) -> Result<(), SampleValidationError> {
let current_monotonic = self.monotonic.now();
if sample.utc < self.backstop {
Err(SampleValidationError::BeforeBackstop)
} else if sample.monotonic > current_monotonic {
Err(SampleValidationError::MonotonicInFuture)
} else if self.last_sample_request_time.map_or(true, |time| sample.monotonic < time) {
// If the sample wasn't requested or represents time before the sample request.
Err(SampleValidationError::MonotonicTooOld)
} else {
Ok(())
}
}

/// Record a time source failure via diagnostics.
fn record_time_source_failure(&self, error: TimeSourceError) {
self.diagnostics.record(Event::TimeSourceFailed { role: self.role, error });
}
}

#[cfg(test)]
mod test {
use {
super::*,
crate::diagnostics::FakeDiagnostics,
crate::enums::{SampleValidationError as SVE, TimeSourceError as TSE},
crate::time_source::FakePushTimeSource,
crate::time_source::{FakePullTimeSource, FakePushTimeSource},
anyhow::anyhow,
};

Expand All @@ -306,6 +392,16 @@ mod test {
}};
}

macro_rules! assert_pull_manager {
($manager:expr) => {{
if let TimeManager::Pull(m) = $manager.manager {
m
} else {
panic!("Expected TimeManager::Pull.")
}
}};
}

/// A provider of artificial monotonic times that increment by a fixed duration each call.
struct FakeMonotonicProvider {
increment: zx::Duration,
Expand All @@ -326,10 +422,10 @@ mod test {
}
}

/// Create a new `TimeSourceManager` using the standard backstop time and role, a monotonic time
/// that increments by `MIN_UPDATE_DELAY` per sample, and the supplied time source and
/// diagnostics.
fn create_manager(
/// Create a new `TimeSourceManager` from PushSource using the standard backstop time and role,
/// a monotonic time that increments by `MIN_UPDATE_DELAY` per sample, and the supplied time
/// source and diagnostics.
fn create_manager_from_push(
time_source: FakePushTimeSource,
diagnostics: Arc<FakeDiagnostics>,
) -> TimeSourceManager<FakeDiagnostics, FakeMonotonicProvider> {
Expand All @@ -347,6 +443,26 @@ mod test {
TimeSourceManager { manager }
}

/// Create a new `TimeSourceManager` from PullSource using the standard backstop time and role,
/// a monotonic time that increments by `MIN_UPDATE_DELAY` per sample, and the supplied time
/// source and diagnostics.
fn create_manager_from_pull(
time_source: FakePullTimeSource,
diagnostics: Arc<FakeDiagnostics>,
) -> TimeSourceManager<FakeDiagnostics, FakeMonotonicProvider> {
let manager = TimeManager::Pull(PullSourceManager {
backstop: zx::Time::ZERO + (MIN_UPDATE_DELAY * BACKSTOP_FACTOR),
delays_enabled: true,
monotonic: FakeMonotonicProvider::new(MIN_UPDATE_DELAY),
role: TEST_ROLE,
time_source: Box::new(time_source),
diagnostics,
last_sample_request_time: None,
received_sample: false,
});
TimeSourceManager { manager }
}

/// Create a new `TimeSourceManager` using the standard backstop time and role, a monotonic time
/// that increments by `MIN_UPDATE_DELAY` per sample, and the supplied time source and
/// diagnostics. Restart and min update delays are disabled.
Expand Down Expand Up @@ -380,10 +496,18 @@ mod test {
}

#[fuchsia::test]
fn role_accessor() {
fn push_role_accessor() {
let time_source = FakePushTimeSource::failing();
let diagnostics = Arc::new(FakeDiagnostics::new());
let manager = create_manager(time_source, diagnostics);
let manager = create_manager_from_push(time_source, diagnostics);
assert_eq!(manager.role(), TEST_ROLE);
}

#[fuchsia::test]
fn pull_role_accessor() {
let time_source = FakePullTimeSource::failing();
let diagnostics = Arc::new(FakeDiagnostics::new());
let manager = create_manager_from_pull(time_source, diagnostics);
assert_eq!(manager.role(), TEST_ROLE);
}

Expand All @@ -397,7 +521,7 @@ mod test {
TimeSourceEvent::from(create_sample(BACKSTOP_FACTOR + 3, 3)),
]);
let diagnostics = Arc::new(FakeDiagnostics::new());
let mut manager = create_manager(time_source, Arc::clone(&diagnostics));
let mut manager = create_manager_from_push(time_source, Arc::clone(&diagnostics));

assert_eq!(manager.next_sample().await, create_sample(BACKSTOP_FACTOR + 1, 1));
assert_eq!(manager.next_sample().await, create_sample(BACKSTOP_FACTOR + 3, 3));
Expand All @@ -415,6 +539,30 @@ mod test {
]);
}

#[fuchsia::test(allow_stalls = false)]
async fn event_in_future_pull_source() {
let time_source = FakePullTimeSource::samples(vec![
(Urgency::Medium, create_sample(BACKSTOP_FACTOR + 1, 1)),
// Should be ignored since monotonic is in the future
(Urgency::Medium, create_sample(BACKSTOP_FACTOR + 2, 20)),
(Urgency::Medium, create_sample(BACKSTOP_FACTOR + 3, 5)),
]);
let diagnostics = Arc::new(FakeDiagnostics::new());
let mut manager = create_manager_from_pull(time_source, Arc::clone(&diagnostics));

assert_eq!(manager.next_sample().await, create_sample(BACKSTOP_FACTOR + 1, 1));
assert_eq!(manager.next_sample().await, create_sample(BACKSTOP_FACTOR + 3, 5));

let pull_manager = assert_pull_manager!(manager);

assert_eq!(pull_manager.received_sample, true);

diagnostics.assert_events(&[Event::SampleRejected {
role: TEST_ROLE,
error: SVE::MonotonicInFuture,
}]);
}

#[fuchsia::test(allow_stalls = false)]
async fn sample_implies_ok() {
let time_source = FakePushTimeSource::events(vec![
Expand All @@ -427,7 +575,7 @@ mod test {
TimeSourceEvent::from(create_sample(BACKSTOP_FACTOR + 2, 2)),
]);
let diagnostics = Arc::new(FakeDiagnostics::new());
let mut manager = create_manager(time_source, Arc::clone(&diagnostics));
let mut manager = create_manager_from_push(time_source, Arc::clone(&diagnostics));

assert_eq!(manager.next_sample().await, create_sample(BACKSTOP_FACTOR + 1, 1));
assert_eq!(manager.next_sample().await, create_sample(BACKSTOP_FACTOR + 2, 2));
Expand Down Expand Up @@ -526,10 +674,41 @@ mod test {
}]);
}

#[fuchsia::test]
async fn restart_on_launch_failure_pull_source() {
let time_source = FakePullTimeSource::failing().into();
let diagnostics = Arc::new(FakeDiagnostics::new());
let mut manager = TimeSourceManager::new(
zx::Time::ZERO,
TEST_ROLE,
time_source,
Arc::clone(&diagnostics),
);

// Calling next sample on this manager with the restart delay enabled should lead to
// failed launch and then a few minute cooldown period before relaunch. We test for this by
// verifying a short timeout triggered.
assert_eq!(
manager
.next_sample()
.map(|_| true)
.on_timeout(zx::Time::after(zx::Duration::from_millis(50)), || false)
.await,
false
);

diagnostics.assert_events(&[Event::TimeSourceFailed {
role: TEST_ROLE,
error: TSE::LaunchFailed,
}]);
}

#[fuchsia::test]
fn validate_sample_failures() {
let manager =
create_manager(FakePushTimeSource::failing(), Arc::new(FakeDiagnostics::new()));
let manager = create_manager_from_push(
FakePushTimeSource::failing(),
Arc::new(FakeDiagnostics::new()),
);
let mut push_manager = assert_push_manager!(manager);
push_manager.last_status = Some(Status::Ok);

Expand Down Expand Up @@ -568,4 +747,26 @@ mod test {
Ok(zx::Time::ZERO + MIN_UPDATE_DELAY * 6)
);
}

#[fuchsia::test]
fn validate_sample_failures_pull_source() {
let manager = create_manager_from_pull(
FakePullTimeSource::failing(),
Arc::new(FakeDiagnostics::new()),
);
let mut pull_manager = assert_pull_manager!(manager);
pull_manager.last_sample_request_time = Some(zx::Time::ZERO);

// The monotonic our manager sees will start at a factor of 1 and increment by 1 each time
// we try to validate a sample.
assert_eq!(pull_manager.validate_sample(&create_sample(BACKSTOP_FACTOR, 1)), Ok(()));
assert_eq!(
pull_manager.validate_sample(&create_sample(BACKSTOP_FACTOR - 1, 2)),
Err(SVE::BeforeBackstop)
);
assert_eq!(
pull_manager.validate_sample(&create_sample(BACKSTOP_FACTOR, 100)),
Err(SVE::MonotonicInFuture)
);
}
}

0 comments on commit 2f14460

Please sign in to comment.