# feat(en): Start health checks early into EN lifecycle (#1146)
## What ❔

- Implements `AppHealthCheck`, a shared, thread-safe container for health checks (see the usage sketch below).
- Refactors EN initialization to start the healthcheck server early in the node lifecycle.
- Adds a healthcheck for snapshot recovery.
- Uses healthchecks in the snapshot recovery integration test.
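
For context, here is a minimal sketch of the registration pattern this PR introduces, assembled from the calls visible in the diff below. The `HealthCheckHandle` import path, the hard-coded port value, and the final `stop()` call are illustrative assumptions, not part of this change.

```rust
use std::sync::Arc;

// Import path for `HealthCheckHandle` is an assumption; `AppHealthCheck`,
// `HealthStatus` and `ReactiveHealthCheck` come from `zksync_health_check`
// as shown in the diff.
use zksync_core::api_server::healthcheck::HealthCheckHandle;
use zksync_health_check::{AppHealthCheck, HealthStatus, ReactiveHealthCheck};

async fn run_node_sketch() -> anyhow::Result<()> {
    // Shared, thread-safe container for all health checks of the node.
    let app_health = Arc::new(AppHealthCheck::default());

    // The healthcheck server is spawned before any long-running initialization
    // (genesis or snapshot recovery), so the node can be probed from the start.
    let healthcheck_handle = HealthCheckHandle::spawn_server(
        ([0, 0, 0, 0], 3081).into(), // port value is illustrative
        app_health.clone(),
    );

    // Components created later register their checks on the same container.
    let (check, updater) = ReactiveHealthCheck::new("some_component");
    app_health.insert_component(check);
    updater.update(HealthStatus::Ready.into());

    // ... initialize storage, spawn tasks, wait for a stop signal ...

    // Stopping the server on shutdown is assumed to look like this.
    healthcheck_handle.stop().await;
    Ok(())
}
```

Because the container sits behind an `Arc`, components created at any point of the node lifecycle can register their checks with the already-running healthcheck server.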

## Why ❔

This increases EN observability.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
- [x] Linkcheck has been run via `zk linkcheck`.
slowli authored Feb 21, 2024
1 parent 975f54b commit f983e80
Showing 20 changed files with 408 additions and 304 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

24 changes: 8 additions & 16 deletions core/bin/external_node/src/init.rs
@@ -4,8 +4,9 @@ use anyhow::Context as _;
use zksync_basic_types::{L1BatchNumber, L2ChainId};
use zksync_core::sync_layer::genesis::perform_genesis_if_needed;
use zksync_dal::ConnectionPool;
use zksync_health_check::AppHealthCheck;
use zksync_object_store::ObjectStoreFactory;
use zksync_snapshots_applier::{SnapshotsApplierConfig, SnapshotsApplierOutcome};
use zksync_snapshots_applier::SnapshotsApplierConfig;
use zksync_web3_decl::jsonrpsee::http_client::HttpClient;

use crate::config::read_snapshots_recovery_config;
@@ -21,6 +22,7 @@ enum InitDecision {
pub(crate) async fn ensure_storage_initialized(
pool: &ConnectionPool,
main_node_client: &HttpClient,
app_health: &AppHealthCheck,
l2_chain_id: L2ChainId,
consider_snapshot_recovery: bool,
) -> anyhow::Result<()> {
@@ -83,24 +85,14 @@ pub(crate) async fn ensure_storage_initialized(
let blob_store = ObjectStoreFactory::new(recovery_config.snapshots_object_store)
.create_store()
.await;
let outcome = SnapshotsApplierConfig::default()

let config = SnapshotsApplierConfig::default();
app_health.insert_component(config.health_check());
config
.run(pool, main_node_client, &blob_store)
.await
.context("snapshot recovery failed")?;
match outcome {
SnapshotsApplierOutcome::Ok => {
tracing::info!("Snapshot recovery is complete");
}
SnapshotsApplierOutcome::NoSnapshotsOnMainNode => {
anyhow::bail!("No snapshots on main node; snapshot recovery is impossible");
}
SnapshotsApplierOutcome::InitializedWithoutSnapshot => {
anyhow::bail!(
"Node contains a non-genesis L1 batch, but no genesis; snapshot recovery is unsafe. \
This should never occur unless the node DB was manually tampered with"
);
}
}
tracing::info!("Snapshot recovery is complete");
}
}
Ok(())
109 changes: 56 additions & 53 deletions core/bin/external_node/src/main.rs
@@ -34,10 +34,11 @@ use zksync_core::{
},
};
use zksync_dal::{healthcheck::ConnectionPoolHealthCheck, ConnectionPool};
use zksync_health_check::{CheckHealth, HealthStatus, ReactiveHealthCheck};
use zksync_health_check::{AppHealthCheck, HealthStatus, ReactiveHealthCheck};
use zksync_state::PostgresStorageCaches;
use zksync_storage::RocksDB;
use zksync_utils::wait_for_tasks::wait_for_tasks;
use zksync_web3_decl::jsonrpsee::http_client::HttpClient;

use crate::{
config::{observability::observability_config_from_env, ExternalNodeConfig},
@@ -111,11 +112,11 @@ async fn build_state_keeper(
async fn init_tasks(
config: &ExternalNodeConfig,
connection_pool: ConnectionPool,
main_node_client: HttpClient,
task_handles: &mut Vec<task::JoinHandle<anyhow::Result<()>>>,
app_health: &AppHealthCheck,
stop_receiver: watch::Receiver<bool>,
) -> anyhow::Result<(
Vec<task::JoinHandle<anyhow::Result<()>>>,
Vec<Box<dyn CheckHealth>>,
)> {
) -> anyhow::Result<()> {
let release_manifest: serde_json::Value = serde_json::from_str(RELEASE_MANIFEST)
.expect("release manifest is a valid json document; qed");
let release_manifest_version = release_manifest["core"].as_str().expect(
@@ -124,19 +125,13 @@ async fn init_tasks(

let version = semver::Version::parse(release_manifest_version)
.expect("version in manifest is a correct semver format; qed");
let main_node_url = config
.required
.main_node_url()
.expect("Main node URL is incorrect");
let mut healthchecks: Vec<Box<dyn CheckHealth>> = Vec::new();
// Create components.
let fee_params_fetcher = Arc::new(MainNodeFeeParamsFetcher::new(&main_node_url));
let fee_params_fetcher = Arc::new(MainNodeFeeParamsFetcher::new(main_node_client.clone()));

let sync_state = SyncState::default();
healthchecks.push(Box::new(sync_state.clone()));
app_health.insert_custom_component(Arc::new(sync_state.clone()));
let (action_queue_sender, action_queue) = ActionQueue::new();

let mut task_handles = vec![];
let (miniblock_sealer, miniblock_sealer_handle) = MiniblockSealer::new(
connection_pool.clone(),
config.optional.miniblock_seal_queue_capacity,
@@ -173,22 +168,15 @@ async fn init_tasks(
)
.await?;

let main_node_client = <dyn MainNodeClient>::json_rpc(&main_node_url)
.context("Failed creating JSON-RPC client for main node")?;
healthchecks.push(Box::new(MainNodeHealthCheck::from(
main_node_client.clone(),
)));
let singleton_pool_builder = ConnectionPool::singleton(&config.postgres.database_url);

let fetcher_handle = if let Some(cfg) = config.consensus.clone() {
let pool = connection_pool.clone();
let mut stop_receiver = stop_receiver.clone();
let sync_state = sync_state.clone();
let main_node_client = main_node_client.clone();

#[allow(clippy::redundant_locals)]
tokio::spawn(async move {
let sync_state = sync_state;
let main_node_client = main_node_client;
scope::run!(&ctx::root(), |ctx, s| async {
s.spawn_bg(async {
let res = cfg.run(ctx, pool, action_queue_sender).await;
@@ -216,7 +204,7 @@ async fn init_tasks(
let mut storage = pool.access_storage_tagged("sync_layer").await?;
let fetcher = MainNodeFetcher::new(
&mut storage,
Box::new(main_node_client),
Box::new(main_node_client.clone()),
action_queue_sender,
sync_state.clone(),
stop_receiver.clone(),
@@ -239,7 +227,7 @@ async fn init_tasks(
let metadata_calculator = MetadataCalculator::new(metadata_calculator_config, None)
.await
.context("failed initializing metadata calculator")?;
healthchecks.push(Box::new(metadata_calculator.tree_health_check()));
app_health.insert_component(metadata_calculator.tree_health_check());

let consistency_checker = ConsistencyChecker::new(
&config
Expand All @@ -252,18 +240,17 @@ async fn init_tasks(
.await
.context("failed to build connection pool for ConsistencyChecker")?,
);
healthchecks.push(Box::new(consistency_checker.health_check().clone()));
app_health.insert_component(consistency_checker.health_check().clone());
let consistency_checker_handle = tokio::spawn(consistency_checker.run(stop_receiver.clone()));

let batch_status_updater = BatchStatusUpdater::new(
&main_node_url,
main_node_client.clone(),
singleton_pool_builder
.build()
.await
.context("failed to build a connection pool for BatchStatusUpdater")?,
)
.context("failed initializing batch status updater")?;
healthchecks.push(Box::new(batch_status_updater.health_check()));
);
app_health.insert_component(batch_status_updater.health_check());

// Run the components.
let tree_stop_receiver = stop_receiver.clone();
@@ -278,7 +265,7 @@ async fn init_tasks(
.await
.context("failed to build a commitment_generator_pool")?;
let commitment_generator = CommitmentGenerator::new(commitment_generator_pool);
healthchecks.push(Box::new(commitment_generator.health_check()));
app_health.insert_component(commitment_generator.health_check());
let commitment_generator_handle = tokio::spawn(commitment_generator.run(stop_receiver.clone()));

let updater_handle = task::spawn(batch_status_updater.run(stop_receiver.clone()));
@@ -292,7 +279,7 @@ async fn init_tasks(
let tx_sender_builder =
TxSenderBuilder::new(config.clone().into(), connection_pool.clone())
.with_main_connection_pool(connection_pool.clone())
.with_tx_proxy(&main_node_url);
.with_tx_proxy(main_node_client);

if config.optional.transactions_per_sec_limit.is_some() {
tracing::warn!("`transactions_per_sec_limit` option is deprecated and ignored");
@@ -352,14 +339,13 @@ async fn init_tasks(
.await
.context("Failed initializing WS JSON-RPC server")?;

healthchecks.push(Box::new(ws_server_handles.health_check));
healthchecks.push(Box::new(http_server_handles.health_check));
healthchecks.push(Box::new(ConnectionPoolHealthCheck::new(connection_pool)));
app_health.insert_component(ws_server_handles.health_check);
app_health.insert_component(http_server_handles.health_check);

if let Some(port) = config.optional.prometheus_port {
let (prometheus_health_check, prometheus_health_updater) =
ReactiveHealthCheck::new("prometheus_exporter");
healthchecks.push(Box::new(prometheus_health_check));
app_health.insert_component(prometheus_health_check);
task_handles.push(tokio::spawn(async move {
prometheus_health_updater.update(HealthStatus::Ready.into());
let result = PrometheusExporterConfig::pull(port)
Expand All @@ -384,7 +370,7 @@ async fn init_tasks(
commitment_generator_handle,
]);

Ok((task_handles, healthchecks))
Ok(())
}

async fn shutdown_components(
Expand Down Expand Up @@ -462,13 +448,6 @@ async fn main() -> anyhow::Result<()> {
Some(config::read_consensus_config().context("read_consensus_config()")?);
}

let main_node_url = config
.required
.main_node_url()
.context("Main node URL is incorrect")?;
let main_node_client = <dyn MainNodeClient>::json_rpc(&main_node_url)
.context("Failed creating JSON-RPC client for main node")?;

if let Some(threshold) = config.optional.slow_query_threshold() {
ConnectionPool::global_config().set_slow_query_threshold(threshold)?;
}
@@ -518,29 +497,53 @@ async fn main() -> anyhow::Result<()> {
let sigint_receiver = setup_sigint_handler();
tracing::warn!("The external node is in the alpha phase, and should be used with caution.");
tracing::info!("Started the external node");
tracing::info!("Main node URL is: {}", main_node_url);
let main_node_url = config
.required
.main_node_url()
.expect("Main node URL is incorrect");
tracing::info!("Main node URL is: {main_node_url}");

let main_node_client = <dyn MainNodeClient>::json_rpc(&main_node_url)
.context("Failed creating JSON-RPC client for main node")?;
let app_health = Arc::new(AppHealthCheck::default());
app_health.insert_custom_component(Arc::new(MainNodeHealthCheck::from(
main_node_client.clone(),
)));
app_health.insert_custom_component(Arc::new(ConnectionPoolHealthCheck::new(
connection_pool.clone(),
)));

// Start the health check server early into the node lifecycle so that its health can be monitored from the very start.
let healthcheck_handle = HealthCheckHandle::spawn_server(
([0, 0, 0, 0], config.required.healthcheck_port).into(),
app_health.clone(),
);

// Make sure that the node storage is initialized either via genesis or snapshot recovery.
ensure_storage_initialized(
&connection_pool,
&main_node_client,
&app_health,
config.remote.l2_chain_id,
opt.enable_snapshots_recovery,
)
.await?;

let (stop_sender, stop_receiver) = watch::channel(false);
let (task_handles, mut healthchecks) =
init_tasks(&config, connection_pool.clone(), stop_receiver.clone())
.await
.context("init_tasks")?;
let mut task_handles = vec![];
init_tasks(
&config,
connection_pool.clone(),
main_node_client.clone(),
&mut task_handles,
&app_health,
stop_receiver.clone(),
)
.await
.context("init_tasks")?;

let reorg_detector = ReorgDetector::new(&main_node_url, connection_pool.clone());
healthchecks.push(Box::new(reorg_detector.health_check().clone()));
let healthcheck_handle = HealthCheckHandle::spawn_server(
([0, 0, 0, 0], config.required.healthcheck_port).into(),
healthchecks,
);
let reorg_detector = ReorgDetector::new(main_node_client, connection_pool.clone());
app_health.insert_component(reorg_detector.health_check().clone());
let mut reorg_detector_handle = tokio::spawn(reorg_detector.run(stop_receiver)).fuse();
let mut reorg_detector_result = None;
