Skip to content

Commit

Permalink
fix: node gets banned on reorg (#4949)
Browse files Browse the repository at this point in the history
Description
---
Remove the `FetchBlocksByHash` handler. It was only called from a single place and although designed to handle multiple blocks it was only ever sending a single hash at once making the multi-block functionality useless. 
Instead, opt to use the existing `GetBlockByHash` handler and expand that handler to accept a new `orphans` flag. Passing this flag means we'll accept found blocks from the orphan pool,

Motivation and Context
---
Previously if a node had re-orged after a sync had started it may result in not providing the complete block for a block it claimed it had. This results in a brief ban.

Make it also return blocks from the orphan pool and let the peer figure out what to do with it. 

How Has This Been Tested?
---
Tests, and running nodes.

Fixes: #4799
  • Loading branch information
brianp authored Nov 28, 2022
1 parent 2de1c4c commit 5bcf6e5
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 99 deletions.
36 changes: 8 additions & 28 deletions base_layer/core/src/base_node/comms_interface/comms_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,38 +46,20 @@ pub enum NodeCommsRequest {
FetchHeaders(RangeInclusive<u64>),
FetchHeadersByHashes(Vec<HashOutput>),
FetchMatchingUtxos(Vec<HashOutput>),
FetchMatchingBlocks {
range: RangeInclusive<u64>,
compact: bool,
},
FetchBlocksByHash {
block_hashes: Vec<HashOutput>,
compact: bool,
},
FetchMatchingBlocks { range: RangeInclusive<u64>, compact: bool },
FetchBlocksByKernelExcessSigs(Vec<Signature>),
FetchBlocksByUtxos(Vec<Commitment>),
GetHeaderByHash(HashOutput),
GetBlockByHash(HashOutput),
GetNewBlockTemplate(GetNewBlockTemplateRequest),
GetNewBlock(NewBlockTemplate),
GetBlockFromAllChains(HashOutput),
FetchKernelByExcessSig(Signature),
FetchMempoolTransactionsByExcessSigs {
excess_sigs: Vec<PrivateKey>,
},
FetchValidatorNodesKeys {
height: u64,
},
GetShardKey {
height: u64,
public_key: PublicKey,
},
FetchTemplateRegistrations {
start_height: u64,
end_height: u64,
},
FetchUnspentUtxosInBlock {
block_hash: BlockHash,
},
FetchMempoolTransactionsByExcessSigs { excess_sigs: Vec<PrivateKey> },
FetchValidatorNodesKeys { height: u64 },
GetShardKey { height: u64, public_key: PublicKey },
FetchTemplateRegistrations { start_height: u64, end_height: u64 },
FetchUnspentUtxosInBlock { block_hash: BlockHash },
}

#[derive(Debug, Serialize, Deserialize)]
Expand All @@ -100,15 +82,13 @@ impl Display for NodeCommsRequest {
FetchMatchingBlocks { range, compact } => {
write!(f, "FetchMatchingBlocks ({:?}, {})", range, compact)
},
FetchBlocksByHash { block_hashes, compact } => {
write!(f, "FetchBlocksByHash (n={}, {})", block_hashes.len(), compact)
},
FetchBlocksByKernelExcessSigs(v) => write!(f, "FetchBlocksByKernelExcessSigs (n={})", v.len()),
FetchBlocksByUtxos(v) => write!(f, "FetchBlocksByUtxos (n={})", v.len()),
GetHeaderByHash(v) => write!(f, "GetHeaderByHash({})", v.to_hex()),
GetBlockByHash(v) => write!(f, "GetBlockByHash({})", v.to_hex()),
GetNewBlockTemplate(v) => write!(f, "GetNewBlockTemplate ({}) with weight {}", v.algo, v.max_weight),
GetNewBlock(b) => write!(f, "GetNewBlock (Block Height={})", b.header.height),
GetBlockFromAllChains(v) => write!(f, "GetBlockFromAllChains({})", v.to_hex()),
FetchKernelByExcessSig(s) => write!(
f,
"FetchKernelByExcessSig (signature=({}, {}))",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub enum NodeCommsResponse {
TransactionKernels(Vec<TransactionKernel>),
BlockHeaders(Vec<ChainHeader>),
BlockHeader(Option<ChainHeader>),
Block(Box<Option<Block>>),
TransactionOutputs(Vec<TransactionOutput>),
HistoricalBlocks(Vec<HistoricalBlock>),
HistoricalBlock(Box<Option<HistoricalBlock>>),
Expand All @@ -70,6 +71,7 @@ impl Display for NodeCommsResponse {
TransactionKernels(_) => write!(f, "TransactionKernel"),
BlockHeaders(_) => write!(f, "BlockHeaders"),
BlockHeader(_) => write!(f, "BlockHeader"),
Block(_) => write!(f, "Block"),
HistoricalBlock(_) => write!(f, "HistoricalBlock"),
TransactionOutputs(_) => write!(f, "TransactionOutputs"),
HistoricalBlocks(_) => write!(f, "HistoricalBlocks"),
Expand Down
76 changes: 42 additions & 34 deletions base_layer/core/src/base_node/comms_interface/inbound_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,31 +169,6 @@ where B: BlockchainBackend + 'static
let blocks = self.blockchain_db.fetch_blocks(range, compact).await?;
Ok(NodeCommsResponse::HistoricalBlocks(blocks))
},
NodeCommsRequest::FetchBlocksByHash { block_hashes, compact } => {
let mut blocks = Vec::with_capacity(block_hashes.len());
for block_hash in block_hashes {
let block_hex = block_hash.to_hex();
debug!(
target: LOG_TARGET,
"A peer has requested a block with hash {} (compact = {})", block_hex, compact
);

match self.blockchain_db.fetch_block_by_hash(block_hash, compact).await {
Ok(Some(block)) => blocks.push(block),
Ok(None) => warn!(
target: LOG_TARGET,
"Could not provide requested block {} to peer because not stored", block_hex,
),
Err(e) => warn!(
target: LOG_TARGET,
"Could not provide requested block {} to peer because: {}",
block_hex,
e.to_string()
),
}
}
Ok(NodeCommsResponse::HistoricalBlocks(blocks))
},
NodeCommsRequest::FetchBlocksByKernelExcessSigs(excess_sigs) => {
if excess_sigs.len() > MAX_REQUEST_BY_KERNEL_EXCESS_SIGS {
return Err(CommsInterfaceError::InvalidRequest {
Expand Down Expand Up @@ -342,6 +317,43 @@ where B: BlockchainBackend + 'static
block: Some(block),
})
},
NodeCommsRequest::GetBlockFromAllChains(hash) => {
let block_hex = hash.to_hex();
debug!(
target: LOG_TARGET,
"A peer has requested a block with hash {}", block_hex
);

let maybe_block = match self
.blockchain_db
.fetch_block_by_hash(hash, true)
.await
.unwrap_or_else(|e| {
warn!(
target: LOG_TARGET,
"Could not provide requested block {} to peer because: {}",
block_hex,
e.to_string()
);

None
}) {
None => self.blockchain_db.fetch_orphan(hash).await.map_or_else(
|e| {
warn!(
target: LOG_TARGET,
"Could not provide requested block {} to peer because: {}", block_hex, e,
);

None
},
Some,
),
Some(block) => Some(block.try_into_block()?),
};

Ok(NodeCommsResponse::Block(Box::new(maybe_block)))
},
NodeCommsRequest::FetchKernelByExcessSig(signature) => {
let kernels = match self.blockchain_db.fetch_kernel_by_excess_sig(signature).await {
Ok(Some((kernel, _))) => vec![kernel],
Expand Down Expand Up @@ -596,16 +608,12 @@ where B: BlockchainBackend + 'static
source_peer: NodeId,
block_hash: BlockHash,
) -> Result<Block, CommsInterfaceError> {
let mut historical_block = self
return match self
.outbound_nci
.request_blocks_by_hashes_from_peer(vec![block_hash], Some(source_peer.clone()))
.await?;

return match historical_block.pop() {
Some(block) => {
let block = block.try_into_block()?;
Ok(block)
},
.request_blocks_by_hashes_from_peer(block_hash, Some(source_peer.clone()))
.await?
{
Some(block) => Ok(block),
None => {
if let Err(e) = self
.connectivity
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::{
NodeCommsRequest,
NodeCommsResponse,
},
blocks::{HistoricalBlock, NewBlock},
blocks::{Block, NewBlock},
};

/// The OutboundNodeCommsInterface provides an interface to request information from remove nodes.
Expand Down Expand Up @@ -60,22 +60,15 @@ impl OutboundNodeCommsInterface {
/// Fetch the Blocks corresponding to the provided block hashes from a specific base node.
pub async fn request_blocks_by_hashes_from_peer(
&mut self,
block_hashes: Vec<BlockHash>,
hash: BlockHash,
node_id: Option<NodeId>,
) -> Result<Vec<HistoricalBlock>, CommsInterfaceError> {
if let NodeCommsResponse::HistoricalBlocks(blocks) = self
) -> Result<Option<Block>, CommsInterfaceError> {
if let NodeCommsResponse::Block(block) = self
.request_sender
.call((
NodeCommsRequest::FetchBlocksByHash {
block_hashes,
// We always request compact inputs from peer
compact: true,
},
node_id,
))
.call((NodeCommsRequest::GetBlockFromAllChains(hash), node_id))
.await??
{
Ok(blocks)
Ok(*block)
} else {
Err(CommsInterfaceError::UnexpectedApiResponse)
}
Expand Down
8 changes: 3 additions & 5 deletions base_layer/core/src/base_node/proto/request.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ package tari.base_node;
message BaseNodeServiceRequest {
uint64 request_key = 1;
oneof request {
// Indicates a FetchBlocksByHash request.
FetchBlocksByHashRequest fetch_blocks_by_hash = 8;
GetBlockFromAllChainsRequest get_block_from_all_chains = 8;
ExcessSigs fetch_mempool_transactions_by_excess_sigs = 9;
}
}
Expand All @@ -27,9 +26,8 @@ message BlockHeights {
repeated uint64 heights = 1;
}

message FetchBlocksByHashRequest {
repeated bytes block_hashes = 1;
bool compact = 2;
message GetBlockFromAllChainsRequest {
bytes hash = 1;
}

message Signatures {
Expand Down
23 changes: 6 additions & 17 deletions base_layer/core/src/base_node/proto/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,10 @@ impl TryInto<NodeCommsRequest> for ProtoNodeCommsRequest {
type Error = String;

fn try_into(self) -> Result<NodeCommsRequest, Self::Error> {
use ProtoNodeCommsRequest::{FetchBlocksByHash, FetchMempoolTransactionsByExcessSigs};
use ProtoNodeCommsRequest::{FetchMempoolTransactionsByExcessSigs, GetBlockFromAllChains};
let request = match self {
FetchBlocksByHash(req) => {
let block_hashes = req
.block_hashes
.into_iter()
.map(|hash| hash.try_into().map_err(|_| "Malformed hash".to_string()))
.collect::<Result<_, _>>()?;
NodeCommsRequest::FetchBlocksByHash {
block_hashes,
compact: req.compact,
}
GetBlockFromAllChains(req) => {
NodeCommsRequest::GetBlockFromAllChains(req.hash.try_into().map_err(|_| "Malformed hash".to_string())?)
},
FetchMempoolTransactionsByExcessSigs(excess_sigs) => {
let excess_sigs = excess_sigs
Expand All @@ -66,13 +58,10 @@ impl TryFrom<NodeCommsRequest> for ProtoNodeCommsRequest {
type Error = String;

fn try_from(request: NodeCommsRequest) -> Result<Self, Self::Error> {
use NodeCommsRequest::{FetchBlocksByHash, FetchMempoolTransactionsByExcessSigs};
use NodeCommsRequest::{FetchMempoolTransactionsByExcessSigs, GetBlockFromAllChains};
match request {
FetchBlocksByHash { block_hashes, compact } => Ok(ProtoNodeCommsRequest::FetchBlocksByHash(
proto::FetchBlocksByHashRequest {
block_hashes: block_hashes.into_iter().map(|hash| hash.to_vec()).collect(),
compact,
},
GetBlockFromAllChains(hash) => Ok(ProtoNodeCommsRequest::GetBlockFromAllChains(
proto::GetBlockFromAllChainsRequest { hash: hash.to_vec() },
)),
FetchMempoolTransactionsByExcessSigs { excess_sigs } => Ok(
ProtoNodeCommsRequest::FetchMempoolTransactionsByExcessSigs(proto::ExcessSigs {
Expand Down
5 changes: 5 additions & 0 deletions base_layer/core/src/base_node/proto/response.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package tari.base_node;
message BaseNodeServiceResponse {
uint64 request_key = 1;
oneof response {
BlockResponse block_response = 5;
// Indicates a HistoricalBlocks response.
HistoricalBlocks historical_blocks = 6;
FetchMempoolTransactionsResponse fetch_mempool_transactions_by_excess_sigs_response = 7;
Expand Down Expand Up @@ -44,6 +45,10 @@ message HistoricalBlocks {
repeated tari.core.HistoricalBlock blocks = 1;
}

message BlockResponse {
tari.core.Block block = 1;
}

message NewBlockResponse {
bool success = 1;
string error = 2;
Expand Down
30 changes: 28 additions & 2 deletions base_layer/core/src/base_node/proto/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,17 @@ use tari_utilities::{convert::try_convert_all, ByteArray};
pub use crate::proto::base_node::base_node_service_response::Response as ProtoNodeCommsResponse;
use crate::{
base_node::comms_interface::{FetchMempoolTransactionsResponse, NodeCommsResponse},
blocks::{BlockHeader, HistoricalBlock},
blocks::{Block, BlockHeader, HistoricalBlock},
proto,
};

impl TryInto<NodeCommsResponse> for ProtoNodeCommsResponse {
type Error = String;

fn try_into(self) -> Result<NodeCommsResponse, Self::Error> {
use ProtoNodeCommsResponse::{FetchMempoolTransactionsByExcessSigsResponse, HistoricalBlocks};
use ProtoNodeCommsResponse::{BlockResponse, FetchMempoolTransactionsByExcessSigsResponse, HistoricalBlocks};
let response = match self {
BlockResponse(block) => NodeCommsResponse::Block(Box::new(block.try_into()?)),
HistoricalBlocks(blocks) => {
let blocks = try_convert_all(blocks.blocks)?;
NodeCommsResponse::HistoricalBlocks(blocks)
Expand Down Expand Up @@ -76,6 +77,7 @@ impl TryFrom<NodeCommsResponse> for ProtoNodeCommsResponse {
fn try_from(response: NodeCommsResponse) -> Result<Self, Self::Error> {
use NodeCommsResponse::{FetchMempoolTransactionsByExcessSigsResponse, HistoricalBlocks};
match response {
NodeCommsResponse::Block(block) => Ok(ProtoNodeCommsResponse::BlockResponse((*block).try_into()?)),
HistoricalBlocks(historical_blocks) => {
let historical_blocks = historical_blocks
.into_iter()
Expand Down Expand Up @@ -151,6 +153,30 @@ impl TryInto<Option<HistoricalBlock>> for proto::base_node::HistoricalBlockRespo
}
}

impl TryFrom<Option<Block>> for proto::base_node::BlockResponse {
type Error = String;

fn try_from(v: Option<Block>) -> Result<Self, Self::Error> {
Ok(Self {
block: v.map(TryInto::try_into).transpose()?,
})
}
}

impl TryInto<Option<Block>> for proto::base_node::BlockResponse {
type Error = String;

fn try_into(self) -> Result<Option<Block>, Self::Error> {
match self.block {
Some(block) => {
let block = block.try_into()?;
Ok(Some(block))
},
None => Ok(None),
}
}
}

//---------------------------------- Collection impls --------------------------------------------//

// The following allow `Iterator::collect` to collect into these repeated types
Expand Down

0 comments on commit 5bcf6e5

Please sign in to comment.