Skip to content

Commit

Permalink
Merge branch 'master' into piotr-cross-shard-tx
Browse files Browse the repository at this point in the history
  • Loading branch information
pmnoxx authored Aug 10, 2020
2 parents 7d50fba + 7ee16b5 commit 9868e81
Show file tree
Hide file tree
Showing 12 changed files with 193 additions and 102 deletions.
29 changes: 14 additions & 15 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -825,8 +825,18 @@ impl Chain {
return Ok(BlockSyncResponse::None);
}

// Find common block between header chain and block chain.
let mut oldest_height = header_head.height;
let next_epoch_id =
self.get_block_header(&block_head.last_block_hash)?.next_epoch_id().clone();

// Don't run State Sync if header head is not more than one epoch ahead.
if block_head.epoch_id != header_head.epoch_id && next_epoch_id != header_head.epoch_id {
if block_head.height < header_head.height.saturating_sub(block_fetch_horizon) {
// Epochs are different and we are too far from horizon, State Sync is needed
return Ok(BlockSyncResponse::StateNeeded);
}
}

// Find hashes of blocks to sync
let mut current = self.get_block_header(&header_head.last_block_hash).map(|h| h.clone());
while let Ok(header) = current {
if header.height() <= block_head.height {
Expand All @@ -835,21 +845,9 @@ impl Chain {
}
}

oldest_height = header.height();
hashes.push(*header.hash());
current = self.get_previous_header(&header).map(|h| h.clone());
}
let next_epoch_id =
self.get_block_header(&block_head.last_block_hash)?.next_epoch_id().clone();

// Don't run State Sync if header head is not more than one epoch ahead.
if block_head.epoch_id != header_head.epoch_id && next_epoch_id != header_head.epoch_id {
let sync_head = self.sync_head()?;
if oldest_height < sync_head.height.saturating_sub(block_fetch_horizon) {
// Epochs are different and we are too far from horizon, State Sync is needed
return Ok(BlockSyncResponse::StateNeeded);
}
}

// Sort hashes by height
hashes.reverse();
Expand Down Expand Up @@ -904,10 +902,11 @@ impl Chain {
}

pub fn reset_data_pre_state_sync(&mut self, sync_hash: CryptoHash) -> Result<(), Error> {
let head = self.head()?;
// Get header we were syncing into.
let header = self.get_block_header(&sync_hash)?;
let prev_hash = *header.prev_hash();
let gc_height = header.height();
let gc_height = std::cmp::min(head.height + 1, header.height());

// GC all the data from current tail up to `gc_height`
let tail = self.store.tail()?;
Expand Down
16 changes: 8 additions & 8 deletions chain/network/src/peer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ impl PeerManagerActor {
ctx.spawn(async move {
while let Some(response) = requests.next().await {
if let Err(e) = response {
error!(target: "network", "Failed sending broadcast message(query_active_peers): {}", e);
debug!(target: "network", "Failed sending broadcast message(query_active_peers): {}", e);
}
}
}.into_actor(self));
Expand Down Expand Up @@ -742,7 +742,7 @@ impl PeerManagerActor {
ctx.spawn(async move {
while let Some(response) = requests.next().await {
if let Err(e) = response {
error!(target: "network", "Failed sending broadcast message(query_active_peers): {}", e);
debug!(target: "network", "Failed sending broadcast message(broadcast_message): {}", e);
}
}
}.into_actor(self));
Expand Down Expand Up @@ -770,7 +770,7 @@ impl PeerManagerActor {
message: PeerMessage,
) -> bool {
if let Some(active_peer) = self.active_peers.get(&peer_id) {
let msg_kind = format!("{}", message);
let msg_kind = message.msg_variant().to_string();
trace!(target: "network", "Send message: {}", msg_kind);
active_peer
.addr
Expand All @@ -789,9 +789,9 @@ impl PeerManagerActor {
true
} else {
debug!(target: "network",
"Sending message to: {} (which is not an active peer) Active Peers: {:?}\n{:?}",
"Sending message to: {} (which is not an active peer) Num active Peers: {}\n{}",
peer_id,
self.active_peers.keys(),
self.active_peers.len(),
message
);
false
Expand Down Expand Up @@ -845,12 +845,12 @@ impl PeerManagerActor {
.as_str(),
);

debug!(target: "network", "{:?} Drop signed message to {:?} Reason {:?}. Known peers: {:?} Message {:?}",
debug!(target: "network", "{:?} Drop signed message to {:?} Reason {:?}. Num known peers: {} Message {:?}",
self.config.account_id,
msg.target,
find_route_error,
self.routing_table.peer_forwarding.keys(),
msg,
self.routing_table.peer_forwarding.len(),
msg.body,
);
false
}
Expand Down
52 changes: 51 additions & 1 deletion chain/network/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ use crate::peer::Peer;
#[cfg(feature = "metric_recorder")]
use crate::recorder::MetricRecorder;
use crate::routing::{Edge, EdgeInfo, RoutingTableInfo};
use serde::export::fmt::Error;
use serde::export::Formatter;
use std::fmt::Debug;

/// Number of hops a message is allowed to travel before being dropped.
/// This is used to avoid infinite loop because of inconsistent view of the network
Expand Down Expand Up @@ -221,7 +224,6 @@ pub struct Pong {
PartialEq,
Eq,
Clone,
Debug,
strum::AsStaticStr,
strum::EnumVariantNames,
)]
Expand Down Expand Up @@ -254,6 +256,54 @@ pub enum RoutedMessageBody {
Pong(Pong),
}

impl Debug for RoutedMessageBody {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> {
match self {
RoutedMessageBody::BlockApproval(approval) => write!(
f,
"Approval({}, {}, {:?})",
approval.target_height, approval.account_id, approval.inner
),
RoutedMessageBody::ForwardTx(tx) => write!(f, "tx {}", tx.get_hash()),
RoutedMessageBody::TxStatusRequest(account_id, hash) => {
write!(f, "TxStatusRequest({}, {})", account_id, hash)
}
RoutedMessageBody::TxStatusResponse(response) => {
write!(f, "TxStatusResponse({})", response.transaction.hash)
}
RoutedMessageBody::QueryRequest { .. } => write!(f, "QueryRequest"),
RoutedMessageBody::QueryResponse { .. } => write!(f, "QueryResponse"),
RoutedMessageBody::ReceiptOutcomeRequest(hash) => write!(f, "ReceiptRequest({})", hash),
RoutedMessageBody::ReceiptOutComeResponse(response) => {
write!(f, "ReceiptResponse({})", response.outcome_with_id.id)
}
RoutedMessageBody::StateRequestHeader(shard_id, sync_hash) => {
write!(f, "StateRequestHeader({}, {})", shard_id, sync_hash)
}
RoutedMessageBody::StateRequestPart(shard_id, sync_hash, part_id) => {
write!(f, "StateRequestPart({}, {}, {})", shard_id, sync_hash, part_id)
}
RoutedMessageBody::StateResponse(response) => {
write!(f, "StateResponse({}, {})", response.shard_id, response.sync_hash)
}
RoutedMessageBody::PartialEncodedChunkRequest(request) => {
write!(f, "PartialChunkRequest({:?}, {:?})", request.chunk_hash, request.part_ords)
}
RoutedMessageBody::PartialEncodedChunkResponse(response) => write!(
f,
"PartialChunkResponse({:?}, {:?})",
response.chunk_hash,
response.parts.iter().map(|p| p.part_ord).collect::<Vec<_>>()
),
RoutedMessageBody::PartialEncodedChunk(chunk) => {
write!(f, "PartialChunk({:?})", chunk.header.hash)
}
RoutedMessageBody::Ping(_) => write!(f, "Ping"),
RoutedMessageBody::Pong(_) => write!(f, "Pong"),
}
}
}

#[derive(BorshSerialize, BorshDeserialize, Serialize, PartialEq, Eq, Clone, Debug)]
pub enum PeerIdOrHash {
PeerId(PeerId),
Expand Down
6 changes: 3 additions & 3 deletions core/primitives/src/contract.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use crate::hash::{hash, CryptoHash};
use crate::hash::{hash as sha256, CryptoHash};

pub struct ContractCode {
pub code: Vec<u8>,
pub hash: CryptoHash,
}

impl ContractCode {
pub fn new(code: Vec<u8>) -> ContractCode {
let hash = hash(&code);
pub fn new(code: Vec<u8>, hash: Option<CryptoHash>) -> ContractCode {
let hash = hash.unwrap_or_else(|| sha256(&code));
ContractCode { code, hash }
}

Expand Down
3 changes: 2 additions & 1 deletion core/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,10 +386,11 @@ pub fn set_code(state_update: &mut TrieUpdate, account_id: AccountId, code: &Con
pub fn get_code(
state_update: &TrieUpdate,
account_id: &AccountId,
code_hash: Option<CryptoHash>,
) -> Result<Option<ContractCode>, StorageError> {
state_update
.get(&TrieKey::ContractCode { account_id: account_id.clone() })
.map(|opt| opt.map(|code| ContractCode::new(code.to_vec())))
.map(|opt| opt.map(|code| ContractCode::new(code, code_hash)))
}

/// Removes account, code and all access keys associated to it.
Expand Down
2 changes: 1 addition & 1 deletion genesis-tools/genesis-populate/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ impl GenesisBuilder {
);
records.push(access_key_record);
if let Some(wasm_binary) = self.additional_accounts_code.as_ref() {
let code = ContractCode::new(wasm_binary.clone());
let code = ContractCode::new(wasm_binary.clone(), None);
set_code(&mut state_update, account_id.clone(), &code);
let contract_record = StateRecord::Contract { account_id, code: wasm_binary.clone() };
records.push(contract_record);
Expand Down
75 changes: 75 additions & 0 deletions pytest/lib/account.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import base64
import json
import requests
import time

from cluster import Key
from transaction import (
sign_payment_tx, sign_deploy_contract_tx, sign_function_call_tx,
sign_create_account_with_full_access_key_and_balance_tx, sign_staking_tx)
import utils


class Account:

def __init__(self, key, init_nonce, base_block_hash, rpc_info):
self.key = key
self.nonce = init_nonce
self.base_block_hash = base_block_hash
self.rpc_addr, self.rpc_port = rpc_info
self.tx_timestamps = []

def json_rpc(self, method, params):
j = {
'method': method,
'params': params,
'id': 'dontcare',
'jsonrpc': '2.0'
}
r = requests.post(f'http://{self.rpc_addr}:{self.rpc_port}',
json=j,
timeout=10)
return json.loads(r.content)

def send_tx(self, signed_tx):
return self.json_rpc('broadcast_tx_async',
[base64.b64encode(signed_tx).decode('utf8')])

def prep_tx(self):
self.tx_timestamps.append(time.time())
self.nonce += 1

def send_transfer_tx(self, dest_account_id):
self.prep_tx()
transfer_amount = 100
tx = sign_payment_tx(self.key, dest_account_id, transfer_amount,
self.nonce, self.base_block_hash)
return self.send_tx(tx)

def send_deploy_contract_tx(self, wasm_filename):
wasm_binary = utils.load_binary_file(wasm_filename)
self.prep_tx()
tx = sign_deploy_contract_tx(self.key, wasm_binary, self.nonce,
self.base_block_hash)
return self.send_tx(tx)

def send_call_contract_tx(self, method_name, args):
self.prep_tx()
tx = sign_function_call_tx(self.key, self.key.account_id, method_name,
args, 300000000000000, 0, self.nonce,
self.base_block_hash)
return self.send_tx(tx)

def send_create_account_tx(self, new_account_id):
self.prep_tx()
new_key = Key(new_account_id, self.key.pk, self.key.sk)
tx = sign_create_account_with_full_access_key_and_balance_tx(
self.key, new_account_id, new_key, 100, self.nonce,
self.base_block_hash)
return self.send_tx(tx)

def send_stake_tx(self, stake_amount):
self.prep_tx()
tx = sign_staking_tx(self.key, self.key, stake_amount, self.nonce,
self.base_block_hash)
return self.send_tx(tx)
Loading

0 comments on commit 9868e81

Please sign in to comment.