Skip to content

Commit

Permalink
Observability: Instrument syncing (near#6800)
Browse files Browse the repository at this point in the history
Issue near#6788
  • Loading branch information
nikurt authored May 13, 2022
1 parent fa097a5 commit 72d27f6
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 1 deletion.
6 changes: 6 additions & 0 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1211,6 +1211,7 @@ impl Chain {
}

pub fn reset_data_pre_state_sync(&mut self, sync_hash: CryptoHash) -> Result<(), Error> {
let _span = tracing::debug_span!(target: "sync", "reset_data_pre_state_sync").entered();
let head = self.head()?;
// Get header we were syncing into.
let header = self.get_block_header(&sync_hash)?;
Expand Down Expand Up @@ -1283,6 +1284,7 @@ impl Chain {
orphan_misses_chunks: &mut dyn FnMut(OrphanMissingChunks),
on_challenge: &mut dyn FnMut(ChallengeBody),
) -> Result<(), Error> {
let _span = tracing::debug_span!(target: "sync", "reset_heads_post_state_sync").entered();
// Get header we were syncing into.
let header = self.get_block_header(&sync_hash)?;
let hash = *header.prev_hash();
Expand Down Expand Up @@ -2278,6 +2280,7 @@ impl Chain {
sync_hash: CryptoHash,
apply_result: Result<(), near_chain_primitives::Error>,
) -> Result<(), Error> {
let _span = tracing::debug_span!(target: "sync", "set_state_finalize").entered();
apply_result?;

let shard_state_header = self.get_state_header(shard_id, sync_hash)?;
Expand Down Expand Up @@ -4968,6 +4971,8 @@ impl<'a> ChainUpdate<'a> {
sync_hash: CryptoHash,
shard_state_header: ShardStateSyncResponseHeader,
) -> Result<(), Error> {
let _span =
tracing::debug_span!(target: "sync", "chain_update_set_state_finalize").entered();
let (chunk, incoming_receipts_proofs) = match shard_state_header {
ShardStateSyncResponseHeader::V1(shard_state_header) => (
ShardChunk::V1(shard_state_header.chunk),
Expand Down Expand Up @@ -5074,6 +5079,7 @@ impl<'a> ChainUpdate<'a> {
shard_id: ShardId,
sync_hash: CryptoHash,
) -> Result<bool, Error> {
let _span = tracing::debug_span!(target: "sync", "set_state_finalize_on_height").entered();
let block_header_result =
self.chain_store_update.get_header_on_chain_by_height(&sync_hash, height);
if let Err(_) = block_header_result {
Expand Down
14 changes: 14 additions & 0 deletions chain/client/src/client_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1679,6 +1679,7 @@ impl ClientActor {
/// Runs itself iff it was not run as a reaction to a message with the results of
/// finishing a state part job
fn sync(&mut self, ctx: &mut Context<ClientActor>) {
let _span = tracing::debug_span!(target: "client", "sync").entered();
let _d = delay_detector::DelayDetector::new(|| "client sync".into());
// Macro to schedule to call this function later if error occurred.
macro_rules! unwrap_or_run_later (($obj: expr) => (match $obj {
Expand Down Expand Up @@ -1923,6 +1924,7 @@ impl ClientActor {

impl Drop for ClientActor {
    /// Stops `state_parts_client_arbiter` when the actor is dropped.
    ///
    /// The stop call is made while a debug-level tracing span named `drop`
    /// (target `client`) is entered.
    fn drop(&mut self) {
        let drop_span = tracing::debug_span!(target: "client", "drop");
        // The guard must be bound to a named variable (not `_`) so that it stays
        // alive, and the span stays entered, until the end of this function.
        let _entered = drop_span.entered();
        self.state_parts_client_arbiter.stop();
    }
}
Expand All @@ -1938,6 +1940,7 @@ impl SyncJobsActor {
&mut self,
msg: &ApplyStatePartsRequest,
) -> Result<(), near_chain_primitives::error::Error> {
let _span = tracing::debug_span!(target: "client", "apply_parts").entered();
let store = msg.runtime.get_store();

for part_id in 0..msg.num_parts {
Expand Down Expand Up @@ -1965,6 +1968,9 @@ impl Handler<ApplyStatePartsRequest> for SyncJobsActor {
type Result = ();

fn handle(&mut self, msg: ApplyStatePartsRequest, _: &mut Self::Context) -> Self::Result {
let _span =
tracing::debug_span!(target: "client", "handle", handler = "ApplyStatePartsRequest")
.entered();
let result = self.apply_parts(&msg);

self.client_addr.do_send(ApplyStatePartsResponse {
Expand All @@ -1979,6 +1985,9 @@ impl Handler<ApplyStatePartsResponse> for ClientActor {
type Result = ();

fn handle(&mut self, msg: ApplyStatePartsResponse, _: &mut Self::Context) -> Self::Result {
let _span =
tracing::debug_span!(target: "client", "handle", handler = "ApplyStatePartsResponse")
.entered();
if let Some((sync, _, _)) = self.client.catchup_state_syncs.get_mut(&msg.sync_hash) {
// We are doing catchup
sync.set_apply_result(msg.shard_id, msg.apply_result);
Expand All @@ -1992,6 +2001,9 @@ impl Handler<BlockCatchUpRequest> for SyncJobsActor {
type Result = ();

fn handle(&mut self, msg: BlockCatchUpRequest, _: &mut Self::Context) -> Self::Result {
let _span =
tracing::debug_span!(target: "client", "handle", handler = "BlockCatchUpRequest")
.entered();
let results = do_apply_chunks(msg.work);

self.client_addr.do_send(BlockCatchUpResponse {
Expand Down Expand Up @@ -2026,6 +2038,8 @@ impl Handler<StateSplitRequest> for SyncJobsActor {
type Result = ();

fn handle(&mut self, msg: StateSplitRequest, _: &mut Self::Context) -> Self::Result {
let _span = tracing::debug_span!(target: "client", "handle", handler = "StateSplitRequest")
.entered();
let results = msg.runtime.build_state_for_split_shards(
msg.shard_uid,
&msg.state_root,
Expand Down
5 changes: 4 additions & 1 deletion chain/client/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ impl HeaderSync {
highest_height: BlockHeight,
highest_height_peers: &Vec<FullPeerInfo>,
) -> Result<(), near_chain::Error> {
let _span = tracing::debug_span!(target: "sync", "run", sync = "HeaderSync").entered();
let header_head = chain.header_head()?;
if !self.header_sync_due(sync_status, &header_head, highest_height) {
return Ok(());
Expand Down Expand Up @@ -427,6 +428,7 @@ impl BlockSync {
highest_height: BlockHeight,
highest_height_peers: &[FullPeerInfo],
) -> Result<bool, near_chain::Error> {
let _span = tracing::debug_span!(target: "sync", "run", sync = "BlockSync").entered();
if self.block_sync_due(chain)? {
if self.block_sync(chain, highest_height_peers)? {
debug!(target: "sync", "Sync: transition to State Sync.");
Expand Down Expand Up @@ -1178,7 +1180,8 @@ impl StateSync {
state_parts_task_scheduler: &dyn Fn(ApplyStatePartsRequest),
state_split_scheduler: &dyn Fn(StateSplitRequest),
) -> Result<StateSyncResult, near_chain::Error> {
debug!(target:"sync", "syncing state sync_hash {:?} new_shard_sync {:?} tracking_shards {:?}", sync_hash, new_shard_sync, tracking_shards);
let _span = tracing::debug_span!(target: "sync", "run", sync = "StateSync").entered();
debug!(target: "sync", %sync_hash, ?new_shard_sync, ?tracking_shards, "syncing state");
let prev_hash = *chain.get_block_header(&sync_hash)?.prev_hash();
let now = Clock::utc();

Expand Down

0 comments on commit 72d27f6

Please sign in to comment.