Skip to content

Commit

Permalink
Merge branch 'master' into trie-cache-factory
Browse files Browse the repository at this point in the history
  • Loading branch information
Looogarithm committed Jun 13, 2022
2 parents f4e6b20 + 5004e31 commit 8757766
Show file tree
Hide file tree
Showing 28 changed files with 327 additions and 1,343 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

65 changes: 32 additions & 33 deletions chain/chain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1962,30 +1962,30 @@ impl<'a> ChainStoreUpdate<'a> {
let chunk = self.get_chunk(&chunk_hash)?.clone();
debug_assert_eq!(chunk.cloned_header().height_created(), height);
for transaction in chunk.transactions() {
self.gc_col(DBCol::Transactions, &transaction.get_hash().into());
self.gc_col(DBCol::Transactions, transaction.get_hash().as_bytes());
}
for receipt in chunk.receipts() {
self.gc_col(DBCol::Receipts, &receipt.get_hash().into());
self.gc_col(DBCol::Receipts, receipt.get_hash().as_bytes());
}

// 2. Delete chunk_hash-indexed data
let chunk_header_hash = chunk_hash.clone().into();
self.gc_col(DBCol::Chunks, &chunk_header_hash);
self.gc_col(DBCol::PartialChunks, &chunk_header_hash);
self.gc_col(DBCol::InvalidChunks, &chunk_header_hash);
let chunk_hash = chunk_hash.as_bytes();
self.gc_col(DBCol::Chunks, chunk_hash);
self.gc_col(DBCol::PartialChunks, chunk_hash);
self.gc_col(DBCol::InvalidChunks, chunk_hash);
}

let header_hashes = self.chain_store.get_all_header_hashes_by_height(height)?;
for _header_hash in header_hashes {
// 3. Delete header_hash-indexed data
// TODO #3488: enable
//self.gc_col(DBCol::BlockHeader, &header_hash.into());
//self.gc_col(DBCol::BlockHeader, header_hash.as_bytes());
}

// 4. Delete chunks_tail-related data
let key = &index_to_bytes(height).to_vec();
self.gc_col(DBCol::ChunkHashesByHeight, key);
self.gc_col(DBCol::HeaderHashesByHeight, key);
let key = index_to_bytes(height);
self.gc_col(DBCol::ChunkHashesByHeight, &key);
self.gc_col(DBCol::HeaderHashesByHeight, &key);
}
self.update_chunk_tail(min_chunk_height);
Ok(())
Expand Down Expand Up @@ -2019,12 +2019,12 @@ impl<'a> ChainStoreUpdate<'a> {
if !chunk_hashes.is_empty() {
remaining -= 1;
for chunk_hash in chunk_hashes {
let chunk_header_hash = chunk_hash.into();
self.gc_col(DBCol::PartialChunks, &chunk_header_hash);
let chunk_hash = chunk_hash.as_bytes();
self.gc_col(DBCol::PartialChunks, chunk_hash);
// Data in DBCol::InvalidChunks isn’t technically redundant (it
// cannot be calculated from other data) but it is data we
// don’t need for anything so it can be deleted as well.
self.gc_col(DBCol::InvalidChunks, &chunk_header_hash);
self.gc_col(DBCol::InvalidChunks, chunk_hash);
}
}
}
Expand Down Expand Up @@ -2151,29 +2151,28 @@ impl<'a> ChainStoreUpdate<'a> {
}

// 3. Delete block_hash-indexed data
let block_hash_vec: Vec<u8> = block_hash.as_ref().into();
self.gc_col(DBCol::Block, &block_hash_vec);
self.gc_col(DBCol::BlockExtra, &block_hash_vec);
self.gc_col(DBCol::NextBlockHashes, &block_hash_vec);
self.gc_col(DBCol::ChallengedBlocks, &block_hash_vec);
self.gc_col(DBCol::BlocksToCatchup, &block_hash_vec);
self.gc_col(DBCol::Block, block_hash.as_bytes());
self.gc_col(DBCol::BlockExtra, block_hash.as_bytes());
self.gc_col(DBCol::NextBlockHashes, block_hash.as_bytes());
self.gc_col(DBCol::ChallengedBlocks, block_hash.as_bytes());
self.gc_col(DBCol::BlocksToCatchup, block_hash.as_bytes());
let storage_key = KeyForStateChanges::for_block(&block_hash);
let stored_state_changes: Vec<Vec<u8>> = self
let stored_state_changes: Vec<Box<[u8]>> = self
.chain_store
.store()
.iter_prefix(DBCol::StateChanges, storage_key.as_ref())
.map(|key| key.0.into())
.map(|(key, _)| key)
.collect();
for key in stored_state_changes {
self.gc_col(DBCol::StateChanges, &key);
}
self.gc_col(DBCol::BlockRefCount, &block_hash_vec);
self.gc_col(DBCol::BlockRefCount, block_hash.as_bytes());
self.gc_outcomes(&block)?;
match gc_mode {
GCMode::StateSync { clear_block_info: false } => {}
_ => self.gc_col(DBCol::BlockInfo, &block_hash_vec),
_ => self.gc_col(DBCol::BlockInfo, block_hash.as_bytes()),
}
self.gc_col(DBCol::StateDlInfos, &block_hash_vec);
self.gc_col(DBCol::StateDlInfos, block_hash.as_bytes());

// 4. Update or delete block_hash_per_height
self.gc_col_block_per_height(&block_hash, height, block.header().epoch_id())?;
Expand Down Expand Up @@ -2227,17 +2226,17 @@ impl<'a> ChainStoreUpdate<'a> {
if hashes.is_empty() {
epoch_to_hashes.remove(epoch_id);
}
let key = index_to_bytes(height).to_vec();
let key = &index_to_bytes(height)[..];
if epoch_to_hashes.is_empty() {
store_update.delete(DBCol::BlockPerHeight, &key);
self.chain_store.block_hash_per_height.pop(&key);
store_update.delete(DBCol::BlockPerHeight, key);
self.chain_store.block_hash_per_height.pop(key);
} else {
store_update.set_ser(DBCol::BlockPerHeight, &key, &epoch_to_hashes)?;
self.chain_store.block_hash_per_height.put(key.clone(), Arc::new(epoch_to_hashes));
store_update.set_ser(DBCol::BlockPerHeight, key, &epoch_to_hashes)?;
self.chain_store.block_hash_per_height.put(key.to_vec(), Arc::new(epoch_to_hashes));
}
self.inc_gc(DBCol::BlockPerHeight);
if self.is_height_processed(height)? {
self.gc_col(DBCol::ProcessedBlockHeights, &key);
self.gc_col(DBCol::ProcessedBlockHeights, key);
}
self.merge(store_update);
Ok(())
Expand Down Expand Up @@ -2303,11 +2302,11 @@ impl<'a> ChainStoreUpdate<'a> {
let mut outcomes_with_id = self.chain_store.get_outcomes_by_id(&outcome_id)?;
outcomes_with_id.retain(|outcome| &outcome.block_hash != block_hash);
if outcomes_with_id.is_empty() {
self.gc_col(DBCol::TransactionResult, &outcome_id.as_ref().into());
self.gc_col(DBCol::TransactionResult, outcome_id.as_bytes());
} else {
store_update.set_ser(
DBCol::TransactionResult,
outcome_id.as_ref(),
outcome_id.as_bytes(),
&outcomes_with_id,
)?;
}
Expand All @@ -2318,7 +2317,7 @@ impl<'a> ChainStoreUpdate<'a> {
Ok(())
}

fn gc_col(&mut self, col: DBCol, key: &Vec<u8>) {
fn gc_col(&mut self, col: DBCol, key: &[u8]) {
assert!(col.is_gc());
let mut store_update = self.store().store_update();
match col {
Expand Down
1 change: 0 additions & 1 deletion chain/client/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -984,7 +984,6 @@ pub fn setup_mock_all_validators(
| NetworkRequests::RequestUpdateNonce(_, _)
| NetworkRequests::ResponseUpdateNonce(_)
| NetworkRequests::ReceiptOutComeRequest(_, _) => {}
| NetworkRequests::IbfMessage { .. } => {}
};
}
Box::new(Some(resp))
Expand Down
21 changes: 0 additions & 21 deletions chain/jsonrpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,27 +320,6 @@ impl JsonRpcHandler {
.map_err(|err| RpcError::serialization_error(err.to_string())),
)
}
#[cfg(feature = "protocol_feature_routing_exchange_algorithm")]
"adv_start_routing_table_syncv2" => {
let params = parse_params::<
near_jsonrpc_adversarial_primitives::StartRoutingTableSyncRequest,
>(params)?;

self.peer_manager_addr
.send(
near_network::types::PeerManagerMessageRequest::StartRoutingTableSync(
near_network::private_actix::StartRoutingTableSync {
peer_id: params.peer_id,
},
),
)
.await
.map_err(RpcError::rpc_from)?;
Some(
serde_json::to_value(())
.map_err(|err| RpcError::serialization_error(err.to_string())),
)
}
"adv_get_peer_id" => {
let response = self
.peer_manager_addr
Expand Down
1 change: 1 addition & 0 deletions chain/network-primitives/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ borsh = "0.9"
once_cell = "1.12.0"
chrono = { version = "0.4.4", features = ["serde"] }
deepsize = { version = "0.2.0", optional = true }
opentelemetry = { version = "0.17", features = ["trace"] }
serde = { version = "1", features = ["alloc", "derive", "rc"] }
strum = { version = "0.24", features = ["derive"] }
time = "0.3.9"
Expand Down
8 changes: 8 additions & 0 deletions chain/network-primitives/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,14 @@ pub enum PeerManagerRequest {
UnregisterPeer,
}

/// Messages from PeerManager to Peer with a tracing Context.
#[derive(Message, Debug)]
#[rtype(result = "()")]
pub struct PeerManagerRequestWithContext {
pub msg: PeerManagerRequest,
pub context: opentelemetry::Context,
}

#[derive(Debug, Clone)]
pub struct KnownProducer {
pub account_id: AccountId,
Expand Down
2 changes: 2 additions & 0 deletions chain/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ lru = "0.7.2"
thiserror = "1"
near-rust-allocator-proxy = { version = "0.4", optional = true }
once_cell = "1.12.0"
opentelemetry = { version = "0.17", features = ["trace"] }
rand = "0.6"
rand_pcg = "0.1"
serde = { version = "1", features = ["alloc", "derive", "rc"], optional = true }
Expand All @@ -37,6 +38,7 @@ tokio-stream = { version = "0.1.2", features = ["net"] }
tokio-util = { version = "0.7.1", features = ["codec"] }
tokio = { version = "1.1", features = ["net", "rt-multi-thread"] }
tracing = "0.1.13"
tracing-opentelemetry = { version = "0.17" }
assert_matches = "1.3"

delay-detector = { path = "../../tools/delay_detector" }
Expand Down
35 changes: 1 addition & 34 deletions chain/network/src/network_protocol/borsh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,40 +159,7 @@ pub enum PeerMessage {
EpochSyncFinalizationRequest(EpochId),
EpochSyncFinalizationResponse(Box<EpochSyncFinalizationResponse>),

RoutingTableSyncV2(RoutingSyncV2),
_RoutingTableSyncV2,
}
#[cfg(target_arch = "x86_64")] // Non-x86_64 doesn't match this requirement yet but it's not bad as it's not production-ready
const _: () = assert!(std::mem::size_of::<PeerMessage>() <= 1144, "PeerMessage > 1144 bytes");

#[cfg_attr(feature = "deepsize_feature", derive(deepsize::DeepSizeOf))]
#[derive(BorshSerialize, BorshDeserialize, PartialEq, Eq, Clone, Debug)]
pub enum RoutingSyncV2 {
Version2(RoutingVersion2),
}
const _: () = assert!(std::mem::size_of::<RoutingSyncV2>() <= 80, "RoutingSyncV2 > 80 bytes");

#[cfg_attr(feature = "deepsize_feature", derive(deepsize::DeepSizeOf))]
#[derive(BorshSerialize, BorshDeserialize, PartialEq, Eq, Clone, Debug)]
pub struct PartialSync {
pub(crate) ibf_level: crate::routing::ibf_peer_set::ValidIBFLevel,
pub(crate) ibf: Vec<crate::routing::ibf::IbfBox>,
}

#[cfg_attr(feature = "deepsize_feature", derive(deepsize::DeepSizeOf))]
#[derive(BorshSerialize, BorshDeserialize, PartialEq, Eq, Clone, Debug)]
pub enum RoutingState {
PartialSync(PartialSync),
RequestAllEdges,
Done,
RequestMissingEdges(Vec<u64>),
InitializeIbf,
}

#[cfg_attr(feature = "deepsize_feature", derive(deepsize::DeepSizeOf))]
#[derive(BorshSerialize, BorshDeserialize, PartialEq, Eq, Clone, Debug)]
pub struct RoutingVersion2 {
pub(crate) known_edges: u64,
pub(crate) seed: u64,
pub(crate) edges: Vec<Edge>,
pub(crate) routing_state: RoutingState,
}
7 changes: 5 additions & 2 deletions chain/network/src/network_protocol/borsh_conv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ impl From<&mem::HandshakeFailureReason> for net::HandshakeFailureReason {
pub enum ParsePeerMessageError {
#[error("HandshakeV2 is deprecated")]
DeprecatedHandshakeV2,
#[error("RoutingTableSyncV2 is deprecated")]
DeprecatedRoutingTableSyncV2,
}

impl TryFrom<&net::PeerMessage> for mem::PeerMessage {
Expand Down Expand Up @@ -116,7 +118,9 @@ impl TryFrom<&net::PeerMessage> for mem::PeerMessage {
net::PeerMessage::EpochSyncFinalizationResponse(esfr) => {
mem::PeerMessage::EpochSyncFinalizationResponse(esfr)
}
net::PeerMessage::RoutingTableSyncV2(rs) => mem::PeerMessage::RoutingTableSyncV2(rs),
net::PeerMessage::_RoutingTableSyncV2 => {
return Err(Self::Error::DeprecatedRoutingTableSyncV2)
}
})
}
}
Expand Down Expand Up @@ -154,7 +158,6 @@ impl From<&mem::PeerMessage> for net::PeerMessage {
mem::PeerMessage::EpochSyncFinalizationResponse(esfr) => {
net::PeerMessage::EpochSyncFinalizationResponse(esfr)
}
mem::PeerMessage::RoutingTableSyncV2(rs) => net::PeerMessage::RoutingTableSyncV2(rs),
}
}
}
6 changes: 1 addition & 5 deletions chain/network/src/network_protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ use protobuf::Message as _;
use std::fmt;
use thiserror::Error;

pub use self::borsh::{
PartialSync, RoutingState, RoutingSyncV2, RoutingTableUpdate, RoutingVersion2,
};
pub use self::borsh::RoutingTableUpdate;

/// Structure representing handshake between peers.
#[derive(PartialEq, Eq, Clone, Debug)]
Expand Down Expand Up @@ -107,8 +105,6 @@ pub enum PeerMessage {
EpochSyncResponse(Box<EpochSyncResponse>),
EpochSyncFinalizationRequest(EpochId),
EpochSyncFinalizationResponse(Box<EpochSyncFinalizationResponse>),

RoutingTableSyncV2(RoutingSyncV2),
}

impl fmt::Display for PeerMessage {
Expand Down
4 changes: 2 additions & 2 deletions chain/network/src/network_protocol/network.proto
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,8 @@ message PeerMessage {
// between borsh and protobuf encodings:
// https://docs.google.com/document/d/1gCWmt9O-h_-5JDXIqbKxAaSS3Q9pryB1f9DDY1mMav4/edit
reserved 1,2,3;
// Deprecated fields.
reserved 24;

oneof message_type {
Handshake handshake = 4;
Expand Down Expand Up @@ -340,7 +342,5 @@ message PeerMessage {
EpochSyncResponse epoch_sync_response = 21;
EpochSyncFinalizationRequest epoch_sync_finalization_request = 22;
EpochSyncFinalizationResponse epoch_sync_finalization_response = 23;

RoutingSyncV2 routing_table_sync_v2 = 24;
}
}
Loading

0 comments on commit 8757766

Please sign in to comment.