Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
Better metrics & board
Browse files Browse the repository at this point in the history
  • Loading branch information
adria0 committed Jun 9, 2020
1 parent 700b03a commit d3bce9a
Show file tree
Hide file tree
Showing 20 changed files with 689 additions and 291 deletions.
2 changes: 1 addition & 1 deletion ethcore/service/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ where
match timer {
CLIENT_TICK_TIMER => {
use snapshot::SnapshotService;
let snapshot_restoration = if let RestorationStatus::Ongoing{..} = self.snapshot.status() { true } else { false };
let snapshot_restoration = if let RestorationStatus::Ongoing{..} = self.snapshot.restoration_status() { true } else { false };
self.client.tick(snapshot_restoration)
},
SNAPSHOT_TICK_TIMER => self.snapshot.tick(),
Expand Down
20 changes: 17 additions & 3 deletions ethcore/snapshot/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use common_types::{
io_message::ClientIoMessage,
errors::{EthcoreError as Error, SnapshotError, SnapshotError::UnlinkedAncientBlockChain},
ids::BlockId,
snapshot::{ManifestData, Progress, RestorationStatus},
snapshot::{ManifestData, Progress, CreationStatus, RestorationStatus},
};
use client_traits::ChainInfo;
use engine::Engine;
Expand Down Expand Up @@ -267,6 +267,7 @@ pub struct Service<C: Send + Sync + 'static> {
client: Arc<C>,
progress: RwLock<Progress>,
taking_snapshot: AtomicBool,
taking_snapshot_num : AtomicUsize,
restoring_snapshot: AtomicBool,
}

Expand All @@ -288,6 +289,7 @@ impl<C> Service<C> where C: SnapshotClient + ChainInfo {
client: params.client,
progress: RwLock::new(Progress::new()),
taking_snapshot: AtomicBool::new(false),
taking_snapshot_num: AtomicUsize::new(0),
restoring_snapshot: AtomicBool::new(false),
};

Expand Down Expand Up @@ -509,6 +511,7 @@ impl<C> Service<C> where C: SnapshotClient + ChainInfo {

info!("Taking snapshot at #{}", num);
{
self.taking_snapshot_num.store(num as usize, Ordering::SeqCst);
scopeguard::defer! {{
self.taking_snapshot.store(false, Ordering::SeqCst);
}}
Expand Down Expand Up @@ -613,13 +616,15 @@ impl<C> Service<C> where C: SnapshotClient + ChainInfo {

self.restoring_snapshot.store(true, Ordering::SeqCst);

let block_number = manifest.block_number;
// Import previous chunks, continue if it fails
self.import_prev_chunks(&mut res, manifest).ok();

// It could be that the restoration failed or completed in the meanwhile
let mut restoration_status = self.status.lock();
if let RestorationStatus::Initializing { .. } = *restoration_status {
*restoration_status = RestorationStatus::Ongoing {
block_number,
state_chunks: state_chunks as u32,
block_chunks: block_chunks as u32,
state_chunks_done: self.state_chunks.load(Ordering::SeqCst) as u32,
Expand Down Expand Up @@ -759,7 +764,7 @@ impl<C> Service<C> where C: SnapshotClient + ChainInfo {
/// Feed a chunk with the Restoration
fn feed_chunk_with_restoration(&self, restoration: &mut Option<Restoration>, hash: H256, chunk: &[u8], is_state: bool) -> Result<(), Error> {
let (result, db) = {
match self.status() {
match self.restoration_status() {
RestorationStatus::Inactive | RestorationStatus::Failed | RestorationStatus::Finalizing => {
trace!(target: "snapshot", "Tried to restore chunk {:x} while inactive, failed or finalizing", hash);
return Ok(());
Expand Down Expand Up @@ -852,7 +857,16 @@ impl<C: Send + Sync> SnapshotService for Service<C> {
self.reader.read().as_ref().and_then(|r| r.chunk(hash).ok())
}

fn status(&self) -> RestorationStatus {
fn creation_status(&self) -> CreationStatus {
match self.taking_snapshot.load(Ordering::SeqCst) {
false => CreationStatus::Inactive,
true => CreationStatus::Ongoing {
block_number: self.taking_snapshot_num.load(Ordering::SeqCst) as u32
},
}
}

fn restoration_status(&self) -> RestorationStatus {
let mut cur_status = self.status.lock();

match *cur_status {
Expand Down
7 changes: 5 additions & 2 deletions ethcore/snapshot/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use client_traits::{BlockChainClient, BlockInfo, DatabaseRestore, BlockChainRese
use common_types::{
ids::BlockId,
errors::{EthcoreError as Error, SnapshotError},
snapshot::{ManifestData, ChunkSink, Progress, RestorationStatus},
snapshot::{ManifestData, ChunkSink, Progress, CreationStatus, RestorationStatus},
};
use engine::Engine;
use ethereum_types::H256;
Expand All @@ -49,7 +49,10 @@ pub trait SnapshotService : Sync + Send {
fn chunk(&self, hash: H256) -> Option<Bytes>;

/// Ask the snapshot service for the restoration status.
fn status(&self) -> RestorationStatus;
fn restoration_status(&self) -> RestorationStatus;

/// Ask the snapshot service for the creation status.
fn creation_status(&self) -> CreationStatus;

/// Begin snapshot restoration.
/// If a restoration is in progress, this will reset it and clear all data.
Expand Down
3 changes: 0 additions & 3 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2900,11 +2900,8 @@ impl PrometheusMetrics for Client {
prometheus_gauge(r,"chain_warpsync_gap_last","Warp sync gap, last block",last.as_u64() as i64);
}

prometheus_gauge(r,"chain_difficulty_total","Blockchain difficulty",chain.total_difficulty.as_u64() as i64);
prometheus_gauge(r,"chain_difficulty_total_pending","Block queue difficulty",chain.pending_total_difficulty.as_u64() as i64);
prometheus_gauge(r,"chain_block","Best block number",chain.best_block_number as i64);


// prunning info
let prunning = self.pruning_info();
prometheus_gauge(r,"prunning_earliest_chain","The first block which everything can be served after",prunning.earliest_chain as i64);
Expand Down
37 changes: 32 additions & 5 deletions ethcore/sync/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ use common_types::{
chain_notify::{NewBlocks, ChainMessageType},
pruning_info::PruningInfo,
transaction::UnverifiedTransaction,
snapshot::{CreationStatus, RestorationStatus}
};
use stats::{PrometheusMetrics, prometheus_counter, prometheus_gauge, prometheus};

Expand Down Expand Up @@ -417,16 +418,42 @@ impl PrometheusMetrics for EthSync {
let scalar = |b| if b {1i64} else {0i64};
let sync_status = self.status();

prometheus_gauge(r,"sync_peers","Total number of connected peers",sync_status.num_peers as i64);
prometheus_gauge(r,"sync_active_peers","Total number of active peers",sync_status.num_active_peers as i64);
prometheus_gauge(r,
"sync_status",
"WaitingPeers(0), SnapshotManifest(1), SnapshotData(2), SnapshotWaiting(3), Blocks(4), Idle(5), Waiting(6), NewBlocks(7)",
match self.eth_handler.sync.status().state {
SyncState::WaitingPeers => 0,
SyncState::SnapshotManifest => 1,
SyncState::SnapshotData => 2,
SyncState::SnapshotWaiting => 3,
SyncState::Blocks => 4,
SyncState::Idle => 5,
SyncState::Waiting => 6,
SyncState::NewBlocks => 7,
});

prometheus_gauge(r,"net_peers","Total number of connected peers",sync_status.num_peers as i64);
prometheus_gauge(r,"net_active_peers","Total number of active peers",sync_status.num_active_peers as i64);
prometheus_counter(r,"sync_blocks_recieved","Number of blocks downloaded so far",sync_status.blocks_received as i64);
prometheus_counter(r,"sync_blocks_total","Total number of blocks for the sync process",sync_status.blocks_total as i64);
prometheus_gauge(r,"sync_blocks_highest","Highest block number in the download queue",sync_status.highest_block_number.unwrap_or(0) as i64);
prometheus_gauge(r,"sync_is_majorsync","Are we in the middle of a major sync?",scalar(self.is_major_syncing()));
prometheus_gauge(r,"sync_mem_used","Heap memory used in bytes",sync_status.mem_used as i64);
prometheus_gauge(r,"sync_snapshot_is_sync",".",scalar(sync_status.is_snapshot_syncing()));
prometheus_gauge(r,"sync_snapshot_chunks","Snapshot chunks",sync_status.num_snapshot_chunks as i64);
prometheus_gauge(r,"sync_snapshot_chunks_done","Snapshot chunks downloaded",sync_status.snapshot_chunks_done as i64);
prometheus_gauge(r,"snapshot_download_active","1 if downloading snapshots",scalar(sync_status.is_snapshot_syncing()));
prometheus_gauge(r,"snapshot_download_chunks","Snapshot chunks",sync_status.num_snapshot_chunks as i64);
prometheus_gauge(r,"snapshot_download_chunks_done","Snapshot chunks downloaded",sync_status.snapshot_chunks_done as i64);

let restoration = self.eth_handler.snapshot_service.restoration_status();
let creation = self.eth_handler.snapshot_service.creation_status();

prometheus_gauge(r,"snapshot_create_block", "First block of the current snapshot creation", match creation {
CreationStatus::Ongoing{block_number} => block_number as i64,
_ => 0,
});
prometheus_gauge(r,"snapshot_restore_block", "First block of the current snapshot restoration", match restoration {
RestorationStatus::Ongoing{block_number, ..} => block_number as i64,
_ => 0,
});
}
}

Expand Down
2 changes: 1 addition & 1 deletion ethcore/sync/src/chain/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ impl SyncHandler {
}

// check service status
let status = io.snapshot_service().status();
let status = io.snapshot_service().restoration_status();
match status {
RestorationStatus::Inactive | RestorationStatus::Failed => {
trace!(target: "snapshot_sync", "{}: Snapshot restoration status: {:?}", peer_id, status);
Expand Down
8 changes: 4 additions & 4 deletions ethcore/sync/src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1119,8 +1119,8 @@ impl ChainSync {
}
},
SyncState::SnapshotData => {
match io.snapshot_service().status() {
RestorationStatus::Ongoing { state_chunks_done, block_chunks_done, state_chunks, block_chunks } => {
match io.snapshot_service().restoration_status() {
RestorationStatus::Ongoing { state_chunks_done, block_chunks_done, state_chunks, block_chunks, .. } => {
// Initialize the snapshot if not already done
self.snapshot.initialize(io.snapshot_service(), block_chunks as usize + state_chunks as usize);
if self.snapshot.done_chunks() - (state_chunks_done + block_chunks_done) as usize > MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD {
Expand Down Expand Up @@ -1322,14 +1322,14 @@ impl ChainSync {
self.set_state(SyncState::Blocks);
self.continue_sync(io);
},
SyncState::SnapshotData => match io.snapshot_service().status() {
SyncState::SnapshotData => match io.snapshot_service().restoration_status() {
RestorationStatus::Inactive | RestorationStatus::Failed => {
self.set_state(SyncState::SnapshotWaiting);
},
RestorationStatus::Initializing { .. } | RestorationStatus::Ongoing { .. } | RestorationStatus::Finalizing => (),
},
SyncState::SnapshotWaiting => {
match io.snapshot_service().status() {
match io.snapshot_service().restoration_status() {
RestorationStatus::Inactive => {
trace!(target:"snapshot_sync", "Snapshot restoration is complete");
self.restart(io);
Expand Down
14 changes: 14 additions & 0 deletions ethcore/types/src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ pub enum RestorationStatus {
},
/// Ongoing restoration.
Ongoing {
/// Block number specified in the manifest.
block_number: u64,
/// Total number of state chunks.
state_chunks: u32,
/// Total number of block chunks.
Expand All @@ -197,3 +199,15 @@ pub enum RestorationStatus {
/// Failed restoration.
Failed,
}

/// Statuses for snapshot creation.
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum CreationStatus {
/// No creation activity currently.
Inactive,
/// Snapshot creation is in progress.
Ongoing {
/// Current created snapshot.
block_number: u32,
}
}
6 changes: 3 additions & 3 deletions parity/informant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ impl<T: InformantData> Informant<T> {

let rpc_stats = self.rpc_stats.as_ref();
let snapshot_sync = sync_info.as_ref().map_or(false, |s| s.snapshot_sync) && self.snapshot.as_ref().map_or(false, |s|
match s.status() {
match s.restoration_status() {
RestorationStatus::Ongoing { .. } | RestorationStatus::Initializing { .. } => true,
_ => false,
}
Expand Down Expand Up @@ -315,8 +315,8 @@ impl<T: InformantData> Informant<T> {
),
true => {
self.snapshot.as_ref().map_or(String::new(), |s|
match s.status() {
RestorationStatus::Ongoing { state_chunks, block_chunks, state_chunks_done, block_chunks_done } => {
match s.restoration_status() {
RestorationStatus::Ongoing { state_chunks, block_chunks, state_chunks_done, block_chunks_done, .. } => {
format!("Syncing snapshot {}/{}", state_chunks_done + block_chunks_done, state_chunks + block_chunks)
},
RestorationStatus::Initializing { chunks_done, state_chunks, block_chunks } => {
Expand Down
8 changes: 4 additions & 4 deletions parity/snapshot_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ fn restore_using<R: SnapshotReader>(snapshot: Arc<SnapshotService<Client>>, read

let informant_handle = snapshot.clone();
::std::thread::spawn(move || {
while let RestorationStatus::Ongoing { state_chunks_done, block_chunks_done, .. } = informant_handle.status() {
while let RestorationStatus::Ongoing { state_chunks_done, block_chunks_done, .. } = informant_handle.restoration_status() {
info!("Processed {}/{} state chunks and {}/{} block chunks.",
state_chunks_done, num_state, block_chunks_done, num_blocks);
::std::thread::sleep(Duration::from_secs(5));
Expand All @@ -95,7 +95,7 @@ fn restore_using<R: SnapshotReader>(snapshot: Arc<SnapshotService<Client>>, read

info!("Restoring state");
for &state_hash in &manifest.state_hashes {
if snapshot.status() == RestorationStatus::Failed {
if snapshot.restoration_status() == RestorationStatus::Failed {
return Err("Restoration failed".into());
}

Expand All @@ -112,7 +112,7 @@ fn restore_using<R: SnapshotReader>(snapshot: Arc<SnapshotService<Client>>, read

info!("Restoring blocks");
for &block_hash in &manifest.block_hashes {
if snapshot.status() == RestorationStatus::Failed {
if snapshot.restoration_status() == RestorationStatus::Failed {
return Err("Restoration failed".into());
}

Expand All @@ -126,7 +126,7 @@ fn restore_using<R: SnapshotReader>(snapshot: Arc<SnapshotService<Client>>, read
snapshot.feed_block_chunk(block_hash, &chunk);
}

match snapshot.status() {
match snapshot.restoration_status() {
RestorationStatus::Ongoing { .. } => Err("Snapshot file is incomplete and missing chunks.".into()),
RestorationStatus::Initializing { .. } => Err("Snapshot restoration is still initializing.".into()),
RestorationStatus::Finalizing => Err("Snapshot restoration is still finalizing.".into()),
Expand Down
4 changes: 2 additions & 2 deletions rpc/src/v1/impls/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -551,10 +551,10 @@ impl<C, SN: ?Sized, S: ?Sized, M, EM, T: StateInfo + 'static> Eth for EthClient<
fn syncing(&self) -> Result<SyncStatus> {
let status = self.sync.status();
let client = &self.client;
let snapshot_status = self.snapshot.status();
let snapshot_status = self.snapshot.restoration_status();

let (warping, warp_chunks_amount, warp_chunks_processed) = match snapshot_status {
RestorationStatus::Ongoing { state_chunks, block_chunks, state_chunks_done, block_chunks_done } =>
RestorationStatus::Ongoing { state_chunks, block_chunks, state_chunks_done, block_chunks_done, .. } =>
(true, Some(block_chunks + state_chunks), Some(block_chunks_done + state_chunks_done)),
_ => (false, None, None),
};
Expand Down
2 changes: 1 addition & 1 deletion rpc/src/v1/impls/parity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ impl<C, M, U, S> Parity for ParityClient<C, M, U> where

fn status(&self) -> Result<()> {
let has_peers = self.settings.is_dev_chain || self.sync.status().num_peers > 0;
let is_warping = match self.snapshot.as_ref().map(|s| s.status()) {
let is_warping = match self.snapshot.as_ref().map(|s| s.restoration_status()) {
Some(RestorationStatus::Ongoing { .. }) => true,
_ => false,
};
Expand Down
Loading

0 comments on commit d3bce9a

Please sign in to comment.