diff --git a/src/burnchains/bitcoin/blocks.rs b/src/burnchains/bitcoin/blocks.rs index 0c585e320b..61a1820847 100644 --- a/src/burnchains/bitcoin/blocks.rs +++ b/src/burnchains/bitcoin/blocks.rs @@ -72,6 +72,10 @@ impl BurnHeaderIPC for BitcoinHeaderIPC { fn height(&self) -> u64 { self.block_height } + + fn header_hash(&self) -> [u8; 32] { + self.block_header.header.bitcoin_hash().0 + } } #[derive(Debug, Clone, PartialEq)] diff --git a/src/burnchains/burnchain.rs b/src/burnchains/burnchain.rs index eaf2c31f4f..a0ecf45cf6 100644 --- a/src/burnchains/burnchain.rs +++ b/src/burnchains/burnchain.rs @@ -17,6 +17,9 @@ along with Blockstack. If not, see <http://www.gnu.org/licenses/>. */ +use deps; +use deps::bitcoin::util::hash::Sha256dHash as BitcoinSha256dHash; + use std::fs; use std::path::PathBuf; use std::sync::mpsc::sync_channel; @@ -43,7 +46,7 @@ use burnchains::{ use burnchains::db::BurnchainDB; use burnchains::indexer::{ - BurnBlockIPC, BurnchainBlockDownloader, BurnchainBlockParser, BurnchainIndexer, + BurnBlockIPC, BurnHeaderIPC, BurnchainBlockDownloader, BurnchainBlockParser, BurnchainIndexer, }; use burnchains::bitcoin::address::address_type_to_version_byte; @@ -791,9 +794,16 @@ impl Burnchain { pub fn sync<I: BurnchainIndexer + 'static>( &mut self, comms: &CoordinatorChannels, + target_block_height_opt: Option<u64>, + max_blocks_opt: Option<u64>, ) -> Result<u64, burnchain_error> { let mut indexer: I = self.make_indexer()?; - let chain_tip = self.sync_with_indexer(&mut indexer, comms.clone())?; + let chain_tip = self.sync_with_indexer( + &mut indexer, + comms.clone(), + target_block_height_opt, + max_blocks_opt, + )?; Ok(chain_tip.block_height) } @@ -1006,12 +1016,16 @@ impl Burnchain { } /// Top-level burnchain sync. - /// Returns the burnchain block header for the new burnchain tip + /// Returns the burnchain block header for the new burnchain tip, which will be _at least_ as + /// high as target_block_height_opt (if given), or whatever is currently at the tip of the + /// burnchain DB. /// If this method returns Err(burnchain_error::TrySyncAgain), then call this method again. pub fn sync_with_indexer<I>( &mut self, indexer: &mut I, coord_comm: CoordinatorChannels, + target_block_height_opt: Option<u64>, + max_blocks_opt: Option<u64>, ) -> Result<BurnchainBlockHeader, burnchain_error> where I: BurnchainIndexer + 'static, { @@ -1040,11 +1054,9 @@ // get latest headers. debug!("Sync headers from {}", sync_height); - let end_block = indexer.sync_headers(sync_height, None)?; - let mut start_block = match sync_height { - 0 => 0, - _ => sync_height, - }; + // fetch all headers, no matter what + let mut end_block = indexer.sync_headers(sync_height, None)?; + let mut start_block = sync_height; if db_height < start_block { start_block = db_height; } @@ -1053,6 +1065,41 @@ "Sync'ed headers from {} to {}. DB at {}", start_block, end_block, db_height ); + + if let Some(target_block_height) = target_block_height_opt { + if target_block_height < end_block { + debug!( + "Will download up to max burn block height {}", + target_block_height + ); + end_block = target_block_height; + } + } + + if let Some(max_blocks) = max_blocks_opt { + if start_block + max_blocks < end_block { + debug!( + "Will download only {} blocks (up to block height {})", + max_blocks, + start_block + max_blocks + ); + end_block = start_block + max_blocks; + } + } + + if end_block < start_block { + // nothing to do -- go get the burnchain block data at that height + let mut hdrs = indexer.read_headers(end_block, end_block + 1)?; + if let Some(hdr) = hdrs.pop() { + debug!("Nothing to do; already have blocks up to {}", end_block); + let bhh = + BurnchainHeaderHash::from_bitcoin_hash(&BitcoinSha256dHash(hdr.header_hash())); + return burnchain_db + .get_burnchain_block(&bhh) + .map(|block_data| block_data.header); + } + } + if start_block == db_height && db_height == end_block { // all caught up return Ok(burn_chain_tip);
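A quick caller-side sketch (illustrative, not part of the patch) of the two new knobs: passing None for either means "no cap", and BitcoinIndexer is the indexer type already used by the caller in src/core/mod.rs:

    // Fetch at most 100 more burnchain blocks this pass, and never go past target_height.
    let new_height = burnchain.sync::<BitcoinIndexer>(&channels.1, Some(target_height), Some(100))?;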
DB at {}", start_block, end_block, db_height ); + + if let Some(target_block_height) = target_block_height_opt { + if target_block_height < end_block { + debug!( + "Will download up to max burn block height {}", + target_block_height + ); + end_block = target_block_height; + } + } + + if let Some(max_blocks) = max_blocks_opt { + if start_block + max_blocks < end_block { + debug!( + "Will download only {} blocks (up to block height {})", + max_blocks, + start_block + max_blocks + ); + end_block = start_block + max_blocks; + } + } + + if end_block < start_block { + // nothing to do -- go get the burnchain block data at that height + let mut hdrs = indexer.read_headers(end_block, end_block + 1)?; + if let Some(hdr) = hdrs.pop() { + debug!("Nothing to do; already have blocks up to {}", end_block); + let bhh = + BurnchainHeaderHash::from_bitcoin_hash(&BitcoinSha256dHash(hdr.header_hash())); + return burnchain_db + .get_burnchain_block(&bhh) + .map(|block_data| block_data.header); + } + } + if start_block == db_height && db_height == end_block { // all caught up return Ok(burn_chain_tip); diff --git a/src/burnchains/db.rs b/src/burnchains/db.rs index d835775c9e..116c5d03c7 100644 --- a/src/burnchains/db.rs +++ b/src/burnchains/db.rs @@ -12,7 +12,10 @@ use chainstate::burn::operations::BlockstackOperationType; use chainstate::stacks::index::MarfTrieId; -use util::db::{query_row, query_rows, u64_to_sql, Error as DBError, FromColumn, FromRow}; +use util::db::{ + query_row, query_rows, tx_begin_immediate, tx_busy_handler, u64_to_sql, Error as DBError, + FromColumn, FromRow, +}; pub struct BurnchainDB { conn: Connection, @@ -185,10 +188,12 @@ impl BurnchainDB { } }; - let mut db = BurnchainDB { - conn: Connection::open_with_flags(path, open_flags) - .expect(&format!("FAILED to open: {}", path)), - }; + let conn = Connection::open_with_flags(path, open_flags) + .expect(&format!("FAILED to open: {}", path)); + + conn.busy_handler(Some(tx_busy_handler))?; + + let mut db = BurnchainDB { conn }; if create_flag { let db_tx = db.tx_begin()?; @@ -216,14 +221,14 @@ impl BurnchainDB { OpenFlags::SQLITE_OPEN_READ_ONLY }; let conn = Connection::open_with_flags(path, open_flags)?; + conn.busy_handler(Some(tx_busy_handler))?; Ok(BurnchainDB { conn }) } fn tx_begin<'a>(&'a mut self) -> Result, BurnchainError> { - Ok(BurnchainDBTransaction { - sql_tx: self.conn.transaction()?, - }) + let sql_tx = tx_begin_immediate(&mut self.conn)?; + Ok(BurnchainDBTransaction { sql_tx: sql_tx }) } pub fn get_canonical_chain_tip(&self) -> Result { diff --git a/src/burnchains/indexer.rs b/src/burnchains/indexer.rs index c17cf759d1..7b11e9dcf4 100644 --- a/src/burnchains/indexer.rs +++ b/src/burnchains/indexer.rs @@ -28,6 +28,7 @@ pub trait BurnHeaderIPC { fn height(&self) -> u64; fn header(&self) -> Self::H; + fn header_hash(&self) -> [u8; 32]; } pub trait BurnBlockIPC { diff --git a/src/chainstate/burn/operations/mod.rs b/src/chainstate/burn/operations/mod.rs index 8a25504a93..27552e9078 100644 --- a/src/chainstate/burn/operations/mod.rs +++ b/src/chainstate/burn/operations/mod.rs @@ -141,7 +141,7 @@ impl From for Error { #[derive(Debug, PartialEq, Clone, Eq, Serialize, Deserialize)] pub struct LeaderBlockCommitOp { - pub block_header_hash: BlockHeaderHash, // hash of Stacks block header (double-sha256) + pub block_header_hash: BlockHeaderHash, // hash of Stacks block header (sha512/256) pub new_seed: VRFSeed, // new seed for this block pub parent_block_ptr: u32, // block height of the block that contains the parent block hash diff 
--git a/src/chainstate/coordinator/mod.rs b/src/chainstate/coordinator/mod.rs index df5dd88166..ca9e6ba68d 100644 --- a/src/chainstate/coordinator/mod.rs +++ b/src/chainstate/coordinator/mod.rs @@ -432,8 +432,8 @@ impl<'a, T: BlockEventDispatcher, N: CoordinatorNotices, U: RewardSetProvider> self.notifier.notify_sortition_processed(); debug!( - "Sortition processed: {} (tip {})", - &sortition_id, &next_snapshot.burn_header_hash + "Sortition processed: {} (tip {} height {})", + &sortition_id, &next_snapshot.burn_header_hash, next_snapshot.block_height ); if sortition_tip_snapshot.block_height < header.block_height { diff --git a/src/chainstate/coordinator/tests.rs b/src/chainstate/coordinator/tests.rs index 3d428495d7..d6347a1b45 100644 --- a/src/chainstate/coordinator/tests.rs +++ b/src/chainstate/coordinator/tests.rs @@ -1782,6 +1782,7 @@ fn preprocess_block( &my_sortition.consensus_hash, &block, &parent_consensus_hash, + 5, ) .unwrap(); } diff --git a/src/chainstate/stacks/boot/mod.rs b/src/chainstate/stacks/boot/mod.rs index 89bf410f48..e18a296b8f 100644 --- a/src/chainstate/stacks/boot/mod.rs +++ b/src/chainstate/stacks/boot/mod.rs @@ -3202,7 +3202,7 @@ pub mod test { }) .unwrap(); - eprintln!("\ntenure: {}\nreward cycle: {}\nmin-uSTX: {}\naddrs: {:?}\ntotal_liquid_ustx: {}\ntotal-stacked: {}\ntotal-stacked next: {}\n", + eprintln!("\ntenure: {}\nreward cycle: {}\nmin-uSTX: {}\naddrs: {:?}\ntotal_liquid_ustx: {}\ntotal-stacked: {}\ntotal-stacked next: {}\n", tenure_id, cur_reward_cycle, min_ustx, &reward_addrs, total_liquid_ustx, total_stacked, total_stacked_next); if tenure_id <= 1 { diff --git a/src/chainstate/stacks/db/blocks.rs b/src/chainstate/stacks/db/blocks.rs index 0c7453c37c..b78d53e0aa 100644 --- a/src/chainstate/stacks/db/blocks.rs +++ b/src/chainstate/stacks/db/blocks.rs @@ -47,8 +47,8 @@ use std::path::{Path, PathBuf}; use util::db::Error as db_error; use util::db::{ - query_count, query_int, query_row, query_row_columns, query_rows, tx_busy_handler, DBConn, - FromColumn, FromRow, + query_count, query_int, query_row, query_row_columns, query_row_panic, query_rows, + tx_busy_handler, DBConn, FromColumn, FromRow, }; use util::db::u64_to_sql; @@ -431,7 +431,7 @@ const STACKS_BLOCK_INDEX_SQL: &'static [&'static str] = &[ CREATE TABLE staging_blocks(anchored_block_hash TEXT NOT NULL, parent_anchored_block_hash TEXT NOT NULL, consensus_hash TEXT NOT NULL, - -- parent_consensus_hash is the consensus hash of the parent sortition of the sortition that chose this block + -- parent_consensus_hash is the consensus hash of the parent sortition of the sortition that chose this block parent_consensus_hash TEXT NOT NULL, parent_microblock_hash TEXT NOT NULL, parent_microblock_seq INT NOT NULL, @@ -443,6 +443,9 @@ const STACKS_BLOCK_INDEX_SQL: &'static [&'static str] = &[ commit_burn INT NOT NULL, sortition_burn INT NOT NULL, index_block_hash TEXT NOT NULL, -- used internally; hash of burn header and block header + download_time INT NOT NULL, -- how long the block was in-flight + arrival_time INT NOT NULL, -- when this block was stored + processed_time INT NOT NULL, -- when this block was processed PRIMARY KEY(anchored_block_hash,consensus_hash) ); CREATE INDEX processed_stacks_blocks ON staging_blocks(processed,anchored_blcok_hash,consensus_hash); @@ -1373,6 +1376,7 @@ impl StacksChainState { parent_consensus_hash: &ConsensusHash, commit_burn: u64, sortition_burn: u64, + download_time: u64, ) -> Result<(), Error> { debug!( "Store anchored block {}/{}, parent in {}", @@ -1428,8 +1432,11 
@@ impl StacksChainState { orphaned, \ commit_burn, \ sortition_burn, \ - index_block_hash) \ - VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)"; + index_block_hash, \ + arrival_time, \ + processed_time, \ + download_time) \ + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17)"; let args: &[&dyn ToSql] = &[ &block_hash, &block.header.parent_block, @@ -1445,6 +1452,9 @@ impl StacksChainState { &u64_to_sql(commit_burn)?, &u64_to_sql(sortition_burn)?, &index_block_hash, + &u64_to_sql(get_epoch_time_secs())?, + &0, + &u64_to_sql(download_time)?, ]; tx.execute(&sql, args) @@ -1870,40 +1880,38 @@ impl StacksChainState { anchored_block_hash: &BlockHeaderHash, ) -> Result<(), Error> { // This block is orphaned - let update_block_sql = "UPDATE staging_blocks SET orphaned = 1, processed = 1, attachable = 0 WHERE anchored_block_hash = ?1".to_string(); - let update_block_args = [&anchored_block_hash]; + let update_block_sql = "UPDATE staging_blocks SET orphaned = 1, processed = 1, attachable = 0 WHERE consensus_hash = ?1 AND anchored_block_hash = ?2".to_string(); + let update_block_args: &[&dyn ToSql] = &[consensus_hash, anchored_block_hash]; // All descendents of this processed block are never attachable. // Indicate this by marking all children as orphaned (but not procesed), across all burnchain forks. - let update_children_sql = "UPDATE staging_blocks SET orphaned = 1, processed = 0, attachable = 0 WHERE parent_anchored_block_hash = ?1".to_string(); - let update_children_args = [&anchored_block_hash]; + let update_children_sql = "UPDATE staging_blocks SET orphaned = 1, processed = 0, attachable = 0 WHERE parent_consensus_hash = ?1 AND parent_anchored_block_hash = ?2".to_string(); + let update_children_args: &[&dyn ToSql] = &[consensus_hash, anchored_block_hash]; // find all orphaned microblocks, and delete the block data - let find_orphaned_microblocks_sql = - "SELECT microblock_hash FROM staging_microblocks WHERE anchored_block_hash = ?1" - .to_string(); - let find_orphaned_microblocks_args = [&anchored_block_hash]; + let find_orphaned_microblocks_sql = "SELECT microblock_hash FROM staging_microblocks WHERE consensus_hash = ?1 AND anchored_block_hash = ?2".to_string(); + let find_orphaned_microblocks_args: &[&dyn ToSql] = &[consensus_hash, anchored_block_hash]; let orphaned_microblock_hashes = query_row_columns::( tx, &find_orphaned_microblocks_sql, - &find_orphaned_microblocks_args, + find_orphaned_microblocks_args, "microblock_hash", ) .map_err(Error::DBError)?; // drop microblocks (this processes them) - let update_microblock_children_sql = "UPDATE staging_microblocks SET orphaned = 1, processed = 1 WHERE anchored_block_hash = ?1".to_string(); - let update_microblock_children_args = [&anchored_block_hash]; + let update_microblock_children_sql = "UPDATE staging_microblocks SET orphaned = 1, processed = 1 WHERE consensus_hash = ?1 AND anchored_block_hash = ?2".to_string(); + let update_microblock_children_args: &[&dyn ToSql] = &[consensus_hash, anchored_block_hash]; - tx.execute(&update_block_sql, &update_block_args) + tx.execute(&update_block_sql, update_block_args) .map_err(|e| Error::DBError(db_error::SqliteError(e)))?; - tx.execute(&update_children_sql, &update_children_args) + tx.execute(&update_children_sql, update_children_args) .map_err(|e| Error::DBError(db_error::SqliteError(e)))?; tx.execute( &update_microblock_children_sql, - &update_microblock_children_args, + update_microblock_children_args, ) .map_err(|e| 
Error::DBError(db_error::SqliteError(e)))?; @@ -1936,6 +1944,7 @@ impl StacksChainState { /// Clear out a staging block -- mark it as processed. /// Mark its children as attachable. /// Idempotent. + /// sort_tx_opt is required if accept is true fn set_block_processed<'a, 'b>( tx: &mut BlocksDBTx<'a>, mut sort_tx_opt: Option<&mut SortitionHandleTx<'b>>, @@ -2016,8 +2025,12 @@ ); } - let update_sql = "UPDATE staging_blocks SET processed = 1 WHERE consensus_hash = ?1 AND anchored_block_hash = ?2".to_string(); - let update_args: &[&dyn ToSql] = &[&consensus_hash, &anchored_block_hash]; + let update_sql = "UPDATE staging_blocks SET processed = 1, processed_time = ?1 WHERE consensus_hash = ?2 AND anchored_block_hash = ?3".to_string(); + let update_args: &[&dyn ToSql] = &[ + &u64_to_sql(get_epoch_time_secs())?, + &consensus_hash, + &anchored_block_hash, + ]; tx.execute(&update_sql, update_args) .map_err(|e| Error::DBError(db_error::SqliteError(e)))?; @@ -2972,6 +2985,7 @@ consensus_hash: &ConsensusHash, block: &StacksBlock, parent_consensus_hash: &ConsensusHash, + download_time: u64, ) -> Result<bool, Error> { debug!( "preprocess anchored block {}/{}", @@ -3067,6 +3081,7 @@ parent_consensus_hash, commit_burn, sortition_burn, + download_time, )?; // store users who burned for this block so they'll get rewarded if we process it @@ -3222,6 +3237,7 @@ &snapshot.consensus_hash, block, &parent_sn.consensus_hash, + 5, )?; let block_hash = block.block_hash(); for mblock in microblocks.iter() { @@ -3380,152 +3396,243 @@ Ok(true) } - /// Is there at least one staging block that can be attached? - pub fn has_attachable_staging_blocks(blocks_conn: &DBConn) -> Result<bool, Error> { - // go through staging blocks and see if any of them match headers and are attachable. - // pick randomly -- don't allow the network sender to choose the processing order! - let sql = "SELECT 1 FROM staging_blocks WHERE processed = 0 AND attachable = 1 AND orphaned = 0 LIMIT 1".to_string(); - let available = blocks_conn - .query_row(&sql, NO_PARAMS, |_row| ()) - .optional() - .map_err(|e| Error::DBError(db_error::SqliteError(e)))? - .is_some(); - Ok(available) + /// How many attachable staging blocks do we have, up to a limit, at or after the given + /// timestamp? + pub fn count_attachable_staging_blocks( + blocks_conn: &DBConn, + limit: u64, + min_arrival_time: u64, + ) -> Result<u64, Error> { + let sql = "SELECT COUNT(*) FROM staging_blocks WHERE processed = 0 AND attachable = 1 AND orphaned = 0 AND arrival_time >= ?1 LIMIT ?2".to_string(); + let cnt = query_count( + blocks_conn, + &sql, + &[&u64_to_sql(min_arrival_time)?, &u64_to_sql(limit)?], + ) + .map_err(Error::DBError)?; + Ok(cnt as u64) + } + + /// How many processed staging blocks do we have, up to a limit, at or after the given + /// timestamp? + pub fn count_processed_staging_blocks( + blocks_conn: &DBConn, + limit: u64, + min_arrival_time: u64, + ) -> Result<u64, Error> { + let sql = "SELECT COUNT(*) FROM staging_blocks WHERE processed = 1 AND orphaned = 0 AND processed_time > 0 AND processed_time >= ?1 LIMIT ?2".to_string(); + let cnt = query_count( + blocks_conn, + &sql, + &[&u64_to_sql(min_arrival_time)?, &u64_to_sql(limit)?], + ) + .map_err(Error::DBError)?; + Ok(cnt as u64) + }
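Taken together, the two counters give the sync loop a cheap "are blocks still arriving and getting processed?" probe. A sketch (variable names hypothetical) under the assumption that both are polled over a recent time window:

    // Count up to 100 staging blocks that arrived, or were processed, within the window.
    let since = get_epoch_time_secs().saturating_sub(window_secs);
    let arrived = StacksChainState::count_attachable_staging_blocks(&chainstate.blocks_db, 100, since)?;
    let processed = StacksChainState::count_processed_staging_blocks(&chainstate.blocks_db, 100, since)?;
    let still_busy = arrived > 0 || processed > 0;

+ /// Measure how long a block waited in-between when it arrived and when it got processed. + /// Includes both orphaned and accepted blocks. 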
+ pub fn measure_block_wait_time( + blocks_conn: &DBConn, + start_height: u64, + end_height: u64, + ) -> Result, Error> { + let sql = "SELECT processed_time - arrival_time FROM staging_blocks WHERE processed = 1 AND height >= ?1 AND height < ?2"; + let args: &[&dyn ToSql] = &[&u64_to_sql(start_height)?, &u64_to_sql(end_height)?]; + let list = query_rows::(blocks_conn, &sql, args)?; + Ok(list) + } + + /// Measure how long a block took to be downloaded (for blocks that we downloaded). + /// Includes _all_ blocks. + pub fn measure_block_download_time( + blocks_conn: &DBConn, + start_height: u64, + end_height: u64, + ) -> Result, Error> { + let sql = "SELECT download_time FROM staging_blocks WHERE height >= ?1 AND height < ?2"; + let args: &[&dyn ToSql] = &[&u64_to_sql(start_height)?, &u64_to_sql(end_height)?]; + let list = query_rows::(blocks_conn, &sql, args)?; + Ok(list) } /// Given access to the chain state (headers) and the staging blocks, find a staging block we /// can process, as well as its parent microblocks that it confirms /// Returns Some(microblocks, staging block) if we found a sequence of blocks to process. /// Returns None if not. - fn find_next_staging_block( - blocks_conn: &DBConn, + fn find_next_staging_block<'a>( + blocks_tx: &mut BlocksDBTx<'a>, blocks_path: &String, headers_conn: &DBConn, + sort_conn: &DBConn, ) -> Result, StagingBlock)>, Error> { test_debug!("Find next staging block"); - // go through staging blocks and see if any of them match headers and are attachable. - // pick randomly -- don't allow the network sender to choose the processing order! - let sql = "SELECT * FROM staging_blocks WHERE processed = 0 AND attachable = 1 AND orphaned = 0 ORDER BY RANDOM()".to_string(); + let mut to_delete = vec![]; - let mut stmt = blocks_conn - .prepare(&sql) - .map_err(|e| Error::DBError(db_error::SqliteError(e)))?; - - let mut rows = stmt - .query(NO_PARAMS) - .map_err(|e| Error::DBError(db_error::SqliteError(e)))?; - - while let Some(row_res) = rows.next() { - match row_res { - Ok(row) => { - let candidate = StagingBlock::from_row(&row).map_err(Error::DBError)?; + // put this in a block so stmt goes out of scope before we start to delete PoX-orphaned + // blocks + { + // go through staging blocks and see if any of them match headers, are attachable, and are + // recent (i.e. less than 10 minutes old) + // pick randomly -- don't allow the network sender to choose the processing order! + let sql = "SELECT * FROM staging_blocks WHERE processed = 0 AND attachable = 1 AND orphaned = 0 ORDER BY RANDOM()".to_string(); + let mut stmt = blocks_tx + .prepare(&sql) + .map_err(|e| Error::DBError(db_error::SqliteError(e)))?; - debug!( - "Consider block {}/{} whose parent is {}/{}", - &candidate.consensus_hash, - &candidate.anchored_block_hash, - &candidate.parent_consensus_hash, - &candidate.parent_anchored_block_hash - ); + let mut rows = stmt + .query(NO_PARAMS) + .map_err(|e| Error::DBError(db_error::SqliteError(e)))?; - let can_attach = { - if candidate.parent_anchored_block_hash == FIRST_STACKS_BLOCK_HASH { - // this block's parent is the boot code -- it's the first-ever block, - // so it can be processed immediately - true - } else { - // not the first-ever block. Does this connect to a previously-accepted - // block in the headers database? 
- let hdr_sql = "SELECT * FROM block_headers WHERE block_hash = ?1 AND consensus_hash = ?2".to_string(); - let hdr_args: &[&dyn ToSql] = &[ - &candidate.parent_anchored_block_hash, - &candidate.parent_consensus_hash, - ]; - let hdr_rows = - query_rows::(headers_conn, &hdr_sql, hdr_args) - .map_err(Error::DBError)?; - - match hdr_rows.len() { - 0 => { - // no parent processed for this block - debug!( - "No such parent {}/{} for block, cannot process", - &candidate.parent_consensus_hash, - &candidate.parent_anchored_block_hash - ); - false - } - 1 => { - // can process this block - debug!( - "Have parent {}/{} for this block, will process", - &candidate.parent_consensus_hash, - &candidate.parent_anchored_block_hash - ); - true - } - _ => { - // should be impossible -- stored the same block twice - unreachable!( - "Stored the same block twice: {}/{}", - &candidate.parent_anchored_block_hash, - &candidate.parent_consensus_hash - ); - } - } - } - }; + while let Some(row_res) = rows.next() { + match row_res { + Ok(row) => { + let mut candidate = StagingBlock::from_row(&row).map_err(Error::DBError)?; - if can_attach { - // try and load up this staging block and its microblocks - match StacksChainState::load_staging_block( - blocks_conn, - blocks_path, + debug!( + "Consider block {}/{} whose parent is {}/{}", &candidate.consensus_hash, &candidate.anchored_block_hash, - )? { - Some(staging_block) => { - // must be unprocessed -- must have a block - if staging_block.block_data.len() == 0 { - return Err(Error::NetError(net_error::DeserializeError( - format!( - "No block data for staging block {}", - candidate.anchored_block_hash - ), - ))); - } + &candidate.parent_consensus_hash, + &candidate.parent_anchored_block_hash + ); - // find its microblock parent stream - match StacksChainState::find_parent_staging_microblock_stream( - blocks_conn, - blocks_path, - &staging_block, - )? { - Some(parent_staging_microblocks) => { - return Ok(Some(( - parent_staging_microblocks, - staging_block, - ))); + let can_attach = { + if candidate.parent_anchored_block_hash == FIRST_STACKS_BLOCK_HASH { + // this block's parent is the boot code -- it's the first-ever block, + // so it can be processed immediately + true + } else { + // not the first-ever block. Does this connect to a previously-accepted + // block in the headers database? 
+ let hdr_sql = "SELECT * FROM block_headers WHERE block_hash = ?1 AND consensus_hash = ?2".to_string(); + let hdr_args: &[&dyn ToSql] = &[ + &candidate.parent_anchored_block_hash, + &candidate.parent_consensus_hash, + ]; + let hdr_row = query_row_panic::( + headers_conn, + &hdr_sql, + hdr_args, + || { + format!( + "Stored the same block twice: {}/{}", + &candidate.parent_anchored_block_hash, + &candidate.parent_consensus_hash + ) + }, + )?; + match hdr_row { + Some(_) => { + debug!( + "Have parent {}/{} for this block, will process", + &candidate.parent_consensus_hash, + &candidate.parent_anchored_block_hash + ); + true } None => { - // no microblock data yet + // no parent processed for this block + debug!( + "No such parent {}/{} for block, cannot process", + &candidate.parent_consensus_hash, + &candidate.parent_anchored_block_hash + ); + false } } } - None => { - // should be impossible -- selected unprocessed blocks - unreachable!("Failed to load staging block when an earlier query indicated that it was present"); + }; + + if can_attach { + // load up the block data + candidate.block_data = match StacksChainState::load_block_bytes( + blocks_path, + &candidate.consensus_hash, + &candidate.anchored_block_hash, + )? { + Some(bytes) => { + if bytes.len() == 0 { + error!( + "CORRUPTION: No block data for {}/{}", + &candidate.consensus_hash, + &candidate.anchored_block_hash + ); + panic!(); + } + bytes + } + None => { + error!( + "CORRUPTION: No block data for {}/{}", + &candidate.consensus_hash, &candidate.anchored_block_hash + ); + panic!(); + } + }; + + // find its microblock parent stream + match StacksChainState::find_parent_staging_microblock_stream( + blocks_tx, + blocks_path, + &candidate, + )? { + Some(parent_staging_microblocks) => { + return Ok(Some((parent_staging_microblocks, candidate))); + } + None => { + // no microblock data yet, so we can't process this block + continue; + } + } + } else { + // this can happen if a PoX reorg happens + // if this candidate is no longer on the main PoX fork, then delete it + let sn_opt = SortitionDB::get_block_snapshot_consensus( + sort_conn, + &candidate.consensus_hash, + )?; + if sn_opt.is_none() { + to_delete.push(( + candidate.consensus_hash.clone(), + candidate.anchored_block_hash.clone(), + )); + } else if let Some(sn) = sn_opt { + if !sn.pox_valid { + to_delete.push(( + candidate.consensus_hash.clone(), + candidate.anchored_block_hash.clone(), + )); + } } } } - } - Err(e) => { - return Err(Error::DBError(db_error::SqliteError(e))); + Err(e) => { + return Err(Error::DBError(db_error::SqliteError(e))); + } } } } + for (consensus_hash, anchored_block_hash) in to_delete.into_iter() { + debug!("Orphan {}/{}: it does not connect to a previously-accepted block, because its consensus hash does not match an existing snapshot on the valid PoX fork.", &consensus_hash, &anchored_block_hash); + let _ = StacksChainState::set_block_processed( + blocks_tx, + None, + &consensus_hash, + &anchored_block_hash, + false, + ) + .map_err(|e| { + warn!( + "Failed to orphan {}/{}: {:?}", + &consensus_hash, &anchored_block_hash, &e + ); + e + }); + } + // no blocks available Ok(None) } @@ -3945,9 +4052,10 @@ impl StacksChainState { // this is a transaction against both the headers and staging blocks databases! let (mut next_microblocks, next_staging_block) = match StacksChainState::find_next_staging_block( - &chainstate_tx.blocks_tx, + &mut chainstate_tx.blocks_tx, &blocks_path, &chainstate_tx.headers_tx, + sort_tx, )? 
{ Some((next_microblocks, next_staging_block)) => { (next_microblocks, next_staging_block) @@ -5075,6 +5183,7 @@ pub mod test { parent_consensus_hash, commit_burn, sortition_burn, + 5, ) .unwrap(); tx.commit().unwrap(); diff --git a/src/chainstate/stacks/index/storage.rs b/src/chainstate/stacks/index/storage.rs index fcb23c8563..fe625e1222 100644 --- a/src/chainstate/stacks/index/storage.rs +++ b/src/chainstate/stacks/index/storage.rs @@ -1653,19 +1653,19 @@ pub mod test { fn node_cmp(n1: &TrieNodeType, n2: &TrieNodeType) -> bool { match (n1, n2) { (TrieNodeType::Leaf(ref data1), TrieNodeType::Leaf(ref data2)) => { - (data1.path == data2.path && data1.data == data2.data) + data1.path == data2.path && data1.data == data2.data } (TrieNodeType::Node4(ref data1), TrieNodeType::Node4(ref data2)) => { - (data1.path == data2.path && ptrs_cmp(&data1.ptrs, &data2.ptrs)) + data1.path == data2.path && ptrs_cmp(&data1.ptrs, &data2.ptrs) } (TrieNodeType::Node16(ref data1), TrieNodeType::Node16(ref data2)) => { - (data1.path == data2.path && ptrs_cmp(&data1.ptrs, &data2.ptrs)) + data1.path == data2.path && ptrs_cmp(&data1.ptrs, &data2.ptrs) } (TrieNodeType::Node48(ref data1), TrieNodeType::Node48(ref data2)) => { - (data1.path == data2.path && ptrs_cmp(&data1.ptrs, &data2.ptrs)) + data1.path == data2.path && ptrs_cmp(&data1.ptrs, &data2.ptrs) } (TrieNodeType::Node256(ref data1), TrieNodeType::Node256(ref data2)) => { - (data1.path == data2.path && ptrs_cmp(&data1.ptrs, &data2.ptrs)) + data1.path == data2.path && ptrs_cmp(&data1.ptrs, &data2.ptrs) } (_, _) => false, } diff --git a/src/chainstate/stacks/miner.rs b/src/chainstate/stacks/miner.rs index bd25879b34..75dd677d95 100644 --- a/src/chainstate/stacks/miner.rs +++ b/src/chainstate/stacks/miner.rs @@ -1788,6 +1788,7 @@ pub mod test { &commit_snapshot.consensus_hash, &stacks_block, &parent_block_consensus_hash, + 5, ) .unwrap(); @@ -4949,7 +4950,7 @@ pub mod test { microblocks.push(microblock); } - test_debug!("Produce anchored stacks block {} with smart contract and {} microblocks with contract call at burnchain height {} stacks height {}", + test_debug!("Produce anchored stacks block {} with smart contract and {} microblocks with contract call at burnchain height {} stacks height {}", stacks_block.block_hash(), microblocks.len(), burnchain_height, stacks_block.header.total_work.work); (stacks_block, microblocks) diff --git a/src/core/mod.rs b/src/core/mod.rs index 6c01838239..1eb8c509d4 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -81,7 +81,7 @@ pub fn sync_burnchain_bitcoin( e })?; - let new_height_res = burnchain.sync::(&channels.1); + let new_height_res = burnchain.sync::(&channels.1, None, None); let new_height = new_height_res.map_err(|e| { error!( "Failed to synchronize Bitcoin chain state for {} in {}", diff --git a/src/main.rs b/src/main.rs index 6db0ab4e90..ae8c2429ca 100644 --- a/src/main.rs +++ b/src/main.rs @@ -617,6 +617,7 @@ fn main() { &mut new_chainstate, &new_snapshot.consensus_hash, &stacks_block, + 0, ) .unwrap(); } else { diff --git a/src/net/chat.rs b/src/net/chat.rs index 936b84fcbb..e1b4b3abd6 100644 --- a/src/net/chat.rs +++ b/src/net/chat.rs @@ -664,17 +664,17 @@ impl ConversationP2P { ) -> Result { if msg.preamble.network_id != self.network_id { // not on our network - test_debug!( - "wrong network ID: {:x} != {:x}", - msg.preamble.network_id, - self.network_id + debug!( + "{:?}: Preamble invalid: wrong network ID: {:x} != {:x}", + &self, msg.preamble.network_id, self.network_id ); return 
Err(net_error::InvalidMessage); } if (msg.preamble.peer_version & 0xff000000) != (self.version & 0xff000000) { // major version mismatch test_debug!( - "wrong peer version: {:x} != {:x}", + "{:?}: Preamble invalid: wrong peer version: {:x} != {:x}", + &self, msg.preamble.peer_version, self.version ); @@ -687,8 +687,9 @@ impl ConversationP2P { != Some(msg.preamble.burn_block_height) { // invalid message - test_debug!( - "wrong stable block height: {:?} != {}", + debug!( + "{:?}: Preamble invalid: wrong stable block height: {:?} != {}", + &self, msg.preamble .burn_stable_block_height .checked_add(self.burnchain.stable_confirmations as u64), @@ -700,13 +701,11 @@ impl ConversationP2P { if msg.preamble.burn_stable_block_height > chain_view.burn_block_height + MAX_NEIGHBOR_BLOCK_DELAY { - // this node is too far ahead of us, but otherwise still potentially valid - test_debug!( - "remote peer is too far ahead of us: {} > {}", - msg.preamble.burn_stable_block_height, - chain_view.burn_block_height + // this node is too far ahead of us for neighbor walks, but otherwise still potentially valid + debug!( + "{:?}: remote peer is far ahead of us: {} > {}", + &self, msg.preamble.burn_stable_block_height, chain_view.burn_block_height ); - return Ok(false); } // must agree on stable burn header hash @@ -3912,8 +3911,8 @@ mod test { &vec![], ) .unwrap(); + let mut sortdb_1 = SortitionDB::connect_test(12300, &first_burn_hash).unwrap(); - let mut sortdb_2 = SortitionDB::connect_test(12300, &first_burn_hash).unwrap(); db_setup(&mut peerdb_1, &mut sortdb_1, &socketaddr_1, &chain_view); @@ -3965,40 +3964,6 @@ mod test { ); } - // node is too far ahead of us - { - let mut convo_bad = - ConversationP2P::new(123, 456, &burnchain, &socketaddr_2, &conn_opts, true, 0); - - let ping_data = PingData::new(); - - let mut chain_view_bad = chain_view.clone(); - chain_view_bad.burn_stable_block_height += - MAX_NEIGHBOR_BLOCK_DELAY + 1 + burnchain.stable_confirmations as u64; - chain_view_bad.burn_block_height += - MAX_NEIGHBOR_BLOCK_DELAY + 1 + burnchain.stable_confirmations as u64; - - let ping_bad = convo_bad - .sign_message( - &chain_view_bad, - &local_peer_1.private_key, - StacksMessageType::Ping(ping_data.clone()), - ) - .unwrap(); - - chain_view_bad.burn_stable_block_height -= - MAX_NEIGHBOR_BLOCK_DELAY + 1 + burnchain.stable_confirmations as u64; - chain_view_bad.burn_block_height -= - MAX_NEIGHBOR_BLOCK_DELAY + 1 + burnchain.stable_confirmations as u64; - - db_setup(&mut peerdb_1, &mut sortdb_2, &socketaddr_2, &chain_view_bad); - - assert_eq!( - convo_bad.is_preamble_valid(&ping_bad, &chain_view), - Ok(false) - ); - } - // unstable burn header hash mismatch { let mut convo_bad = diff --git a/src/net/connection.rs b/src/net/connection.rs index ad9077e9f4..1c6ca7653a 100644 --- a/src/net/connection.rs +++ b/src/net/connection.rs @@ -57,8 +57,6 @@ use net::neighbors::{ NEIGHBOR_REQUEST_TIMEOUT, NEIGHBOR_WALK_INTERVAL, NUM_INITIAL_WALKS, WALK_RETRY_COUNT, }; -use util::strings::UrlString; - use vm::{costs::ExecutionCost, types::BOUND_VALUE_SERIALIZATION_HEX}; use chainstate::burn::ConsensusHash; diff --git a/src/net/db.rs b/src/net/db.rs index eea54d5315..8ff49450c0 100644 --- a/src/net/db.rs +++ b/src/net/db.rs @@ -40,6 +40,8 @@ use util::macros::is_big_endian; use util::secp256k1::Secp256k1PrivateKey; use util::secp256k1::Secp256k1PublicKey; +use util::db::tx_busy_handler; + use chainstate::stacks::StacksPrivateKey; use chainstate::stacks::StacksPublicKey; @@ -528,6 +530,7 @@ impl PeerDB { let conn = 
Connection::open_with_flags(path, open_flags).map_err(|e| db_error::SqliteError(e))?; + conn.busy_handler(Some(tx_busy_handler))?; let mut db = PeerDB { conn: conn, readwrite: readwrite, diff --git a/src/net/download.rs b/src/net/download.rs index 3d040e00f6..431b18078e 100644 --- a/src/net/download.rs +++ b/src/net/download.rs @@ -102,6 +102,19 @@ pub const BLOCK_DOWNLOAD_INTERVAL: u64 = 180; #[cfg(test)] pub const BLOCK_DOWNLOAD_INTERVAL: u64 = 30; +/// If a URL never connects, don't use it again for this many seconds +#[cfg(not(test))] +pub const BLOCK_DOWNLOAD_BAN_URL: u64 = 300; +#[cfg(test)] +pub const BLOCK_DOWNLOAD_BAN_URL: u64 = 60; + +/// If we created a request to download a block or microblock, don't do so again until this many +/// seconds have passed. +#[cfg(not(test))] +pub const BLOCK_REREQUEST_INTERVAL: u64 = 60; +#[cfg(test)] +pub const BLOCK_REREQUEST_INTERVAL: u64 = 30; + /// This module is responsible for downloading blocks and microblocks from other peers, using block /// inventory state (see src/net/inv.rs) @@ -114,6 +127,7 @@ pub struct BlockRequestKey { pub index_block_hash: StacksBlockId, pub child_block_header: Option, // only used if asking for a microblock; used to confirm the stream's continuity pub sortition_height: u64, + pub download_start: u64, } impl BlockRequestKey { @@ -134,6 +148,7 @@ impl BlockRequestKey { index_block_hash: index_block_hash, child_block_header: child_block_header, sortition_height: sortition_height, + download_start: get_epoch_time_secs(), } } } @@ -198,8 +213,14 @@ pub struct BlockDownloader { broken_peers: Vec, broken_neighbors: Vec, // disconnect peers who report invalid block inventories too + blocked_urls: HashMap, // URLs that chronically don't work, and when we can try them again + /// how often to download download_interval: u64, + + /// when did we last request a given block hash + requested_blocks: HashMap, + requested_microblocks: HashMap, } impl BlockDownloader { @@ -240,12 +261,16 @@ impl BlockDownloader { dead_peers: vec![], broken_peers: vec![], broken_neighbors: vec![], + blocked_urls: HashMap::new(), download_interval: download_interval, + requested_blocks: HashMap::new(), + requested_microblocks: HashMap::new(), } } pub fn reset(&mut self) -> () { + debug!("Downloader reset"); self.state = BlockDownloaderState::DNSLookupBegin; self.dns_lookups.clear(); @@ -398,14 +423,14 @@ impl BlockDownloader { ); pending_block_requests.insert(block_key, event_id); } else { - debug!( - "Event {} ({:?}, {:?} for block {} failed to connect", - event_id, - &block_key.neighbor, - &block_key.data_url, - &block_key.index_block_hash - ); + debug!("Event {} ({:?}, {:?} for block {} failed to connect. 
Temporarily blocking URL", event_id, &block_key.neighbor, &block_key.data_url, &block_key.index_block_hash); self.dead_peers.push(event_id); + + // don't try this again for a while + self.blocked_urls.insert( + block_key.data_url, + get_epoch_time_secs() + BLOCK_DOWNLOAD_BAN_URL, + ); } } Some(ref mut convo) => { @@ -422,12 +447,12 @@ impl BlockDownloader { &block.block_hash(), ) != block_key.index_block_hash { - test_debug!("Invalid block from {:?} ({:?}): did not ask for block {}/{}", &block_key.neighbor, &block_key.data_url, block_key.consensus_hash, block.block_hash()); + info!("Invalid block from {:?} ({:?}): did not ask for block {}/{}", &block_key.neighbor, &block_key.data_url, block_key.consensus_hash, block.block_hash()); self.broken_peers.push(event_id); self.broken_neighbors.push(block_key.neighbor.clone()); } else { // got the block - test_debug!( + debug!( "Got block {}: {}/{}", &block_key.sortition_height, &block_key.consensus_hash, @@ -439,7 +464,7 @@ impl BlockDownloader { // TODO: redirect? HttpResponseType::NotFound(_, _) => { // remote peer didn't have the block - test_debug!("Remote neighbor {:?} ({:?}) does not actually have block {} indexed at {} ({})", &block_key.neighbor, &block_key.data_url, block_key.sortition_height, &block_key.index_block_hash, &block_key.consensus_hash); + info!("Remote neighbor {:?} ({:?}) does not actually have block {} indexed at {} ({})", &block_key.neighbor, &block_key.data_url, block_key.sortition_height, &block_key.index_block_hash, &block_key.consensus_hash); // the fact that we asked this peer means that it's block inv indicated // it was present, so the absence is the mark of a broken peer @@ -448,10 +473,9 @@ impl BlockDownloader { } _ => { // wrong message response - test_debug!( + info!( "Got bad HTTP response from {:?}: {:?}", - &block_key.data_url, - &http_response + &block_key.data_url, &http_response ); self.broken_peers.push(event_id); self.broken_neighbors.push(block_key.neighbor.clone()); @@ -505,6 +529,12 @@ impl BlockDownloader { event_id ); self.dead_peers.push(event_id); + + // don't try this again for a while + self.blocked_urls.insert( + block_key.data_url, + get_epoch_time_secs() + BLOCK_DOWNLOAD_BAN_URL, + ); } } Some(ref mut convo) => { @@ -518,12 +548,12 @@ impl BlockDownloader { HttpResponseType::Microblocks(_md, microblocks) => { if microblocks.len() == 0 { // we wouldn't have asked for a 0-length stream - test_debug!("Got unexpected zero-length microblock stream from {:?} ({:?})", &block_key.neighbor, &block_key.data_url); + info!("Got unexpected zero-length microblock stream from {:?} ({:?})", &block_key.neighbor, &block_key.data_url); self.broken_peers.push(event_id); self.broken_neighbors.push(block_key.neighbor.clone()); } else { // have microblocks (but we don't know yet if they're well-formed) - test_debug!( + debug!( "Got (tentative) microblocks {}: {}/{}-{}", block_key.sortition_height, &block_key.consensus_hash, @@ -537,7 +567,7 @@ impl BlockDownloader { HttpResponseType::NotFound(_, _) => { // remote peer didn't have the microblock, even though their blockinv said // they did. 
- test_debug!("Remote neighbor {:?} ({:?}) does not have microblock stream indexed at {}", &block_key.neighbor, &block_key.data_url, &block_key.index_block_hash); + info!("Remote neighbor {:?} ({:?}) does not have microblock stream indexed at {}", &block_key.neighbor, &block_key.data_url, &block_key.index_block_hash); // the fact that we asked this peer means that it's block inv indicated // it was present, so the absence is the mark of a broken peer @@ -546,7 +576,7 @@ impl BlockDownloader { } _ => { // wrong message response - test_debug!("Got bad HTTP response from {:?}", &block_key.data_url); + info!("Got bad HTTP response from {:?}", &block_key.data_url); self.broken_peers.push(event_id); self.broken_neighbors.push(block_key.neighbor.clone()); } @@ -614,6 +644,7 @@ impl BlockDownloader { } debug!("Begin headers load"); + let begin_ts = get_epoch_time_ms(); let last_ancestor = SortitionDB::get_ancestor_snapshot( &ic, first_block_height + sortition_height_end, @@ -642,7 +673,8 @@ impl BlockDownloader { _block_hash_opt ); } - debug!("End headers load"); + let end_ts = get_epoch_time_ms(); + debug!("End headers load ({} ms)", end_ts.saturating_sub(begin_ts)); // update cache SortitionDB::merge_block_header_cache(header_cache, &local_blocks); @@ -819,6 +851,43 @@ impl BlockDownloader { pub fn is_download_idle(&self) -> bool { self.empty_block_download_passes > 0 && self.empty_microblock_download_passes > 0 } + + /// Is a request in-flight for a given block or microblock stream? + fn is_inflight(&self, index_hash: &StacksBlockId, microblocks: bool) -> bool { + if microblocks { + // being requested now? + for (_, reqs) in self.microblocks_to_try.iter() { + if reqs.len() > 0 { + if reqs[0].index_block_hash == *index_hash { + return true; + } + } + } + + // was recently requested? could still be bufferred up for storage + if let Some(fetched_ts) = self.requested_blocks.get(index_hash) { + if get_epoch_time_secs() < fetched_ts + BLOCK_REREQUEST_INTERVAL { + return true; + } + } + } else { + for (_, reqs) in self.blocks_to_try.iter() { + if reqs.len() > 0 { + if reqs[0].index_block_hash == *index_hash { + return true; + } + } + } + + // was recently requested? could still be bufferred up for storage + if let Some(fetched_ts) = self.requested_microblocks.get(index_hash) { + if get_epoch_time_secs() < fetched_ts + BLOCK_REREQUEST_INTERVAL { + return true; + } + } + } + return false; + } } impl PeerNetwork { @@ -863,12 +932,63 @@ impl PeerNetwork { } } + /// Do we need to download an anchored block? + /// already have an anchored block? + fn need_anchored_block( + _local_peer: &LocalPeer, + chainstate: &StacksChainState, + consensus_hash: &ConsensusHash, + block_hash: &BlockHeaderHash, + ) -> Result { + // already in queue or already processed? + let index_block_hash = StacksBlockHeader::make_index_block_hash(consensus_hash, block_hash); + if StacksChainState::has_stored_block( + &chainstate.blocks_db, + &chainstate.blocks_path, + consensus_hash, + block_hash, + )? { + test_debug!( + "{:?}: Block already stored and processed: {}/{} ({})", + _local_peer, + consensus_hash, + block_hash, + &index_block_hash + ); + return Ok(false); + } else if StacksChainState::has_staging_block( + &chainstate.blocks_db, + consensus_hash, + block_hash, + )? { + test_debug!( + "{:?}: Block already stored (but not processed): {}/{} ({})", + _local_peer, + consensus_hash, + block_hash, + &index_block_hash + ); + return Ok(false); + } else if StacksChainState::has_block_indexed(&chainstate.blocks_path, &index_block_hash)? 
{ + test_debug!( + "{:?}: Block already stored to chunk store: {}/{} ({})", + _local_peer, + consensus_hash, + block_hash, + &index_block_hash + ); + return Ok(false); + } + Ok(true) + } + /// Create block request keys for a range of blocks that are available but that we don't have in a given range of /// sortitions. The same keys can be used to fetch confirmed microblock streams. fn make_requests( &mut self, sortdb: &SortitionDB, chainstate: &StacksChainState, + downloader: &BlockDownloader, start_sortition_height: u64, microblocks: bool, ) -> Result>, net_error> { @@ -927,15 +1047,29 @@ impl PeerNetwork { } }; + let mut child_block_header = None; let index_block_hash = StacksBlockHeader::make_index_block_hash(&consensus_hash, &block_hash); - let mut child_block_header = None; + if downloader.is_inflight(&index_block_hash, microblocks) { + // we already asked for this block or microblock stream + test_debug!( + "{:?}: Already in-flight: {}/{}", + &self.local_peer, + &consensus_hash, + &block_hash + ); + continue; + } let (target_consensus_hash, target_block_hash) = if !microblocks { // asking for a block - if StacksChainState::has_block_indexed(&chainstate.blocks_path, &index_block_hash)? - { - // we already have this block + if !PeerNetwork::need_anchored_block( + &self.local_peer, + chainstate, + &consensus_hash, + &block_hash, + )? { + // we already have this block stored to disk test_debug!( "{:?}: Already have anchored block {}/{}", &self.local_peer, @@ -1101,6 +1235,34 @@ impl PeerNetwork { continue; } + let prev_blocked = if let Some(deadline) = downloader.blocked_urls.get(&data_url) { + if get_epoch_time_secs() < *deadline { + debug!( + "{:?}: Will not request {} {}/{} from {:?} (of {:?}) until after {}", + &self.local_peer, + if microblocks { + "microblock stream" + } else { + "anchored block" + }, + &target_consensus_hash, + &target_block_hash, + &data_url, + &nk, + deadline + ); + true + } else { + false + } + } else { + false + }; + + if prev_blocked { + continue; + } + test_debug!( "{:?}: Make request for {} at sortition height {} to {:?}: {:?}/{:?}", &self.local_peer, @@ -1138,9 +1300,16 @@ impl PeerNetwork { &mut self, sortdb: &SortitionDB, chainstate: &mut StacksChainState, + downloader: &BlockDownloader, start_sortition_height: u64, ) -> Result>, net_error> { - self.make_requests(sortdb, chainstate, start_sortition_height, false) + self.make_requests( + sortdb, + chainstate, + downloader, + start_sortition_height, + false, + ) } /// Make requests for missing confirmed microblocks @@ -1148,9 +1317,10 @@ impl PeerNetwork { &mut self, sortdb: &SortitionDB, chainstate: &mut StacksChainState, + downloader: &BlockDownloader, start_sortition_height: u64, ) -> Result>, net_error> { - self.make_requests(sortdb, chainstate, start_sortition_height, true) + self.make_requests(sortdb, chainstate, downloader, start_sortition_height, true) } /// Prioritize block requests -- ask for the rarest blocks first @@ -1214,6 +1384,7 @@ impl PeerNetwork { let mut next_blocks_to_try = network.make_block_requests( sortdb, chainstate, + downloader, next_block_sortition_height, )?; @@ -1224,6 +1395,7 @@ impl PeerNetwork { let mut next_microblocks_to_try = network.make_confirmed_microblock_requests( sortdb, chainstate, + downloader, next_microblock_sortition_height, )?; @@ -1265,6 +1437,8 @@ impl PeerNetwork { &network.local_peer, &max_height, &max_mblock_height ); + let now = get_epoch_time_secs(); + // queue up block requests in order by sortition height while height <= max_height && 
(downloader.blocks_to_try.len() as u64) @@ -1288,19 +1462,34 @@ impl PeerNetwork { height += 1; continue; } - assert_eq!(height, requests.front().as_ref().unwrap().sortition_height); + let index_block_hash = + requests.front().as_ref().unwrap().index_block_hash.clone(); + if let Some(deadline) = downloader.requested_blocks.get(&index_block_hash) { + if now < *deadline { + debug!( + "{:?}: already inflight: {}", + &network.local_peer, &index_block_hash + ); + height += 1; + continue; + } + } + debug!( "{:?}: will request anchored block for sortition {}: {}/{} ({})", &network.local_peer, height, &requests.front().as_ref().unwrap().consensus_hash, &requests.front().as_ref().unwrap().anchor_block_hash, - &requests.front().as_ref().unwrap().index_block_hash + &index_block_hash ); downloader.blocks_to_try.insert(height, requests); + downloader + .requested_blocks + .insert(index_block_hash, now + BLOCK_REREQUEST_INTERVAL); height += 1; } @@ -1337,12 +1526,30 @@ impl PeerNetwork { requests.front().as_ref().unwrap().sortition_height ); + let index_block_hash = + requests.front().as_ref().unwrap().index_block_hash.clone(); + if let Some(deadline) = + downloader.requested_microblocks.get(&index_block_hash) + { + if now < *deadline { + debug!( + "{:?}: already inflight: {}", + &network.local_peer, &index_block_hash + ); + height += 1; + continue; + } + } + debug!("{:?}: will request microblock stream produced by sortition {}: {}/{} ({})", - &network.local_peer, mblock_height, &requests.front().as_ref().unwrap().consensus_hash, &requests.front().as_ref().unwrap().anchor_block_hash, &requests.front().as_ref().unwrap().index_block_hash); + &network.local_peer, mblock_height, &requests.front().as_ref().unwrap().consensus_hash, &requests.front().as_ref().unwrap().anchor_block_hash, &index_block_hash); downloader .microblocks_to_try .insert(mblock_height, requests); + downloader + .requested_microblocks + .insert(index_block_hash, now + BLOCK_REREQUEST_INTERVAL); mblock_height += 1; } @@ -1663,8 +1870,8 @@ impl PeerNetwork { ( bool, Option, - Vec<(ConsensusHash, StacksBlock)>, - Vec<(ConsensusHash, Vec)>, + Vec<(ConsensusHash, StacksBlock, u64)>, + Vec<(ConsensusHash, Vec, u64)>, ), net_error, > { @@ -1673,6 +1880,8 @@ impl PeerNetwork { let mut done = false; let mut old_pox_id = None; + let now = get_epoch_time_secs(); + PeerNetwork::with_downloader_state(self, |ref mut network, ref mut downloader| { // extract blocks and microblocks downloaded for (request_key, block) in downloader.blocks.drain() { @@ -1683,7 +1892,11 @@ impl PeerNetwork { &request_key.index_block_hash, request_key.sortition_height ); - blocks.push((request_key.consensus_hash.clone(), block)); + blocks.push(( + request_key.consensus_hash.clone(), + block, + now.saturating_sub(request_key.download_start), + )); downloader.num_blocks_downloaded += 1; // don't try this again @@ -1723,7 +1936,11 @@ impl PeerNetwork { &request_key.anchor_block_hash, request_key.sortition_height ); - microblocks.push((request_key.consensus_hash.clone(), microblock_stream)); + microblocks.push(( + request_key.consensus_hash.clone(), + microblock_stream, + now.saturating_sub(request_key.download_start), + )); downloader.num_microblocks_downloaded += 1; } else { // stream is not well-formed @@ -1841,10 +2058,14 @@ impl PeerNetwork { format!("Empty block requests at height {}", height) ); debug!( - " Height {}: anchored block {} available from {} peers", + " Height {}: anchored block {} available from {} peers: {:?}", height, 
requests.front().unwrap().index_block_hash, - requests.len() + requests.len(), + requests + .iter() + .map(|r| r.data_url.clone()) + .collect::>() ); } for (height, requests) in downloader.microblocks_to_try.iter() { @@ -1853,10 +2074,14 @@ impl PeerNetwork { format!("Empty microblock requests at height {}", height) ); debug!( - " Height {}: microblocks {} available from {} peers", + " Height {}: microblocks {} available from {} peers: {:?}", height, requests.front().unwrap().index_block_hash, - requests.len() + requests.len(), + requests + .iter() + .map(|r| r.data_url.clone()) + .collect::>() ); } @@ -1889,8 +2114,8 @@ impl PeerNetwork { ( bool, Option, - Vec<(ConsensusHash, StacksBlock)>, - Vec<(ConsensusHash, Vec)>, + Vec<(ConsensusHash, StacksBlock, u64)>, + Vec<(ConsensusHash, Vec, u64)>, Vec, Vec, ), @@ -2033,6 +2258,7 @@ pub mod test { use net::test::*; use net::*; use std::collections::HashMap; + use util::sleep_ms; use util::test::*; fn get_peer_availability( @@ -2108,16 +2334,6 @@ pub mod test { ) .unwrap(); block_data.push((sn.consensus_hash.clone(), stacks_block, microblocks)); - - /* - let (burn_ops, stacks_block, microblocks) = peer_2.make_default_tenure(); - peer_1.next_burnchain_block(burn_ops.clone()); - peer_2.next_burnchain_block(burn_ops.clone()); - peer_2.process_stacks_epoch_at_tip(&stacks_block, µblocks); - - let sn = SortitionDB::get_canonical_burn_chain_tip(&peer_2.sortdb.as_ref().unwrap().conn()).unwrap(); - block_data.push((sn.consensus_hash.clone(), stacks_block, microblocks)); - */ } let num_burn_blocks = { @@ -2892,4 +3108,97 @@ pub mod test { ); }) } + + #[test] + #[ignore] + #[should_panic(expected = "blocked URL")] + pub fn test_get_blocks_and_microblocks_ban_url() { + use std::convert::TryFrom; + use std::net::TcpListener; + use std::thread; + + let listener_1 = TcpListener::bind("127.0.0.1:3260").unwrap(); + let listener_2 = TcpListener::bind("127.0.0.1:3262").unwrap(); + + let endpoint_thread_1 = thread::spawn(move || { + let (sock, addr) = listener_1.accept().unwrap(); + test_debug!("Accepted 1 {:?}", &addr); + sleep_ms(60_000); + }); + + let endpoint_thread_2 = thread::spawn(move || { + let (sock, addr) = listener_2.accept().unwrap(); + test_debug!("Accepted 2 {:?}", &addr); + sleep_ms(60_000); + }); + + run_get_blocks_and_microblocks( + "test_get_blocks_and_microblocks_ban_url", + 3250, + 2, + |ref mut peer_configs| { + // build initial network topology + assert_eq!(peer_configs.len(), 2); + + peer_configs[0].connection_opts.disable_block_advertisement = true; + peer_configs[1].connection_opts.disable_block_advertisement = true; + + // announce URLs to our fake handlers + peer_configs[0].data_url = + UrlString::try_from("http://127.0.0.1:3260".to_string()).unwrap(); + peer_configs[1].data_url = + UrlString::try_from("http://127.0.0.1:3262".to_string()).unwrap(); + + let peer_0 = peer_configs[0].to_neighbor(); + let peer_1 = peer_configs[1].to_neighbor(); + peer_configs[0].add_neighbor(&peer_1); + peer_configs[1].add_neighbor(&peer_0); + }, + |num_blocks, ref mut peers| { + // build up block data to replicate + let mut block_data = vec![]; + for _ in 0..num_blocks { + let (mut burn_ops, stacks_block, microblocks) = peers[1].make_default_tenure(); + + let (_, burn_header_hash, consensus_hash) = + peers[1].next_burnchain_block(burn_ops.clone()); + peers[1].process_stacks_epoch_at_tip(&stacks_block, µblocks); + + TestPeer::set_ops_burn_header_hash(&mut burn_ops, &burn_header_hash); + + peers[0].next_burnchain_block_raw(burn_ops); + + let sn = 
SortitionDB::get_canonical_burn_chain_tip( + &peers[1].sortdb.as_ref().unwrap().conn(), + ) + .unwrap(); + block_data.push(( + sn.consensus_hash.clone(), + Some(stacks_block), + Some(microblocks), + )); + } + block_data + }, + |_| {}, + |peer| { + let mut blocked = 0; + match peer.network.block_downloader { + Some(ref dl) => { + blocked = dl.blocked_urls.len(); + } + None => {} + } + if blocked >= 1 { + // NOTE: this is the success criterion + panic!("blocked URL"); + } + true + }, + |_| true, + ); + + endpoint_thread_1.join().unwrap(); + endpoint_thread_2.join().unwrap(); + } } diff --git a/src/net/inv.rs b/src/net/inv.rs index 1dcff2e4f3..2d038f24f9 100644 --- a/src/net/inv.rs +++ b/src/net/inv.rs @@ -1336,7 +1336,7 @@ impl PeerNetwork { if target_pox_reward_cycle + GETPOXINV_MAX_BITLEN <= max_reward_cycle { GETPOXINV_MAX_BITLEN } else { - max_reward_cycle - target_pox_reward_cycle + 1 + cmp::max(1, max_reward_cycle - target_pox_reward_cycle) }; if num_reward_cycles == 0 { @@ -2001,6 +2001,10 @@ if stats.learned_data { // update hints + debug!( + "{:?}: learned something new from {:?}", + &network.local_peer, &nk + ); inv_state.hint_learned_data = inv_state.hint_learned_data || stats.learned_data; inv_state.last_change_at = get_epoch_time_secs(); diff --git a/src/net/mod.rs b/src/net/mod.rs index eaef2e6edc..3e4e39de19 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -1595,8 +1595,8 @@ pub const DENY_MIN_BAN_DURATION: u64 = 2; pub struct NetworkResult { pub download_pox_id: Option<PoxId>, // PoX ID as it was when we begin downloading blocks (set if we have downloaded new blocks) pub unhandled_messages: HashMap<NeighborKey, Vec<StacksMessage>>, - pub blocks: Vec<(ConsensusHash, StacksBlock)>, // blocks we downloaded - pub confirmed_microblocks: Vec<(ConsensusHash, Vec<StacksMicroblock>)>, // confiremd microblocks we downloaded + pub blocks: Vec<(ConsensusHash, StacksBlock, u64)>, // blocks we downloaded, and time taken + pub confirmed_microblocks: Vec<(ConsensusHash, Vec<StacksMicroblock>, u64)>, // confirmed microblocks we downloaded, and time taken pub pushed_transactions: HashMap<NeighborKey, Vec<(Vec<RelayData>, StacksTransaction)>>, // all transactions pushed to us and their message relay hints pub pushed_blocks: HashMap<NeighborKey, Vec<BlocksData>>, // all blocks pushed to us pub pushed_microblocks: HashMap<NeighborKey, Vec<(Vec<RelayData>, MicroblocksData)>>, // all microblocks pushed to us, and the relay hints from the message @@ -2580,6 +2580,7 @@ pub mod test { &sn.consensus_hash, block, &parent_sn.consensus_hash, + 5, ) .map_err(|e| format!("Failed to preprocess anchored block: {:?}", &e)) }; @@ -2704,6 +2705,7 @@ &mut node.chainstate, consensus_hash, block, + 0, ) .unwrap(); diff --git a/src/net/neighbors.rs b/src/net/neighbors.rs index f51f2ca292..e691faccb9 100644 --- a/src/net/neighbors.rs +++ b/src/net/neighbors.rs @@ -541,7 +541,7 @@ impl NeighborWalk { let res = if neighbor_from_handshake.addr != self.cur_neighbor.addr { // somehow, got a handshake from someone that _isn't_ cur_neighbor - debug!("{:?}: got unsolicited (or bootstrapping) HandshakeAccept from outbound {:?} (expected {:?})", + debug!("{:?}: got unsolicited (or bootstrapping) HandshakeAccept from outbound {:?} (expected {:?})", &self.local_peer, &neighbor_from_handshake.addr, &self.cur_neighbor.addr); @@ -956,7 +956,7 @@ let rh_naddr = naddr.clone(); // used below let new_rh = match res { Ok(message) => { - // if the neighbor is still bootstrapping, we're doone + // if the neighbor is still bootstrapping, we're done if message.preamble.burn_stable_block_height + MAX_NEIGHBOR_BLOCK_DELAY < stable_block_height {
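Every downloaded block and confirmed microblock stream now carries its in-flight time in seconds (the third tuple element), which preprocess_anchored_block persists as staging_blocks.download_time. A sketch (illustrative, not from this patch) of reading the metric back through the new chainstate helpers:

    // Average download time, in seconds, over a height range of staging blocks.
    let times = StacksChainState::measure_block_download_time(&chainstate.blocks_db, start_height, end_height)?;
    let avg_secs = times.iter().sum::<u64>() / (times.len().max(1) as u64);

diff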
--git a/src/net/p2p.rs b/src/net/p2p.rs index 928affb9bb..d0f1c2ac5c 100644 --- a/src/net/p2p.rs +++ b/src/net/p2p.rs @@ -229,7 +229,7 @@ pub struct PeerNetwork { handles: VecDeque, // network I/O - network: Option, + pub network: Option, p2p_network_handle: usize, http_network_handle: usize, @@ -2261,14 +2261,14 @@ impl PeerNetwork { let mut block_set = HashSet::new(); let mut microblock_set = HashSet::new(); - for (_, block) in network_result.blocks.iter() { + for (_, block, _) in network_result.blocks.iter() { if block_set.contains(&block.block_hash()) { test_debug!("Duplicate block {}", block.block_hash()); } block_set.insert(block.block_hash()); } - for (_, mblocks) in network_result.confirmed_microblocks.iter() { + for (_, mblocks, _) in network_result.confirmed_microblocks.iter() { for mblock in mblocks.iter() { if microblock_set.contains(&mblock.block_hash()) { test_debug!("Duplicate microblock {}", mblock.block_hash()); diff --git a/src/net/relay.rs b/src/net/relay.rs index b126b7b815..c8d3827422 100644 --- a/src/net/relay.rs +++ b/src/net/relay.rs @@ -494,6 +494,7 @@ impl Relayer { chainstate: &mut StacksChainState, consensus_hash: &ConsensusHash, block: &StacksBlock, + download_time: u64, ) -> Result { // find the snapshot of the parent of this block let db_handle = SortitionHandleConn::open_reader_consensus(sort_ic, consensus_hash)?; @@ -527,6 +528,7 @@ impl Relayer { consensus_hash, block, &parent_block_snapshot.consensus_hash, + download_time, ) } @@ -617,8 +619,14 @@ impl Relayer { ) -> HashSet { let mut new_blocks = HashSet::new(); - for (consensus_hash, block) in network_result.blocks.iter() { - match Relayer::process_new_anchored_block(sort_ic, chainstate, consensus_hash, block) { + for (consensus_hash, block, download_time) in network_result.blocks.iter() { + match Relayer::process_new_anchored_block( + sort_ic, + chainstate, + consensus_hash, + block, + *download_time, + ) { Ok(accepted) => { if accepted { new_blocks.insert((*consensus_hash).clone()); @@ -700,6 +708,7 @@ impl Relayer { chainstate, &consensus_hash, block, + 0, ) { Ok(accepted) => { if accepted { @@ -739,7 +748,9 @@ impl Relayer { chainstate: &mut StacksChainState, ) -> HashSet { let mut ret = HashSet::new(); - for (consensus_hash, microblock_stream) in network_result.confirmed_microblocks.iter() { + for (consensus_hash, microblock_stream, _download_time) in + network_result.confirmed_microblocks.iter() + { if microblock_stream.len() == 0 { continue; } @@ -1031,7 +1042,8 @@ impl Relayer { ) -> bool { let txid = tx.txid(); if mempool.has_tx(&txid) { - return true; + debug!("Already have tx {}", txid); + return false; } if let Err(e) = mempool.submit(consensus_hash, block_hash, tx) { diff --git a/src/net/server.rs b/src/net/server.rs index 02bb4b504d..f08c04d724 100644 --- a/src/net/server.rs +++ b/src/net/server.rs @@ -295,24 +295,19 @@ impl HttpPeer { /// Deregister a socket/event pair pub fn deregister_http(&mut self, network_state: &mut NetworkState, event_id: usize) -> () { - if self.peers.contains_key(&event_id) { - // kill the conversation - self.peers.remove(&event_id); - } + self.peers.remove(&event_id); - let mut to_remove: Vec = vec![]; - match self.sockets.get_mut(&event_id) { + match self.sockets.remove(&event_id) { None => {} - Some(ref sock) => { - let _ = network_state.deregister(event_id, sock); - to_remove.push(event_id); // force it to close anyway + Some(sock) => { + let _ = network_state.deregister(event_id, &sock); } } - - for event_id in to_remove { - // remove socket - 
self.sockets.remove(&event_id); - self.connecting.remove(&event_id); + match self.connecting.remove(&event_id) { + None => {} + Some((sock, ..)) => { + let _ = network_state.deregister(event_id, &sock); + } } } @@ -788,9 +783,9 @@ mod test { client_sleep: u64, mut make_request: F, check_result: C, - ) -> () + ) -> usize where - F: FnMut(usize, &mut TestPeer) -> Vec, + F: FnMut(usize, &mut StacksChainState) -> Vec, C: Fn(usize, Result, net_error>) -> bool, { let mut peer_config = TestPeerConfig::new(test_name, peer_p2p, peer_http); @@ -800,14 +795,10 @@ mod test { let view = peer.get_burnchain_view().unwrap(); let (http_sx, http_rx) = sync_channel(1); - let mut client_requests = vec![]; - let mut client_threads = vec![]; - let mut client_handles = vec![]; - for i in 0..num_clients { - let request = make_request(i, &mut peer); - client_requests.push(request); - } + let network_id = peer.config.network_id; + let chainstate_path = peer.chainstate_path.clone(); + let (num_events_sx, num_events_rx) = sync_channel(1); let http_thread = thread::spawn(move || { let view = peer.get_burnchain_view().unwrap(); loop { @@ -825,8 +816,20 @@ mod test { } test_debug!("http server joined"); + let num_events = peer.network.network.as_ref().unwrap().num_events(); + let _ = num_events_sx.send(num_events); }); + let mut client_requests = vec![]; + let mut client_threads = vec![]; + let mut client_handles = vec![]; + let (mut chainstate, _) = + StacksChainState::open(false, network_id, &chainstate_path).unwrap(); + for i in 0..num_clients { + let request = make_request(i, &mut chainstate); + client_requests.push(request); + } + for (i, request) in client_requests.drain(..).enumerate() { let (client_sx, client_rx) = sync_channel(1); let client = thread::spawn(move || { @@ -881,7 +884,9 @@ mod test { } http_sx.send(true).unwrap(); + let num_events = num_events_rx.recv().unwrap(); http_thread.join().unwrap(); + num_events } #[test] @@ -950,7 +955,7 @@ mod test { ConnectionOptions::default(), 1, 0, - |client_id, ref mut peer_server| { + |client_id, ref mut chainstate| { let peer_server_block = make_codec_test_block(25); let peer_server_consensus_hash = ConsensusHash([(client_id + 1) as u8; 20]); let index_block_hash = StacksBlockHeader::make_index_block_hash( @@ -960,7 +965,7 @@ mod test { test_debug!("Store peer server index block {:?}", &index_block_hash); store_staging_block( - peer_server.chainstate(), + chainstate, &peer_server_consensus_hash, &peer_server_block, &ConsensusHash([client_id as u8; 20]), @@ -1014,7 +1019,7 @@ mod test { ConnectionOptions::default(), 10, 0, - |client_id, ref mut peer_server| { + |client_id, ref mut chainstate| { let peer_server_block = make_codec_test_block(25); let peer_server_consensus_hash = ConsensusHash([(client_id + 1) as u8; 20]); let index_block_hash = StacksBlockHeader::make_index_block_hash( @@ -1024,7 +1029,7 @@ mod test { test_debug!("Store peer server index block {:?}", &index_block_hash); store_staging_block( - peer_server.chainstate(), + chainstate, &peer_server_consensus_hash, &peer_server_block, &ConsensusHash([client_id as u8; 20]), @@ -1172,7 +1177,7 @@ mod test { conn_opts, 1, 0, - |client_id, ref mut peer| { + |client_id, ref mut chainstate| { // make a gigantic transaction let mut big_contract_parts = vec![]; let mut total_len = 0; @@ -1202,7 +1207,7 @@ mod test { .unwrap(), ); - tx_contract.chain_id = peer.config.network_id; + tx_contract.chain_id = chainstate.config().chain_id; tx_contract.set_fee_rate(0); let mut signer = 
StacksTransactionSigner::new(&tx_contract); @@ -1288,6 +1293,43 @@ mod test { ); } + #[test] + fn test_http_no_connecting_event_id_leak() { + use std::net::TcpListener; + + let mut conn_opts = ConnectionOptions::default(); + conn_opts.timeout = 10; + conn_opts.connect_timeout = 10; + + let num_events = test_http_server( + "test_http_no_connecting_event_id_leak", + 51082, + 51083, + conn_opts, + 1, + 0, + |client_id, _| { + // open a socket and just sit there + use std::net::TcpStream; + let sock = TcpStream::connect("127.0.0.1:51083"); + + sleep_ms(15_000); + + // send a different request + let mut request = HttpRequestType::GetInfo(HttpRequestMetadata::from_host( + PeerHost::from_host_port("127.0.0.1".to_string(), 51083), + )); + request.metadata_mut().keep_alive = false; + + let request_bytes = StacksHttp::serialize_request(&request).unwrap(); + request_bytes + }, + |client_id, res| true, + ); + + assert_eq!(num_events, 2); + } + #[test] fn test_http_noop() { if std::env::var("BLOCKSTACK_HTTP_TEST") != Ok("1".to_string()) { @@ -1305,7 +1347,7 @@ mod test { conn_opts, 1, 600, - |client_id, ref mut peer_server| { + |client_id, ref mut chainstate| { let peer_server_block = make_codec_test_block(25); let peer_server_consensus_hash = ConsensusHash([(client_id + 1) as u8; 20]); let index_block_hash = StacksBlockHeader::make_index_block_hash( @@ -1315,7 +1357,7 @@ mod test { test_debug!("Store peer server index block {:?}", &index_block_hash); store_staging_block( - peer_server.chainstate(), + chainstate, &peer_server_consensus_hash, &peer_server_block, &ConsensusHash([client_id as u8; 20]), diff --git a/src/vm/database/marf.rs b/src/vm/database/marf.rs index b03b3d0eed..0677671e2c 100644 --- a/src/vm/database/marf.rs +++ b/src/vm/database/marf.rs @@ -328,9 +328,13 @@ impl MarfedKV { // _if_ for some reason, we do want to be able to access that mined chain state in the future, // we should probably commit the data to a different table which does not have uniqueness constraints. 
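// The marf.rs and sqlite.rs hunks below all apply the same transformation: a
// bare `.expect()` becomes an explicit match that logs the failing key/block
// and the underlying error before panicking, so the failure context survives
// in the logs. A minimal sketch of the pattern as a reusable helper --
// hypothetical, not part of this patch:
//
//     fn log_and_panic<T, E: std::fmt::Debug>(res: Result<T, E>, ctx: &str) -> T {
//         res.unwrap_or_else(|e| {
//             error!("{}: {:?}", ctx, e);
//             panic!("{}", ctx);
//         })
//     }
//
// e.g. `log_and_panic(self.marf.commit_mined(will_move_to), "Failed to commit mined MARF block")`
// would behave like the expanded `match` introduced below.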
self.side_store.rollback(&self.chain_tip); - self.marf - .commit_mined(will_move_to) - .expect("ERROR: Failed to commit MARF block"); + let _ = self.marf.commit_mined(will_move_to).map_err(|e| { + error!( + "Failed to commit to mined MARF block {}: {:?}", + &will_move_to, &e + ); + panic!(); + }); } pub fn commit_to(&mut self, final_bhh: &StacksBlockId) { @@ -338,9 +342,10 @@ impl MarfedKV { self.side_store .commit_metadata_to(&self.chain_tip, final_bhh); self.side_store.commit(&self.chain_tip); - self.marf - .commit_to(final_bhh) - .expect("ERROR: Failed to commit MARF block"); + let _ = self.marf.commit_to(final_bhh).map_err(|e| { + error!("Failed to commit to MARF block {}: {:?}", &final_bhh, &e); + panic!(); + }); } pub fn commit_unconfirmed(&mut self) { diff --git a/src/vm/database/sqlite.rs b/src/vm/database/sqlite.rs index 935d6e6706..c1c0f8c9b7 100644 --- a/src/vm/database/sqlite.rs +++ b/src/vm/database/sqlite.rs @@ -21,24 +21,36 @@ pub struct SqliteConnection { fn sqlite_put(conn: &Connection, key: &str, value: &str) { let params: [&dyn ToSql; 2] = [&key, &value]; - conn.execute( + match conn.execute( "REPLACE INTO data_table (key, value) VALUES (?, ?)", ¶ms, - ) - .expect(SQL_FAIL_MESSAGE); + ) { + Ok(_) => {} + Err(e) => { + error!("Failed to insert/replace ({},{}): {:?}", key, value, &e); + panic!(SQL_FAIL_MESSAGE); + } + }; } fn sqlite_get(conn: &Connection, key: &str) -> Option { trace!("sqlite_get {}", key); let params: [&dyn ToSql; 1] = [&key]; - let res = conn + let res = match conn .query_row( "SELECT value FROM data_table WHERE key = ?", ¶ms, |row| row.get(0), ) .optional() - .expect(SQL_FAIL_MESSAGE); + { + Ok(x) => x, + Err(e) => { + error!("Failed to query '{}': {:?}", key, &e); + panic!(SQL_FAIL_MESSAGE); + } + }; + trace!("sqlite_get {}: {:?}", key, &res); res } @@ -65,22 +77,36 @@ impl SqliteConnection { let key = format!("clr-meta::{}::{}", contract_hash, key); let params: [&dyn ToSql; 3] = [&bhh, &key, &value.to_string()]; - self.conn - .execute( - "INSERT INTO metadata_table (blockhash, key, value) VALUES (?, ?, ?)", - ¶ms, - ) - .expect(SQL_FAIL_MESSAGE); + match self.conn.execute( + "INSERT INTO metadata_table (blockhash, key, value) VALUES (?, ?, ?)", + ¶ms, + ) { + Ok(_) => {} + Err(e) => { + error!( + "Failed to insert ({},{},{}): {:?}", + &bhh, + &key, + &value.to_string(), + &e + ); + panic!(SQL_FAIL_MESSAGE); + } + } } pub fn commit_metadata_to(&mut self, from: &StacksBlockId, to: &StacksBlockId) { let params = [to, from]; - self.conn - .execute( - "UPDATE metadata_table SET blockhash = ? WHERE blockhash = ?", - ¶ms, - ) - .expect(SQL_FAIL_MESSAGE); + match self.conn.execute( + "UPDATE metadata_table SET blockhash = ? WHERE blockhash = ?", + ¶ms, + ) { + Ok(_) => {} + Err(e) => { + error!("Failed to update {} to {}: {:?}", &from, &to, &e); + panic!(SQL_FAIL_MESSAGE); + } + } } pub fn get_metadata( @@ -92,14 +118,21 @@ impl SqliteConnection { let key = format!("clr-meta::{}::{}", contract_hash, key); let params: [&dyn ToSql; 2] = [&bhh, &key]; - self.conn + match self + .conn .query_row( "SELECT value FROM metadata_table WHERE blockhash = ? 
AND key = ?", ¶ms, |row| row.get(0), ) .optional() - .expect(SQL_FAIL_MESSAGE) + { + Ok(x) => x, + Err(e) => { + error!("Failed to query ({},{}): {:?}", &bhh, &key, &e); + panic!(SQL_FAIL_MESSAGE); + } + } } pub fn has_entry(&mut self, key: &str) -> bool { @@ -117,9 +150,16 @@ impl SqliteConnection { pub fn begin(&mut self, key: &StacksBlockId) { trace!("SAVEPOINT SP{}", key); - self.conn + match self + .conn .execute(&format!("SAVEPOINT SP{}", key), NO_PARAMS) - .expect(SQL_FAIL_MESSAGE); + { + Ok(_) => {} + Err(e) => { + error!("Failed to begin savepoint {}: {:?}", &key, &e); + panic!(SQL_FAIL_MESSAGE); + } + } } pub fn rollback(&mut self, key: &StacksBlockId) { @@ -128,19 +168,33 @@ impl SqliteConnection { key, key ); - self.conn - .execute_batch(&format!( - "ROLLBACK TO SAVEPOINT SP{}; RELEASE SAVEPOINT SP{}", - key, key - )) - .expect(SQL_FAIL_MESSAGE); + match self.conn.execute_batch(&format!( + "ROLLBACK TO SAVEPOINT SP{}; RELEASE SAVEPOINT SP{}", + key, key + )) { + Ok(_) => {} + Err(e) => { + error!( + "Failed to rollback and release savepoint {}: {:?}", + &key, &e + ); + panic!(SQL_FAIL_MESSAGE); + } + } } pub fn delete_unconfirmed(&mut self, key: &StacksBlockId) { trace!("DELETE FROM metadata_table WHERE block_hash = {}", key); - self.conn + match self + .conn .execute("DELETE FROM metadata_table WHERE blockhash = ?", &[key]) - .expect(SQL_FAIL_MESSAGE); + { + Ok(_) => {} + Err(e) => { + error!("Failed to delete from metadata_table {}: {:?}", &key, &e); + panic!(SQL_FAIL_MESSAGE); + } + } } pub fn rollback_unconfirmed(&mut self, key: &StacksBlockId) { @@ -149,20 +203,35 @@ impl SqliteConnection { key, key ); - self.conn - .execute_batch(&format!( - "ROLLBACK TO SAVEPOINT SP{}; RELEASE SAVEPOINT SP{}", - key, key - )) - .expect(SQL_FAIL_MESSAGE); + match self.conn.execute_batch(&format!( + "ROLLBACK TO SAVEPOINT SP{}; RELEASE SAVEPOINT SP{}", + key, key + )) { + Ok(_) => {} + Err(e) => { + error!( + "Failed to rollback and release unconfirmed savepoint {}: {:?}", + &key, &e + ); + panic!(SQL_FAIL_MESSAGE); + } + } + self.delete_unconfirmed(key); } pub fn commit(&mut self, key: &StacksBlockId) { trace!("RELEASE SAVEPOINT SP{}", key); - self.conn + match self + .conn .execute(&format!("RELEASE SAVEPOINT SP{}", key), NO_PARAMS) - .expect("PANIC: Failed to SQL commit in Smart Contract VM."); + { + Ok(_) => {} + Err(e) => { + error!("Failed to release savepoint {}: {:?}", &key, &e); + panic!("PANIC: Failed to SQL commit in Smart Contract VM."); + } + } } } diff --git a/src/vm/functions/assets.rs b/src/vm/functions/assets.rs index fcf8040963..9aafc777f9 100644 --- a/src/vm/functions/assets.rs +++ b/src/vm/functions/assets.rs @@ -55,7 +55,7 @@ pub fn get_stx_balance_snapshot( ) -> (STXBalance, u64) { let stx_balance = db.get_account_stx_balance(principal); let cur_burn_height = db.get_current_burnchain_block_height() as u64; - test_debug!("Balance of {} (raw={},locked={},unlock-height={},current-height={}) is {} (has_locked_tokens_unlockable={})", + test_debug!("Balance of {} (raw={},locked={},unlock-height={},current-height={}) is {} (has_locked_tokens_unlockable={})", principal, stx_balance.amount_unlocked, stx_balance.amount_locked, diff --git a/testnet/stacks-node/src/burnchains/bitcoin_regtest_controller.rs b/testnet/stacks-node/src/burnchains/bitcoin_regtest_controller.rs index 5968d1ac74..d704af243d 100644 --- a/testnet/stacks-node/src/burnchains/bitcoin_regtest_controller.rs +++ b/testnet/stacks-node/src/burnchains/bitcoin_regtest_controller.rs @@ -21,9 +21,11 @@ use 
stacks::burnchains::bitcoin::indexer::{ use stacks::burnchains::bitcoin::spv::SpvClient; use stacks::burnchains::bitcoin::BitcoinNetworkType; use stacks::burnchains::db::BurnchainDB; +use stacks::burnchains::indexer::BurnchainIndexer; use stacks::burnchains::Burnchain; use stacks::burnchains::BurnchainStateTransitionOps; use stacks::burnchains::Error as burnchain_error; +use stacks::burnchains::PoxConstants; use stacks::burnchains::PublicKey; use stacks::chainstate::burn::db::sortdb::SortitionDB; use stacks::chainstate::burn::operations::{ @@ -140,15 +142,24 @@ impl BitcoinRegtestController { } } - fn setup_indexer_runtime(&mut self) -> (Burnchain, BitcoinIndexer) { - let (burnchain, network_type) = self.setup_burnchain(); + pub fn get_pox_constants(&self) -> PoxConstants { + let (burnchain, _) = self.setup_burnchain(); + burnchain.pox_constants.clone() + } + + pub fn get_burnchain(&self) -> Burnchain { + let (burnchain, _) = self.setup_burnchain(); + burnchain + } + fn setup_indexer_runtime(&mut self) -> (Burnchain, BitcoinIndexer) { + let (_, network_type) = self.config.burnchain.get_bitcoin_network(); let indexer_runtime = BitcoinIndexerRuntime::new(network_type); let burnchain_indexer = BitcoinIndexer { config: self.indexer_config.clone(), runtime: indexer_runtime, }; - (burnchain, burnchain_indexer) + (self.get_burnchain(), burnchain_indexer) } fn receive_blocks_helium(&mut self) -> BurnchainTip { @@ -210,31 +221,57 @@ impl BitcoinRegtestController { rest } - fn receive_blocks(&mut self, sync: bool) -> Result { + fn receive_blocks( + &mut self, + block_for_sortitions: bool, + target_block_height_opt: Option, + ) -> Result<(BurnchainTip, u64), BurnchainControllerError> { let coordinator_comms = match self.use_coordinator.as_ref() { Some(x) => x.clone(), - None => return Ok(self.receive_blocks_helium()), + None => { + // pre-PoX helium node + let tip = self.receive_blocks_helium(); + let height = tip.block_snapshot.block_height; + return Ok((tip, height)); + } }; let (mut burnchain, mut burnchain_indexer) = self.setup_indexer_runtime(); - let (block_snapshot, state_transition) = loop { - match burnchain.sync_with_indexer(&mut burnchain_indexer, coordinator_comms.clone()) { + let (block_snapshot, burnchain_height, state_transition) = loop { + match burnchain.sync_with_indexer( + &mut burnchain_indexer, + coordinator_comms.clone(), + target_block_height_opt, + Some(burnchain.pox_constants.reward_cycle_length as u64), + ) { Ok(x) => { increment_btc_blocks_received_counter(); + // initialize the dbs... self.sortdb_mut(); - if sync { + + // wait for the chains coordinator to catch up with us + if block_for_sortitions { self.wait_for_sortitions(Some(x.block_height)); } + + // NOTE: This is the latest _sortition_ on the canonical sortition history, not the latest burnchain block! let sort_tip = - SortitionDB::get_canonical_sortition_tip(self.sortdb_ref().conn()) + SortitionDB::get_canonical_burn_chain_tip(self.sortdb_ref().conn()) .expect("Sortition DB error."); - let x = self + + let (snapshot, state_transition) = self .sortdb_ref() - .get_sortition_result(&sort_tip) + .get_sortition_result(&sort_tip.sortition_id) .expect("Sortition DB error.") .expect("BUG: no data for the canonical chain tip"); - break x; + + let burnchain_height = burnchain_indexer + .get_headers_height() + .map_err(BurnchainControllerError::IndexerError)? 
+ - 1; // 1-indexed, so convert to 0-indexed height + + break (snapshot, burnchain_height, state_transition); } Err(e) => { // keep trying @@ -272,7 +309,7 @@ impl BitcoinRegtestController { self.chain_tip = Some(burnchain_tip.clone()); debug!("Done receiving blocks"); - Ok(burnchain_tip) + Ok((burnchain_tip, burnchain_height)) } pub fn get_utxos( @@ -637,6 +674,9 @@ impl BitcoinRegtestController { }; } } + + // yield some time + sleep_ms(100); } } @@ -703,25 +743,28 @@ impl BurnchainController for BitcoinRegtestController { } } - fn start(&mut self) -> Result { - self.receive_blocks(false) + fn start( + &mut self, + target_block_height_opt: Option, + ) -> Result<(BurnchainTip, u64), BurnchainControllerError> { + // if no target block height is given, just fetch the first burnchain block. + self.receive_blocks( + false, + target_block_height_opt.map_or_else(|| Some(1), |x| Some(x)), + ) } - fn sync(&mut self) -> Result { - let burnchain_tip = if self.config.burnchain.mode == "helium" { + fn sync( + &mut self, + target_block_height_opt: Option, + ) -> Result<(BurnchainTip, u64), BurnchainControllerError> { + let (burnchain_tip, burnchain_height) = if self.config.burnchain.mode == "helium" { // Helium: this node is responsible for mining new burnchain blocks self.build_next_block(1); - self.receive_blocks(true)? + self.receive_blocks(true, None)? } else { // Neon: this node is waiting on a block to be produced - let current_height = self.get_chain_tip().block_snapshot.block_height; - loop { - let burnchain_tip = self.receive_blocks(true)?; - if burnchain_tip.block_snapshot.block_height > current_height { - break burnchain_tip; - } - sleep_ms(5000); - } + self.receive_blocks(true, target_block_height_opt)? }; // Evaluate process_exit_at_block_height setting @@ -736,7 +779,7 @@ impl BurnchainController for BitcoinRegtestController { std::process::exit(0); } } - Ok(burnchain_tip) + Ok((burnchain_tip, burnchain_height)) } // returns true if the operation was submitted successfully, false otherwise diff --git a/testnet/stacks-node/src/burnchains/mocknet_controller.rs b/testnet/stacks-node/src/burnchains/mocknet_controller.rs index a5081edd69..8993302ce7 100644 --- a/testnet/stacks-node/src/burnchains/mocknet_controller.rs +++ b/testnet/stacks-node/src/burnchains/mocknet_controller.rs @@ -88,7 +88,10 @@ impl BurnchainController for MocknetController { } } - fn start(&mut self) -> Result { + fn start( + &mut self, + _ignored_target_height_opt: Option, + ) -> Result<(BurnchainTip, u64), BurnchainControllerError> { let db = match SortitionDB::connect( &self.config.get_burn_db_file_path(), 0, @@ -110,8 +113,8 @@ impl BurnchainController for MocknetController { received_at: Instant::now(), }; self.chain_tip = Some(genesis_state.clone()); - - Ok(genesis_state) + let block_height = genesis_state.block_snapshot.block_height; + Ok((genesis_state, block_height)) } fn submit_operation( @@ -123,7 +126,10 @@ impl BurnchainController for MocknetController { true } - fn sync(&mut self) -> Result { + fn sync( + &mut self, + _ignored_target_height_opt: Option, + ) -> Result<(BurnchainTip, u64), BurnchainControllerError> { let chain_tip = self.get_chain_tip(); // Simulating mining @@ -229,7 +235,8 @@ impl BurnchainController for MocknetController { }; self.chain_tip = Some(new_state.clone()); - Ok(new_state) + let block_height = new_state.block_snapshot.block_height; + Ok((new_state, block_height)) } #[cfg(test)] diff --git a/testnet/stacks-node/src/burnchains/mod.rs b/testnet/stacks-node/src/burnchains/mod.rs 
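The mod.rs hunk below is the crux of the controller-facing API change: start() and sync() now take an optional target burnchain height and return the burnchain's height alongside the new tip, so the run loop can advance one reward cycle at a time. A sketch of the new trait surface, restoring the generic parameters this diff's rendering elides (assumed to be u64 heights, matching the call sites above):

    pub trait BurnchainController {
        // Sync no further than `target_block_height_opt`, if given.
        fn start(&mut self, target_block_height_opt: Option<u64>)
            -> Result<(BurnchainTip, u64), Error>;
        fn sync(&mut self, target_block_height_opt: Option<u64>)
            -> Result<(BurnchainTip, u64), Error>;
        // ...remaining methods (submit_operation, sortdb_ref, etc.) are unchanged
    }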
index ed38c3f3a3..703dc155de 100644 --- a/testnet/stacks-node/src/burnchains/mod.rs +++ b/testnet/stacks-node/src/burnchains/mod.rs @@ -9,6 +9,7 @@ use super::operations::BurnchainOpSigner; use std::fmt; use std::time::Instant; +use stacks::burnchains; use stacks::burnchains::BurnchainStateTransitionOps; use stacks::chainstate::burn::db::sortdb::SortitionDB; use stacks::chainstate::burn::operations::BlockstackOperationType; @@ -17,24 +18,27 @@ use stacks::chainstate::burn::BlockSnapshot; #[derive(Debug)] pub enum Error { CoordinatorClosed, + IndexerError(burnchains::Error), } impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { Error::CoordinatorClosed => write!(f, "ChainsCoordinator closed"), + Error::IndexerError(ref e) => write!(f, "Indexer error: {:?}", e), } } } pub trait BurnchainController { - fn start(&mut self) -> Result; + fn start(&mut self, target_block_height_opt: Option) + -> Result<(BurnchainTip, u64), Error>; fn submit_operation( &mut self, operation: BlockstackOperationType, op_signer: &mut BurnchainOpSigner, ) -> bool; - fn sync(&mut self) -> Result; + fn sync(&mut self, target_block_height_opt: Option) -> Result<(BurnchainTip, u64), Error>; fn sortdb_ref(&self) -> &SortitionDB; fn sortdb_mut(&mut self) -> &mut SortitionDB; fn get_chain_tip(&mut self) -> BurnchainTip; diff --git a/testnet/stacks-node/src/keychain.rs b/testnet/stacks-node/src/keychain.rs index ae7ff856da..606dd63af6 100644 --- a/testnet/stacks-node/src/keychain.rs +++ b/testnet/stacks-node/src/keychain.rs @@ -140,7 +140,10 @@ impl Keychain { // Retrieve the corresponding VRF secret key let vrf_sk = match self.vrf_map.get(vrf_pk) { Some(vrf_pk) => vrf_pk, - None => return None, + None => { + warn!("No VRF secret key on file for {:?}", vrf_pk); + return None; + } }; // Generate the proof diff --git a/testnet/stacks-node/src/main.rs b/testnet/stacks-node/src/main.rs index d7950165e0..62d04ceef6 100644 --- a/testnet/stacks-node/src/main.rs +++ b/testnet/stacks-node/src/main.rs @@ -23,6 +23,7 @@ pub mod neon_node; pub mod node; pub mod operations; pub mod run_loop; +pub mod syncctl; pub mod tenure; pub use self::burnchains::{ diff --git a/testnet/stacks-node/src/neon_node.rs b/testnet/stacks-node/src/neon_node.rs index 47d0b4e8a1..39691f9b39 100644 --- a/testnet/stacks-node/src/neon_node.rs +++ b/testnet/stacks-node/src/neon_node.rs @@ -122,6 +122,7 @@ fn inner_process_tenure( consensus_hash, &anchored_block, &parent_consensus_hash, + 0, )?; } @@ -834,12 +835,19 @@ impl InitializedNeonNode { bitcoin_controller: &mut BitcoinRegtestController, ) -> Option { // Generates a proof out of the sortition hash provided in the params. 
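// `Keychain::generate_proof` returns an Option (see the keychain.rs hunk above,
// which now logs a warning when no VRF secret key is on file), so the bare
// `.unwrap()` below becomes an explicit match. Because this function itself
// returns an Option, a terser equivalent would be the `?` operator -- a sketch,
// minus the error! log the patch adds:
//
//     let vrf_proof = keychain.generate_proof(
//         &registered_key.vrf_public_key,
//         burn_block.sortition_hash.as_bytes(),
//     )?;
//
// The expanded match is kept because it records which public key failed before
// returning None.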
- let vrf_proof = keychain - .generate_proof( - ®istered_key.vrf_public_key, - burn_block.sortition_hash.as_bytes(), - ) - .unwrap(); + let vrf_proof = match keychain.generate_proof( + ®istered_key.vrf_public_key, + burn_block.sortition_hash.as_bytes(), + ) { + Some(vrfp) => vrfp, + None => { + error!( + "Failed to generate proof with {:?}", + ®istered_key.vrf_public_key + ); + return None; + } + }; debug!( "Generated VRF Proof: {} over {} with key {}", @@ -879,7 +887,7 @@ impl InitializedNeonNode { // the consensus hash of my Stacks block parent let parent_consensus_hash = stacks_tip.consensus_hash.clone(); - // the stacks block I'm mining off of's burn header hash and vtx index: + // the stacks block I'm mining off of's burn header hash and vtxindex: let parent_snapshot = SortitionDB::get_block_snapshot_consensus( burn_db.conn(), &stacks_tip.consensus_hash, @@ -1032,6 +1040,7 @@ impl InitializedNeonNode { &mut self, sortdb: &SortitionDB, sort_id: &SortitionId, + ibd: bool, ) -> (Option, bool) { let mut last_sortitioned_block = None; let mut won_sortition = false; @@ -1052,9 +1061,10 @@ impl InitializedNeonNode { for op in block_commits.into_iter() { if op.txid == block_snapshot.winning_block_txid { info!( - "Received burnchain block #{} including block_commit_op (winning) - {}", + "Received burnchain block #{} including block_commit_op (winning) - {} ({})", block_height, - op.input.to_testnet_address() + op.input.to_testnet_address(), + &op.block_header_hash ); last_sortitioned_block = Some((block_snapshot.clone(), op.vtxindex)); // Release current registered key if leader won the sortition @@ -1065,9 +1075,10 @@ impl InitializedNeonNode { } else { if self.is_miner { info!( - "Received burnchain block #{} including block_commit_op - {}", + "Received burnchain block #{} including block_commit_op - {} ({})", block_height, - op.input.to_testnet_address() + op.input.to_testnet_address(), + &op.block_header_hash ); } } @@ -1076,6 +1087,7 @@ impl InitializedNeonNode { let key_registers = SortitionDB::get_leader_keys_by_block(&ic, &block_snapshot.sortition_id) .expect("Unexpected SortitionDB error fetching key registers"); + for op in key_registers.into_iter() { if self.is_miner { info!( @@ -1084,12 +1096,15 @@ impl InitializedNeonNode { ); } if op.address == Keychain::address_from_burnchain_signer(&self.burnchain_signer) { - // Registered key has been mined - self.active_keys.push(RegisteredKey { - vrf_public_key: op.public_key, - block_height: op.block_height as u64, - op_vtxindex: op.vtxindex as u32, - }); + if !ibd { + // not in initial block download, so we're not just replaying an old key. 
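// `ibd` here is computed by PoxSyncWatchdog::infer_initial_block_download,
// added in syncctl.rs at the end of this patch. Roughly, the node considers
// itself in initial block download while the burn tip is more than
// `stable_confirmations` blocks behind the burnchain height:
//
//     tip_height + stable_confirmations < burnchain_height
//
// Key registrations replayed during that catch-up phase are skipped below, so
// stale VRF keys don't accumulate in `active_keys`.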
+ // Registered key has been mined + self.active_keys.push(RegisteredKey { + vrf_public_key: op.public_key, + block_height: op.block_height as u64, + op_vtxindex: op.vtxindex as u32, + }); + } } } diff --git a/testnet/stacks-node/src/node.rs b/testnet/stacks-node/src/node.rs index cf64548bc7..8a00ca667e 100644 --- a/testnet/stacks-node/src/node.rs +++ b/testnet/stacks-node/src/node.rs @@ -559,6 +559,7 @@ impl Node { consensus_hash, &anchored_block, &parent_consensus_hash, + 0, ) .unwrap(); diff --git a/testnet/stacks-node/src/run_loop/helium.rs b/testnet/stacks-node/src/run_loop/helium.rs index 322d6c5cec..de170b70c1 100644 --- a/testnet/stacks-node/src/run_loop/helium.rs +++ b/testnet/stacks-node/src/run_loop/helium.rs @@ -50,7 +50,7 @@ impl RunLoop { self.callbacks.invoke_burn_chain_initialized(&mut burnchain); - let initial_state = burnchain.start()?; + let (initial_state, _) = burnchain.start(None)?; // Update each node with the genesis block. self.node.process_burnchain_state(&initial_state); @@ -63,7 +63,7 @@ impl RunLoop { let mut round_index: u64 = 0; // Sync and update node with this new block. - let burnchain_tip = burnchain.sync()?; + let (burnchain_tip, _) = burnchain.sync(None)?; self.node.process_burnchain_state(&burnchain_tip); // todo(ludo): should return genesis? let mut chain_tip = ChainTip::genesis(self.config.get_initial_liquid_ustx()); @@ -105,7 +105,8 @@ impl RunLoop { artifacts_from_1st_tenure.burn_fee, ); - let mut burnchain_tip = burnchain.sync()?; + let (mut burnchain_tip, _) = burnchain.sync(None)?; + self.callbacks .invoke_new_burn_chain_state(round_index, &burnchain_tip, &chain_tip); @@ -174,7 +175,9 @@ impl RunLoop { None => {} } - burnchain_tip = burnchain.sync()?; + let (new_burnchain_tip, _) = burnchain.sync(None)?; + burnchain_tip = new_burnchain_tip; + self.callbacks .invoke_new_burn_chain_state(round_index, &burnchain_tip, &chain_tip); diff --git a/testnet/stacks-node/src/run_loop/neon.rs b/testnet/stacks-node/src/run_loop/neon.rs index e69798bc96..b471beeab2 100644 --- a/testnet/stacks-node/src/run_loop/neon.rs +++ b/testnet/stacks-node/src/run_loop/neon.rs @@ -15,6 +15,8 @@ use super::RunLoopCallbacks; use crate::monitoring::start_serving_monitoring_metrics; +use crate::syncctl::PoxSyncWatchdog; + /// Coordinating a node running in neon mode. #[cfg(test)] pub struct RunLoop { @@ -90,6 +92,7 @@ impl RunLoop { // Initialize and start the burnchain. 
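// The neon run loop below now pulls the burnchain in reward-cycle-sized steps
// instead of syncing to the tip in one shot. A sketch of the height progression
// it implements (names taken from the hunks that follow):
//
//     let step = pox_constants.reward_cycle_length as u64;
//     let mut target_burnchain_block_height = 1;   // genesis burn block first
//     burnchain.start(Some(target_burnchain_block_height)).expect("burnchain controller stopped");
//     target_burnchain_block_height = step;        // then one reward cycle at a time
//     loop {
//         // each iteration is gated by the PoxSyncWatchdog (see syncctl.rs below)
//         burnchain.sync(Some(target_burnchain_block_height)).expect("burnchain controller stopped");
//         target_burnchain_block_height += step;
//     }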
let mut burnchain = BitcoinRegtestController::new(self.config.clone(), Some(coordinator_senders.clone())); + let pox_constants = burnchain.get_pox_constants(); let is_miner = if self.config.node.miner { let keychain = Keychain::default(self.config.node.seed.clone()); @@ -115,8 +118,9 @@ impl RunLoop { false }; - let _burnchain_tip = match burnchain.start() { - Ok(x) => x, + let mut target_burnchain_block_height = 1; + match burnchain.start(Some(target_burnchain_block_height)) { + Ok(_) => {} Err(e) => { warn!("Burnchain controller stopped: {}", e); return; @@ -132,6 +136,7 @@ impl RunLoop { .iter() .map(|e| (e.address.clone(), e.amount)) .collect(); + let burnchain_poll_time = 30; // TODO: this is testnet-specific // setup dispatcher let mut event_dispatcher = EventDispatcher::new(); @@ -152,11 +157,12 @@ impl RunLoop { } }; let chainstate_path = self.config.get_chainstate_path(); + let coordinator_burnchain_config = burnchain_config.clone(); thread::spawn(move || { ChainsCoordinator::run( &chainstate_path, - burnchain_config, + coordinator_burnchain_config, mainnet, chainid, Some(initial_balances), @@ -201,14 +207,37 @@ impl RunLoop { }); } + let chainstate_path = self.config.get_chainstate_path(); + let mut pox_watchdog = PoxSyncWatchdog::new( + mainnet, + chainid, + chainstate_path, + burnchain_poll_time, + self.config.connection_options.timeout, + ) + .unwrap(); + let mut burnchain_height = 1; + + // prepare to fetch the first reward cycle! + target_burnchain_block_height = pox_constants.reward_cycle_length as u64; + loop { - burnchain_tip = match burnchain.sync() { - Ok(x) => x, - Err(e) => { - warn!("Burnchain controller stopped: {}", e); - return; - } - }; + // wait until it's okay to process the next sortitions + let ibd = + pox_watchdog.pox_sync_wait(&burnchain_config, &burnchain_tip, burnchain_height); + + let (next_burnchain_tip, next_burnchain_height) = + match burnchain.sync(Some(target_burnchain_block_height)) { + Ok(x) => x, + Err(e) => { + warn!("Burnchain controller stopped: {}", e); + return; + } + }; + + target_burnchain_block_height += pox_constants.reward_cycle_length as u64; + burnchain_tip = next_burnchain_tip; + burnchain_height = next_burnchain_height; let sortition_tip = &burnchain_tip.block_snapshot.sortition_id; let next_height = burnchain_tip.block_snapshot.block_height; @@ -228,7 +257,8 @@ impl RunLoop { let sortition_id = &block.sortition_id; // Have the node process the new block, that can include, or not, a sortition. - node.process_burnchain_state(burnchain.sortdb_mut(), sortition_id); + node.process_burnchain_state(burnchain.sortdb_mut(), sortition_id, ibd); + // Now, tell the relayer to check if it won a sortition during this block, // and, if so, to process and advertize the block // @@ -239,14 +269,22 @@ impl RunLoop { return; } } - // now, let's tell the miner to try and mine. - if !node.relayer_issue_tenure() { - // relayer hung up, exit. - error!("Block relayer and miner hung up, exiting."); - return; - } block_height = next_height; + debug!( + "Synchronized up to block height {} (chain tip height is {})", + block_height, burnchain_height + ); + + if block_height >= burnchain_height { + // at tip. proceed to mine. + debug!("Synchronized full burnchain. Proceeding to mine blocks"); + if !node.relayer_issue_tenure() { + // relayer hung up, exit. 
+ error!("Block relayer and miner hung up, exiting."); + return; + } + } } } } diff --git a/testnet/stacks-node/src/syncctl.rs new file mode 100644 index 0000000000..b46093b355 --- /dev/null +++ b/testnet/stacks-node/src/syncctl.rs @@ -0,0 +1,449 @@ +use std::collections::VecDeque; + +use stacks::burnchains::Burnchain; +use stacks::chainstate::stacks::db::StacksChainState; +use stacks::util::get_epoch_time_secs; +use stacks::util::sleep_ms; + +use crate::burnchains::BurnchainTip; + +/// Monitor the state of the Stacks blockchain as the peer network and relay threads download and +/// process Stacks blocks. Don't allow the node to process the next PoX reward cycle's sortitions +/// unless it's reasonably sure that it has processed all Stacks blocks for this reward cycle. +/// This struct monitors the Stacks chainstate to make this determination. +pub struct PoxSyncWatchdog { + /// number of attachable but unprocessed staging blocks over time + new_attachable_blocks: VecDeque, + /// number of newly-processed staging blocks over time + new_processed_blocks: VecDeque, + /// last time we asked for attachable blocks + last_attachable_query: u64, + /// last time we asked for processed blocks + last_processed_query: u64, + /// number of samples to take + max_samples: u64, + /// maximum number of blocks to count per query (affects performance!) + max_staging: u64, + /// when did we first start watching? + watch_start_ts: u64, + /// when did we first see a flatline in block-processing rate? + last_block_processed_ts: u64, + /// estimated time for a block to get downloaded. Used to infer how long to wait for the first + /// blocks to show up when waiting for this reward cycle. + estimated_block_download_time: f64, + /// estimated time for a block to get processed -- from when it shows up as attachable to when + /// it shows up as processed. Used to infer how long to wait for the last block to get + /// processed before unblocking burnchain sync for the next reward cycle. + estimated_block_process_time: f64, + /// time between burnchain syncs in steady state + steady_state_burnchain_sync_interval: u64, + /// when to re-sync under steady state + steady_state_resync_ts: u64, + /// chainstate handle + chainstate: StacksChainState, +} + +impl PoxSyncWatchdog { + pub fn new( + mainnet: bool, + chain_id: u32, + chainstate_path: String, + burnchain_poll_time: u64, + download_timeout: u64, + ) -> Result { + let (chainstate, _) = match StacksChainState::open(mainnet, chain_id, &chainstate_path) { + Ok(cs) => cs, + Err(e) => { + return Err(format!( + "Failed to open chainstate at '{}': {:?}", + &chainstate_path, &e + )); + } + }; + + Ok(PoxSyncWatchdog { + new_attachable_blocks: VecDeque::new(), + new_processed_blocks: VecDeque::new(), + last_attachable_query: 0, + last_processed_query: 0, + max_samples: download_timeout, // sample once per second for however long we expect a timeout to be + max_staging: 10, + watch_start_ts: 0, + last_block_processed_ts: 0, + estimated_block_download_time: download_timeout as f64, + estimated_block_process_time: 5.0, + steady_state_burnchain_sync_interval: burnchain_poll_time, + steady_state_resync_ts: 0, + chainstate: chainstate, + }) + } + + /// How many recently-added Stacks blocks are in an attachable state, up to $max_staging? 
+ fn count_attachable_stacks_blocks(&mut self) -> Result { + // number of staging blocks that have arrived since the last sortition + let cnt = StacksChainState::count_attachable_staging_blocks( + &self.chainstate.blocks_db, + self.max_staging, + self.last_attachable_query, + ) + .map_err(|e| format!("Failed to count attachable staging blocks: {:?}", &e))?; + + self.last_attachable_query = get_epoch_time_secs(); + Ok(cnt) + } + + /// How many recently-processed Stacks blocks are there, up to $max_staging? + /// ($max_staging is necessary to limit the runtime of this method, since the underlying SQL + /// uses COUNT(*), which in Sqlite is a _O(n)_ operation for _n_ rows) + fn count_processed_stacks_blocks(&mut self) -> Result { + // number of staging blocks that have been processed since the last sortition + let cnt = StacksChainState::count_processed_staging_blocks( + &self.chainstate.blocks_db, + self.max_staging, + self.last_processed_query, + ) + .map_err(|e| format!("Failed to count processed staging blocks: {:?}", &e))?; + + self.last_processed_query = get_epoch_time_secs(); + Ok(cnt) + } + + /// Are we in the initial block download? i.e. is the burn tip snapshot far enough away + /// from the burnchain height that we should be eagerly downloading snapshots? + pub fn infer_initial_block_download( + burnchain: &Burnchain, + burnchain_tip: &BurnchainTip, + burnchain_height: u64, + ) -> bool { + burnchain_tip.block_snapshot.block_height + (burnchain.stable_confirmations as u64) + < burnchain_height + } + + /// Calculate the first derivative of a list of points + fn derivative(sample_list: &VecDeque) -> Vec { + let mut deltas = vec![]; + let mut prev = 0; + for (i, sample) in sample_list.iter().enumerate() { + if i == 0 { + prev = *sample; + continue; + } + let delta = *sample - prev; + prev = *sample; + deltas.push(delta); + } + deltas + } + + /// Is a derivative approximately flat, with a maximum absolute deviation from 0? + /// Return whether or not the sample is mostly flat, and how many points were over the given + /// error bar in either direction. + fn is_mostly_flat(deriv: &Vec, error: i64) -> (bool, usize) { + let mut total_deviates = 0; + let mut ret = true; + for d in deriv.iter() { + if d.abs() > error { + total_deviates += 1; + ret = false; + } + } + (ret, total_deviates) + } + + /// low and high pass filter average -- take average without the smallest and largest values + fn hilo_filter_avg(samples: &Vec) -> f64 { + // take average with low and high pass + let mut min = i64::max_value(); + let mut max = i64::min_value(); + for s in samples.iter() { + if *s < 0 { + // nonsensical result (e.g. due to clock drift?) 
+ continue; + } + if *s < min { + min = *s; + } + if *s > max { + max = *s; + } + } + + let mut count = 0; + let mut sum = 0; + for s in samples.iter() { + if *s < 0 { + // nonsensical result + continue; + } + if *s == min { + continue; + } + if *s == max { + continue; + } + count += 1; + sum += *s; + } + + if count == 0 { + // no viable samples + 1.0 + } else { + (sum as f64) / (count as f64) + } + } + + /// estimate how long a block remains in an unprocessed state + fn estimate_block_process_time( + chainstate: &StacksChainState, + burnchain: &Burnchain, + tip_height: u64, + ) -> f64 { + let this_reward_cycle = burnchain + .block_height_to_reward_cycle(tip_height) + .expect(&format!("BUG: no reward cycle for {}", tip_height)); + let prev_reward_cycle = this_reward_cycle.saturating_sub(1); + + let start_height = burnchain.reward_cycle_to_block_height(prev_reward_cycle); + let end_height = burnchain.reward_cycle_to_block_height(this_reward_cycle); + + if this_reward_cycle > 0 { + assert!(start_height < end_height); + } else { + // no samples yet + return 1.0; + } + + let block_wait_times = StacksChainState::measure_block_wait_time( + &chainstate.blocks_db, + start_height, + end_height, + ) + .expect("BUG: failed to query chainstate block-processing times"); + + PoxSyncWatchdog::hilo_filter_avg(&block_wait_times) + } + + /// estimate how long a block takes to download + fn estimate_block_download_time( + chainstate: &StacksChainState, + burnchain: &Burnchain, + tip_height: u64, + ) -> f64 { + let this_reward_cycle = burnchain + .block_height_to_reward_cycle(tip_height) + .expect(&format!("BUG: no reward cycle for {}", tip_height)); + let prev_reward_cycle = this_reward_cycle.saturating_sub(1); + + let start_height = burnchain.reward_cycle_to_block_height(prev_reward_cycle); + let end_height = burnchain.reward_cycle_to_block_height(this_reward_cycle); + + if this_reward_cycle > 0 { + assert!(start_height < end_height); + } else { + // no samples yet + return 1.0; + } + + let block_download_times = StacksChainState::measure_block_download_time( + &chainstate.blocks_db, + start_height, + end_height, + ) + .expect("BUG: failed to query chainstate block-download times"); + + PoxSyncWatchdog::hilo_filter_avg(&block_download_times) + } + + /// Reset internal state. Performed when it's okay to begin syncing the burnchain. + /// Updates estimate for block-processing time and block-downloading time. + fn reset(&mut self, burnchain: &Burnchain, tip_height: u64) { + // find the average (with low/high pass filter) time a block spends in the DB without being + // processed, during this reward cycle + self.estimated_block_process_time = + PoxSyncWatchdog::estimate_block_process_time(&self.chainstate, burnchain, tip_height); + + // find the average (with low/high pass filter) time a block spends downloading + self.estimated_block_download_time = + PoxSyncWatchdog::estimate_block_download_time(&self.chainstate, burnchain, tip_height); + + debug!( + "Estimated block download time: {}s. Estimated block processing time: {}s", + self.estimated_block_download_time, self.estimated_block_process_time + ); + + self.new_attachable_blocks.clear(); + self.new_processed_blocks.clear(); + self.last_block_processed_ts = 0; + self.watch_start_ts = 0; + self.steady_state_resync_ts = 0; + } + + /// Wait until all of the Stacks blocks for the given reward cycle are seemingly downloaded and + /// processed. Do so by watching the _rate_ at which attachable Stacks blocks arrive and get + /// processed. 
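/// Concretely, the watchdog samples the attachable and processed staging-block
/// counts once per second, takes the first derivative of each series (see
/// `derivative` above), and only unblocks the burnchain sync once both rates
/// have flattened to zero and the estimated download/process deadlines have passed.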
+ /// Returns whether or not we're still in the initial block download + pub fn pox_sync_wait( + &mut self, + burnchain: &Burnchain, + burnchain_tip: &BurnchainTip, + burnchain_height: u64, + ) -> bool { + if self.watch_start_ts == 0 { + self.watch_start_ts = get_epoch_time_secs(); + } + if self.steady_state_resync_ts == 0 { + self.steady_state_resync_ts = + get_epoch_time_secs() + self.steady_state_burnchain_sync_interval; + } + + // unconditionally download the first reward cycle + if burnchain_tip.block_snapshot.block_height + < burnchain.first_block_height + (burnchain.pox_constants.reward_cycle_length as u64) + { + debug!("PoX watchdog in first reward cycle -- sync immediately"); + return PoxSyncWatchdog::infer_initial_block_download( + burnchain, + burnchain_tip, + burnchain_height, + ); + } + + let mut steady_state = false; + + let ibd = loop { + let ibd = PoxSyncWatchdog::infer_initial_block_download( + burnchain, + burnchain_tip, + burnchain_height, + ); + + let expected_first_block_deadline = + self.watch_start_ts + (self.estimated_block_download_time as u64); + let expected_last_block_deadline = self.last_block_processed_ts + + (self.estimated_block_download_time as u64) + + (self.estimated_block_process_time as u64); + + match ( + self.count_attachable_stacks_blocks(), + self.count_processed_stacks_blocks(), + ) { + (Ok(num_available), Ok(num_processed)) => { + self.new_attachable_blocks.push_back(num_available as i64); + self.new_processed_blocks.push_back(num_processed as i64); + + if (self.new_attachable_blocks.len() as u64) > self.max_samples { + self.new_attachable_blocks.pop_front(); + } + if (self.new_processed_blocks.len() as u64) > self.max_samples { + self.new_processed_blocks.pop_front(); + } + + if (self.new_attachable_blocks.len() as u64) < self.max_samples + || (self.new_processed_blocks.len() as u64) < self.max_samples + { + // still getting initial samples + if self.new_processed_blocks.len() % 10 == 0 { + debug!( + "PoX watchdog: Still warming up: {} out of {} samples...", + &self.new_attachable_blocks.len(), + &self.max_samples + ); + } + sleep_ms(1000); + continue; + } + + if self.watch_start_ts > 0 + && get_epoch_time_secs() < expected_first_block_deadline + { + // still waiting for that first block in this reward cycle + debug!("PoX watchdog: Still warming up: waiting until {}s for first Stacks block download (estimated download time: {}s)...", expected_first_block_deadline, self.estimated_block_download_time); + sleep_ms(1000); + continue; + } + + if self.watch_start_ts > 0 + && (self.new_attachable_blocks.len() as u64) < self.max_samples + && self.watch_start_ts + + self.max_samples + + self.steady_state_burnchain_sync_interval + * (burnchain.stable_confirmations as u64) + < get_epoch_time_secs() + { + debug!( + "PoX watchdog: could not calculate {} samples in {} seconds. 
Assuming suspend/resume, or assuming load is too high.", + self.max_samples, + self.max_samples + self.steady_state_burnchain_sync_interval * (burnchain.stable_confirmations as u64) + ); + self.reset(burnchain, burnchain_tip.block_snapshot.block_height); + + self.watch_start_ts = get_epoch_time_secs(); + self.steady_state_resync_ts = + get_epoch_time_secs() + self.steady_state_burnchain_sync_interval; + continue; + } + + // take first derivative of samples -- see if the download and processing rate has gone to 0 + let attachable_delta = PoxSyncWatchdog::derivative(&self.new_attachable_blocks); + let processed_delta = PoxSyncWatchdog::derivative(&self.new_processed_blocks); + + let (flat_attachable, attachable_deviants) = + PoxSyncWatchdog::is_mostly_flat(&attachable_delta, 0); + let (flat_processed, processed_deviants) = + PoxSyncWatchdog::is_mostly_flat(&processed_delta, 0); + + debug!("PoX watchdog: flat-attachable?: {}, flat-processed?: {}, estimated block-download time: {}s, estimated block-processing time: {}s", + flat_attachable, flat_processed, self.estimated_block_download_time, self.estimated_block_process_time); + + if flat_attachable && flat_processed && self.last_block_processed_ts == 0 { + // we're flat-lining -- this may be the end of this cycle + self.last_block_processed_ts = get_epoch_time_secs(); + } + + if self.last_block_processed_ts > 0 + && get_epoch_time_secs() < expected_last_block_deadline + { + debug!("PoX watchdog: Still processing blocks; waiting until at least min({},{})s before burnchain synchronization (estimated block-processing time: {}s)", + get_epoch_time_secs() + 1, expected_last_block_deadline, self.estimated_block_process_time); + sleep_ms(1000); + continue; + } + + if ibd { + // doing initial block download right now. + // only proceed to fetch the next reward cycle's burnchain blocks if we're neither downloading nor + // attaching blocks recently + debug!("PoX watchdog: In initial block download: flat-attachable = {}, flat-processed = {}, min-attachable: {}, min-processed: {}", + flat_attachable, flat_processed, &attachable_deviants, &processed_deviants); + + if !flat_attachable || !flat_processed { + sleep_ms(1000); + continue; + } + } else { + let now = get_epoch_time_secs(); + if now < self.steady_state_resync_ts { + // steady state + if !steady_state { + debug!("PoX watchdog: In steady-state; waiting until at least {} before burnchain synchronization", self.steady_state_resync_ts); + steady_state = true; + } + sleep_ms(1000); + continue; + } + } + } + (err_attach, err_processed) => { + // can only happen on DB query failure + error!("PoX watchdog: Failed to count recently attached ('{:?}') and/or processed ('{:?}') staging blocks", &err_attach, &err_processed); + panic!(); + } + }; + + self.reset(burnchain, burnchain_tip.block_snapshot.block_height); + break ibd; + }; + ibd + } +}
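To make the watchdog's core signal concrete: it declares a reward cycle "drained" when the first derivative of both staging-block counters flattens. A self-contained sketch of that flatness test, mirroring `derivative` and `is_mostly_flat` above (standalone example, not patch content):

    fn derivative(samples: &[i64]) -> Vec<i64> {
        // first difference of consecutive samples
        samples.windows(2).map(|w| w[1] - w[0]).collect()
    }

    fn is_flat(deriv: &[i64], error: i64) -> bool {
        deriv.iter().all(|d| d.abs() <= error)
    }

    fn main() {
        // newly-attachable block counts over six 1-second samples
        let samples = vec![5, 9, 12, 12, 12, 12];
        let d = derivative(&samples);
        assert_eq!(d, vec![4, 3, 0, 0, 0, 0]);
        assert!(!is_flat(&d, 0)); // blocks were still arriving early in the window
        assert!(is_flat(&d[2..], 0)); // the tail has flattened -- safe to resync
    }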