Skip to content

Commit

Permalink
DOSE-662 Use Instant for heartbeat timing (openzfs#464)
Browse files (browse the repository at this point in the history)
Signed-off-by: Paul Dagnelie <[email protected]>
  • Loading branch information
pcd1193182 authored Sep 27, 2021
1 parent 7c42d49 commit 7e1c06c
Showing 1 changed file with 21 additions and 26 deletions.
47 changes: 21 additions & 26 deletions cmd/zfs_object_agent/zettaobject/src/heartbeat.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize};
use std::{
collections::HashMap,
sync::{Arc, Weak},
time::{Duration, SystemTime},
time::{Duration, Instant, SystemTime},
};
use tokio::sync::watch::{self, Receiver};
use uuid::Uuid;
Expand Down Expand Up @@ -144,19 +144,17 @@ pub async fn start_heartbeat(object_access: ObjectAccess, id: Uuid) -> Heartbeat
let mut rx = rx_opt.unwrap();
let tx = tx_opt.unwrap();
tokio::spawn(async move {
let mut last_heartbeat: Option<HeartbeatPhys> = None;
let mut last_heartbeat: Option<Instant> = None;
info!("Starting heartbeat with id {}", id);
let mut interval = tokio::time::interval(*HEARTBEAT_INTERVAL);
loop {
interval.tick().await;
if let Some(ref heartbeat) = last_heartbeat {
let since = SystemTime::now()
.duration_since(heartbeat.timestamp)
.unwrap();
if since > *HEARTBEAT_INTERVAL {
trace!("Heartbeat interval slightly over: {:?}", since);
} else if since > 2 * *HEARTBEAT_INTERVAL {
if let Some(time) = last_heartbeat {
let since = Instant::now().duration_since(time);
if since > 2 * *HEARTBEAT_INTERVAL {
debug!("Heartbeat interval significantly over: {:?}", since);
} else if since > *HEARTBEAT_INTERVAL {
trace!("Heartbeat interval slightly over: {:?}", since);
}
}
{
Expand All @@ -177,14 +175,12 @@ pub async fn start_heartbeat(object_access: ObjectAccess, id: Uuid) -> Heartbeat
return;
}
}
if let Some(ref heartbeat) = last_heartbeat {
let since = SystemTime::now()
.duration_since(heartbeat.timestamp)
.unwrap();
if since > *HEARTBEAT_INTERVAL {
trace!("Heartbeat locking slightly over: {:?}", since);
} else if since > 2 * *HEARTBEAT_INTERVAL {
if let Some(time) = last_heartbeat {
let since = Instant::now().duration_since(time);
if since > 2 * *HEARTBEAT_INTERVAL {
debug!("Heartbeat locking significantly over: {:?}", since);
} else if since > *HEARTBEAT_INTERVAL {
trace!("Heartbeat locking slightly over: {:?}", since);
}
}
let heartbeat = HeartbeatPhys {
Expand All @@ -193,15 +189,16 @@ pub async fn start_heartbeat(object_access: ObjectAccess, id: Uuid) -> Heartbeat
lease_duration: *LEASE_DURATION,
id,
};
let instant = Instant::now();
let result = heartbeat.put_timeout(&object_access, None).await;
if lease_timed_out(&last_heartbeat) {
if lease_timed_out(last_heartbeat) {
panic!("Suspending pools due to lease timeout");
}
if result.is_ok() {
if last_heartbeat.is_none() {
tx.send(true).unwrap();
}
last_heartbeat = Some(heartbeat);
last_heartbeat = Some(instant);
}
}
});
Expand All @@ -210,20 +207,18 @@ pub async fn start_heartbeat(object_access: ObjectAccess, id: Uuid) -> Heartbeat
guard
}

fn lease_timed_out(last_heartbeat: &Option<HeartbeatPhys>) -> bool {
fn lease_timed_out(last_heartbeat: Option<Instant>) -> bool {
match last_heartbeat {
Some(heartbeat) => {
let since = SystemTime::now()
.duration_since(heartbeat.timestamp)
.unwrap_or(heartbeat.lease_duration);
if since > 2 * heartbeat.lease_duration / 3 {
Some(time) => {
let since = Instant::now().duration_since(time);
if since > 2 * *LEASE_DURATION / 3 {
warn!("Extreme heartbeat delay: {:?}", since);
} else if since > heartbeat.lease_duration / 3 {
} else if since > *LEASE_DURATION / 3 {
info!("Long heartbeat delay: {:?}", since);
} else {
debug!("Short heartbeat delay: {:?}", since);
}
since >= heartbeat.lease_duration
since >= *LEASE_DURATION
}
None => false,
}
Expand Down

0 comments on commit 7e1c06c

Please sign in to comment.