Skip to content

Commit

Permalink
add disk space check (#3598)
Browse files Browse the repository at this point in the history
Co-authored-by: jiangying <[email protected]>
  • Loading branch information
YouNeedWork and jiangying000 authored Aug 8, 2022
1 parent 89aa661 commit f3c9687
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 17 deletions.
16 changes: 16 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 30 additions & 10 deletions node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ use starcoin_rpc_server::module::{PubSubService, PubSubServiceFactory};
use starcoin_rpc_server::service::RpcService;
use starcoin_service_registry::bus::{Bus, BusService};
use starcoin_service_registry::{
ActorService, RegistryAsyncService, RegistryService, ServiceContext, ServiceFactory,
ServiceHandler, ServiceRef,
ActorService, EventHandler, RegistryAsyncService, RegistryService, ServiceContext,
ServiceFactory, ServiceHandler, ServiceRef,
};
use starcoin_state_service::ChainStateService;
use starcoin_storage::block_info::BlockInfoStore;
Expand All @@ -53,7 +53,7 @@ use starcoin_sync::sync::SyncService;
use starcoin_sync::txn_sync::TxnSyncService;
use starcoin_sync::verified_rpc_client::VerifiedRpcClient;
use starcoin_txpool::TxPoolActorService;
use starcoin_types::system_events::SystemStarted;
use starcoin_types::system_events::{SystemShutdown, SystemStarted};
use std::sync::Arc;
use std::time::{Duration, SystemTime};

Expand All @@ -69,7 +69,23 @@ impl ServiceFactory<Self> for NodeService {
}
}

impl ActorService for NodeService {}
/// Actor lifecycle hooks for `NodeService`.
///
/// The service listens for `SystemShutdown` events for as long as it is
/// running, so another service can ask the node to stop by broadcasting one.
impl ActorService for NodeService {
    /// Subscribes this service to `SystemShutdown` events when it starts.
    fn started(&mut self, context: &mut ServiceContext<Self>) -> Result<()> {
        context.subscribe::<SystemShutdown>();
        Ok(())
    }

    /// Drops the `SystemShutdown` subscription when the service stops.
    fn stopped(&mut self, context: &mut ServiceContext<Self>) -> Result<()> {
        context.unsubscribe::<SystemShutdown>();
        Ok(())
    }
}

/// Reacts to a `SystemShutdown` event by shutting the whole node down.
impl EventHandler<Self, SystemShutdown> for NodeService {
    /// Neither the event payload nor the service context is needed; the
    /// handler simply delegates to `shutdown_system`.
    fn handle_event(&mut self, _event: SystemShutdown, _ctx: &mut ServiceContext<Self>) {
        self.shutdown_system();
    }
}

impl ServiceHandler<Self, NodeRequest> for NodeService {
fn handle(
Expand Down Expand Up @@ -105,12 +121,7 @@ impl ServiceHandler<Self, NodeRequest> for NodeService {
}
NodeRequest::ShutdownSystem => {
info!("Receive StopSystem request, try to stop system.");
if let Err(e) = self.registry.shutdown_system_sync() {
error!("Shutdown registry error: {}", e);
};
//wait a seconds for registry shutdown, then stop System.
std::thread::sleep(Duration::from_millis(2000));
System::current().stop();
self.shutdown_system();
NodeResponse::Result(Ok(()))
}
NodeRequest::StopPacemaker => NodeResponse::Result(
Expand Down Expand Up @@ -404,4 +415,13 @@ impl NodeService {

Ok((registry, node_service))
}

fn shutdown_system(&self) {
if let Err(e) = self.registry.shutdown_system_sync() {
error!("Shutdown registry error: {}", e);
};
//wait a seconds for registry shutdown, then stop System.
std::thread::sleep(Duration::from_millis(2000));
System::current().stop();
}
}
4 changes: 3 additions & 1 deletion sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ futures-retry = "0.6"
itertools = { version = "0.10.3", default-features = false }
pin-project = "1.0.1"
futures-timer = "3.0"
stream-task = { path = "../commons/stream-task" }
sysinfo = "0.25.1"
stream-task ={ path = "../commons/stream-task"}
starcoin-chain = { path = "../chain" }
config = { path = "../config", package = "starcoin-config" }
network = { path = "../network", package = "starcoin-network" }
Expand Down Expand Up @@ -47,6 +48,7 @@ starcoin-chain-service = { path = "../chain/service" }
starcoin-chain-api = { path = "../chain/api" }
network-rpc-core = { path = "../network-rpc/core" }


[dev-dependencies]
tokio = { version = "^1", features = ["full"] }
miner = { path = "../miner", package = "starcoin-miner" }
Expand Down
100 changes: 94 additions & 6 deletions sync/src/block_connector/block_connector_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use crate::block_connector::{ExecuteRequest, ResetRequest, WriteBlockChainService};
use crate::sync::{CheckSyncEvent, SyncService};
use crate::tasks::BlockConnectedEvent;
use crate::tasks::{BlockConnectedEvent, BlockDiskCheckEvent};
use anyhow::{format_err, Result};
use config::{NodeConfig, G_CRATE_VERSION};
use executor::VMMetrics;
Expand All @@ -18,20 +18,29 @@ use starcoin_storage::{BlockStore, Storage};
use starcoin_sync_api::PeerNewBlock;
use starcoin_types::block::ExecutedBlock;
use starcoin_types::sync_status::SyncStatus;
use starcoin_types::system_events::{MinedBlock, SyncStatusChangeEvent};
use starcoin_types::system_events::{MinedBlock, SyncStatusChangeEvent, SystemShutdown};
use std::sync::Arc;
use sysinfo::{DiskExt, System, SystemExt};
use txpool::TxPoolService;

/// Critical free-space threshold in bytes (3 GiB). `check_disk_space` reports
/// an error when the available space drops below this value.
const DISK_CHECKPOINT_FOR_PANIC: u64 = 3 * (1 << 30);
/// Warning free-space threshold in bytes (5 GiB). `check_disk_space` reports
/// the remaining space when the available space drops below this value.
const DISK_CHECKPOINT_FOR_WARN: u64 = 5 * (1 << 30);

/// Service that connects blocks to the chain and periodically checks the
/// free disk space.
pub struct BlockConnectorService {
    /// Chain writer used to connect incoming blocks.
    chain_service: WriteBlockChainService<TxPoolService>,
    /// Last known sync status; `None` until one has been recorded.
    sync_status: Option<SyncStatus>,
    /// Node configuration; the disk-space check reads the base data dir from it.
    config: Arc<NodeConfig>,
}

impl BlockConnectorService {
pub fn new(chain_service: WriteBlockChainService<TxPoolService>) -> Self {
pub fn new(
chain_service: WriteBlockChainService<TxPoolService>,
config: Arc<NodeConfig>,
) -> Self {
Self {
chain_service,
sync_status: None,
config,
}
}

Expand All @@ -41,6 +50,53 @@ impl BlockConnectorService {
None => false,
}
}

pub fn check_disk_space(&mut self) -> Option<Result<u64>> {
if System::IS_SUPPORTED {
let mut sys = System::new_all();
if sys.disks().len() == 1 {
let disk = &sys.disks()[0];
dbg!(DISK_CHECKPOINT_FOR_PANIC);
dbg!(disk.available_space());
if DISK_CHECKPOINT_FOR_PANIC > disk.available_space() {
return Some(Err(anyhow::anyhow!(
"Disk space is less than {} GB, please add disk space.",
DISK_CHECKPOINT_FOR_PANIC / 1024 / 1024 / 1024
)));
} else if DISK_CHECKPOINT_FOR_WARN > disk.available_space() {
return Some(Ok(disk.available_space() / 1024 / 1024 / 1024));
}
} else {
sys.sort_disks_by(|a, b| {
if a.mount_point()
.starts_with(b.mount_point().to_str().unwrap())
{
std::cmp::Ordering::Less
} else {
std::cmp::Ordering::Greater
}
});

let base_data_dir = self.config.base().base_data_dir.path();
for disk in sys.disks() {
if base_data_dir.starts_with(disk.mount_point()) {
if DISK_CHECKPOINT_FOR_PANIC > disk.available_space() {
return Some(Err(anyhow::anyhow!(
"Disk space is less than {} GB, please add disk space.",
DISK_CHECKPOINT_FOR_PANIC / 1024 / 1024 / 1024
)));
} else if DISK_CHECKPOINT_FOR_WARN > disk.available_space() {
return Some(Ok(disk.available_space() / 1024 / 1024 / 1024));
}

break;
}
}
}
}

None
}
}

impl ServiceFactory<Self> for BlockConnectorService {
Expand All @@ -53,10 +109,16 @@ impl ServiceFactory<Self> for BlockConnectorService {
.get_startup_info()?
.ok_or_else(|| format_err!("Startup info should exist."))?;
let vm_metrics = ctx.get_shared_opt::<VMMetrics>()?;
let chain_service =
WriteBlockChainService::new(config, startup_info, storage, txpool, bus, vm_metrics)?;
let chain_service = WriteBlockChainService::new(
config.clone(),
startup_info,
storage,
txpool,
bus,
vm_metrics,
)?;

Ok(Self::new(chain_service))
Ok(Self::new(chain_service, config))
}
}

Expand All @@ -66,6 +128,11 @@ impl ActorService for BlockConnectorService {
ctx.set_mailbox_capacity(1024);
ctx.subscribe::<SyncStatusChangeEvent>();
ctx.subscribe::<MinedBlock>();

ctx.run_interval(std::time::Duration::from_secs(3), move |ctx| {
ctx.notify(crate::tasks::BlockDiskCheckEvent {});
});

Ok(())
}

Expand All @@ -76,6 +143,26 @@ impl ActorService for BlockConnectorService {
}
}

impl EventHandler<Self, BlockDiskCheckEvent> for BlockConnectorService {
fn handle_event(
&mut self,
_: BlockDiskCheckEvent,
ctx: &mut ServiceContext<BlockConnectorService>,
) {
if let Some(res) = self.check_disk_space() {
match res {
Ok(available_space) => {
warn!("Available diskspace only {}/GB left ", available_space)
}
Err(e) => {
error!("{}", e);
ctx.broadcast(SystemShutdown);
}
}
}
}
}

impl EventHandler<Self, BlockConnectedEvent> for BlockConnectorService {
fn handle_event(
&mut self,
Expand All @@ -84,6 +171,7 @@ impl EventHandler<Self, BlockConnectedEvent> for BlockConnectorService {
) {
//because this block has execute at sync task, so just try connect to select head chain.
//TODO refactor connect and execute

let block = msg.block;
if let Err(e) = self.chain_service.try_connect(block) {
error!("Process connected block error: {:?}", e);
Expand Down
3 changes: 3 additions & 0 deletions sync/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,9 @@ pub struct BlockConnectedEvent {
pub block: Block,
}

/// Payload-free event that asks the block connector service to check the
/// available disk space.
#[derive(Clone, Debug)]
pub struct BlockDiskCheckEvent {}

/// Handler for `BlockConnectedEvent`s.
///
/// Implementors must be `Send`, `Clone` and `Unpin`.
pub trait BlockConnectedEventHandle: Send + Clone + Unpin {
    /// Processes a single connected-block event, returning an error on failure.
    fn handle(&mut self, event: BlockConnectedEvent) -> Result<()>;
}
Expand Down
3 changes: 3 additions & 0 deletions types/src/system_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ pub struct MinedBlock(pub Arc<Block>);
/// Payload-free system event; presumably emitted once the system has
/// started (TODO confirm against the emitting site).
#[derive(Clone, Debug)]
pub struct SystemStarted;

/// Payload-free system event requesting that the node shut down.
#[derive(Clone, Debug)]
pub struct SystemShutdown;

/// System event carrying a `SyncStatus`; presumably emitted whenever the
/// sync status changes (TODO confirm against the emitting site).
#[derive(Clone, Debug)]
pub struct SyncStatusChangeEvent(pub SyncStatus);

Expand Down

0 comments on commit f3c9687

Please sign in to comment.