Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

feat: fix tari pulse running as fast as possible #6704

Merged
merged 2 commits into from
Nov 27, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 35 additions & 24 deletions base_layer/core/src/base_node/tari_pulse_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{str::FromStr, time::Duration};
use std::{cmp::min, str::FromStr, time::Duration};

use futures::future;
use hickory_client::{
Expand All @@ -33,13 +33,13 @@ use hickory_client::{
rr::{DNSClass, Name, RData, Record, RecordType},
tcp::TcpClientStream,
};
use log::{error, info, warn};
use log::{debug, error, info, trace, warn};
use serde::{Deserialize, Serialize};
use tari_p2p::Network;
use tari_service_framework::{async_trait, ServiceInitializationError, ServiceInitializer, ServiceInitializerContext};
use tari_shutdown::ShutdownSignal;
use tari_utilities::hex::Hex;
use tokio::{net::TcpStream as TokioTcpStream, sync::watch, time};
use tokio::{net::TcpStream as TokioTcpStream, sync::watch, time, time::MissedTickBehavior};

use super::LocalNodeCommsInterface;
use crate::base_node::comms_interface::CommsInterfaceError;
Expand Down Expand Up @@ -121,31 +121,37 @@ impl TariPulseService {
notify_passed_checkpoints: watch::Sender<bool>,
) {
let mut interval = time::interval(self.config.check_interval);
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
tokio::pin!(interval);
let mut shutdown_signal = self.shutdown_signal.clone();
let mut count = 0u64;
let mut skip_ticks = 0;
let mut skipped_ticks = 0;

loop {
tokio::select! {
_ = interval.tick() => {
let passed_checkpoints = match self.passed_checkpoints(&mut base_node_service).await {
Ok(passed) => {
interval = time::interval(self.config.check_interval); // reset interval if back to healthy
passed
},
Err(err) => {
warn!(target: LOG_TARGET, "Failed to check if node has passed checkpoints: {:?}", err);
let old_interval = interval.period().as_secs();
let new_interval = if old_interval > (60 * 30) {
warn!(target: LOG_TARGET, "Reached maximum retry interval of 30 minutes.");
old_interval
} else {
// increase interval if node repeatedly (up to 30 min) fails to fetch checkpoints
interval = time::interval(Duration::from_secs(old_interval * 2));
interval.tick().await;
interval.period().as_secs()
};
warn!(target: LOG_TARGET, "Retrying in {} seconds", new_interval);
continue;
},
count += 1;
trace!(target: LOG_TARGET, "Interval tick: {}", count);
if skipped_ticks < skip_ticks {
skipped_ticks += 1;
debug!(target: LOG_TARGET, "Skipping {} of {} ticks", skipped_ticks, skip_ticks);
continue;
}
let passed_checkpoints = {
match self.passed_checkpoints(&mut base_node_service).await {
Ok(passed) => {
skip_ticks = 0;
skipped_ticks = 0;
passed
},
Err(err) => {
warn!(target: LOG_TARGET, "Failed to check if node has passed checkpoints: {:?}", err);
skip_ticks = min(skip_ticks + 1, 30 * 60 / self.config.check_interval.as_secs());
skipped_ticks = 0;
continue;
},
}
};

notify_passed_checkpoints
Expand Down Expand Up @@ -174,7 +180,12 @@ impl TariPulseService {
.max_by(|a, b| a.0.cmp(&b.0))
.ok_or(CommsInterfaceError::InternalError("No checkpoints found".to_string()))?;
let local_checkpoints = self.get_node_block(base_node_service, max_height_block.0).await?;
Ok(local_checkpoints.1 == max_height_block.1)
let passed = local_checkpoints.1 == max_height_block.1;
trace!(
target: LOG_TARGET, "Passed checkpoints: {}, DNS: ({}, {}), Local: ({}, {})",
passed, max_height_block.0, max_height_block.1, local_checkpoints.0, local_checkpoints.1
);
Ok(passed)
}

async fn get_node_block(
Expand Down
Loading