Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Fast sync child trie support. #9239

Merged
merged 27 commits into from
Nov 7, 2021
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client/api/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use sp_blockchain;
use sp_consensus::BlockOrigin;
use parking_lot::RwLock;

pub use sp_state_machine::Backend as StateBackend;
pub use sp_state_machine::{Backend as StateBackend, KeyValueStates};
pub use sp_consensus::ImportedState;
use std::marker::PhantomData;

Expand Down
2 changes: 1 addition & 1 deletion client/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub use notifications::*;
pub use proof_provider::*;
pub use sp_blockchain::HeaderBackend;

pub use sp_state_machine::{StorageProof, ExecutionStrategy};
pub use sp_state_machine::{StorageProof, CompactProof, ExecutionStrategy};
pub use sp_storage::{StorageData, StorageKey, PrefixedStorageKey, ChildInfo};

/// Usage Information Provider interface
Expand Down
37 changes: 26 additions & 11 deletions client/api/src/proof_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ use sp_runtime::{
generic::BlockId,
traits::{Block as BlockT},
};
use crate::{StorageProof, ChangesProof};
use crate::{StorageProof, ChangesProof, CompactProof};
use sp_storage::{ChildInfo, StorageKey, PrefixedStorageKey};
use sp_state_machine::{KeyValueStates, KeyValueState};


/// Interface for providing block proving utilities.
pub trait ProofProvider<Block: BlockT> {
Expand Down Expand Up @@ -71,30 +73,43 @@ pub trait ProofProvider<Block: BlockT> {
key: &StorageKey,
) -> sp_blockchain::Result<ChangesProof<Block::Header>>;

/// Given a `BlockId` iterate over all storage values starting at `start_key` exclusively,
/// building proofs until size limit is reached. Returns combined proof and the number of collected keys.
/// Given a `BlockId` iterate over all storage values starting at `start_keys`.
/// Last `start_keys` element contains last accessed key value.
/// With multiple `start_keys`, first `start_keys` element is
/// the current storage key of of the last accessed child trie.
/// at last level the value to start at exclusively.
/// Proofs is build until size limit is reached and always include at
/// least one key following `start_keys`.
/// Returns combined proof and the numbers of collected keys.
fn read_proof_collection(
&self,
id: &BlockId<Block>,
start_key: &[u8],
start_keys: &[Vec<u8>],
size_limit: usize,
) -> sp_blockchain::Result<(StorageProof, u32)>;
) -> sp_blockchain::Result<(CompactProof, u32)>;

/// Given a `BlockId` iterate over all storage values starting at `start_key`.
/// Returns collected keys and values.
/// Returns the collected keys values content of the top trie followed by the
/// collected keys values of child tries.
/// Only child tries with their root part of the collected content or
/// related to `start_key` are attached.
/// For each collected state a boolean indicates if state reach
/// end.
fn storage_collection(
&self,
id: &BlockId<Block>,
start_key: &[u8],
start_key: &[Vec<u8>],
size_limit: usize,
) -> sp_blockchain::Result<Vec<(Vec<u8>, Vec<u8>)>>;
) -> sp_blockchain::Result<Vec<(KeyValueState, bool)>>;

/// Verify read storage proof for a set of keys.
/// Returns collected key-value pairs and a flag indicating if iteration is complete.
/// Returns collected key-value pairs and a the nested state
/// depth of current iteration or 0 if completed.
fn verify_range_proof(
&self,
root: Block::Hash,
proof: StorageProof,
start_key: &[u8],
) -> sp_blockchain::Result<(Vec<(Vec<u8>, Vec<u8>)>, bool)>;
proof: CompactProof,
start_keys: &[Vec<u8>],
) -> sp_blockchain::Result<(KeyValueStates, usize)>;
}
108 changes: 75 additions & 33 deletions client/network/src/protocol/sync/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@
use std::sync::Arc;
use codec::{Encode, Decode};
use sp_runtime::traits::{Block as BlockT, Header, NumberFor};
use sc_client_api::StorageProof;
use sc_client_api::CompactProof;
use crate::schema::v1::{StateRequest, StateResponse, StateEntry};
use crate::chain::{Client, ImportedState};
use smallvec::SmallVec;
use std::collections::HashMap;
use sp_core::storage::well_known_keys;
use super::StateDownloadProgress;

/// State sync support.
Expand All @@ -32,8 +35,8 @@ pub struct StateSync<B: BlockT> {
target_block: B::Hash,
target_header: B::Header,
target_root: B::Hash,
last_key: Vec<u8>,
state: Vec<(Vec<u8>, Vec<u8>)>,
last_key: SmallVec<[Vec<u8>; 2]>,
state: HashMap<Vec<Vec<u8>>, Vec<(Vec<u8>, Vec<u8>)>>,
complete: bool,
client: Arc<dyn Client<B>>,
imported_bytes: u64,
Expand All @@ -58,8 +61,8 @@ impl<B: BlockT> StateSync<B> {
target_block: target.hash(),
target_root: target.state_root().clone(),
target_header: target,
last_key: Vec::default(),
state: Vec::default(),
last_key: SmallVec::default(),
state: HashMap::default(),
complete: false,
imported_bytes: 0,
skip_proof,
Expand All @@ -68,7 +71,7 @@ impl<B: BlockT> StateSync<B> {

/// Validate and import a state reponse.
pub fn import(&mut self, response: StateResponse) -> ImportResult<B> {
if response.entries.is_empty() && response.proof.is_empty() && !response.complete {
if response.entries.is_empty() && response.proof.is_empty() {
log::debug!(
target: "sync",
"Bad state response",
Expand All @@ -89,17 +92,17 @@ impl<B: BlockT> StateSync<B> {
response.proof.len(),
);
let proof_size = response.proof.len() as u64;
let proof = match StorageProof::decode(&mut response.proof.as_ref()) {
let proof = match CompactProof::decode(&mut response.proof.as_ref()) {
Ok(proof) => proof,
Err(e) => {
log::debug!(target: "sync", "Error decoding proof: {:?}", e);
return ImportResult::BadResponse;
}
};
let (values, complete) = match self.client.verify_range_proof(
let (values, completed) = match self.client.verify_range_proof(
self.target_root,
proof,
&self.last_key
self.last_key.as_slice(),
) {
Err(e) => {
log::debug!(
Expand All @@ -113,38 +116,77 @@ impl<B: BlockT> StateSync<B> {
};
log::debug!(target: "sync", "Imported with {} keys", values.len());

if let Some(last) = values.last().map(|(k, _)| k) {
self.last_key = last.clone();
}

for (key, value) in values {
self.imported_bytes += key.len() as u64;
self.state.push((key, value))
let complete = completed == 0;
if !complete && !values.update_last_key(completed, &mut self.last_key) {
log::debug!(target: "sync", "Error updating key cursor, depth: {}", completed);
};

for values in values.0 {
use std::collections::hash_map::Entry;
let key_values = if values.parent_storages.len() == 0 {
// skip all child key root (will be recalculated on import)
values.key_values.into_iter()
.filter(|key_value| !well_known_keys::is_child_storage_key(key_value.0.as_slice()))
.collect()
} else {
values.key_values
};
match self.state.entry(values.parent_storages) {
Entry::Occupied(mut entry) => {
for (key, value) in key_values {
self.imported_bytes += key.len() as u64;
entry.get_mut().push((key, value))
}
},
Entry::Vacant(entry) => {
for (key, _value) in key_values.iter() {
self.imported_bytes += key.len() as u64;
}
entry.insert(key_values);
},
}
}
self.imported_bytes += proof_size;
complete
} else {
log::debug!(
target: "sync",
"Importing state from {:?} to {:?}",
response.entries.last().map(|e| sp_core::hexdisplay::HexDisplay::from(&e.key)),
response.entries.first().map(|e| sp_core::hexdisplay::HexDisplay::from(&e.key)),
);

if let Some(e) = response.entries.last() {
self.last_key = e.key.clone();
let mut complete = true;
if self.last_key.len() == 2 && response.entries[0].entries.len() == 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

response.entries could be empty here?
Also, why self.last_key.len() == 2? Could use some explaining in the comment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is empty when the whole change set is into a child trie: then we do not have the parent key in parent payload (already define in previous payload). I will add comment.

// Unchanged parent trie key, keep old value.
self.last_key.pop();
} else {
self.last_key.clear();
}
for StateEntry { key, value } in response.entries {
self.imported_bytes += (key.len() + value.len()) as u64;
self.state.push((key, value))
for state in response.entries {
log::debug!(
target: "sync",
"Importing state from {:?} to {:?}",
state.entries.last().map(|e| sp_core::hexdisplay::HexDisplay::from(&e.key)),
state.entries.first().map(|e| sp_core::hexdisplay::HexDisplay::from(&e.key)),
);

if !state.complete {
if let Some(e) = state.entries.last() {
self.last_key.push(e.key.clone());
}
complete = false;
}
let is_top = state.parent_storages.len() == 0;
let entry = self.state.entry(state.parent_storages).or_default();
for StateEntry { key, value } in state.entries {
// Skip all child key root (will be recalculated on import).
if !(is_top && well_known_keys::is_child_storage_key(key.as_slice())) {
self.imported_bytes += key.len() as u64;
entry.push((key, value))
}
}
}
response.complete
complete
};
if complete {
self.complete = true;
ImportResult::Import(self.target_block.clone(), self.target_header.clone(), ImportedState {
block: self.target_block.clone(),
state: std::mem::take(&mut self.state)
state: std::mem::take(&mut self.state).into(),
})
} else {
ImportResult::Continue(self.next_request())
Expand All @@ -155,7 +197,7 @@ impl<B: BlockT> StateSync<B> {
pub fn next_request(&self) -> StateRequest {
StateRequest {
block: self.target_block.encode(),
start: self.last_key.clone(),
start: self.last_key.clone().into_vec(),
no_proof: self.skip_proof,
}
}
Expand All @@ -177,11 +219,11 @@ impl<B: BlockT> StateSync<B> {

/// Returns state sync estimated progress.
pub fn progress(&self) -> StateDownloadProgress {
let percent_done = (*self.last_key.get(0).unwrap_or(&0u8) as u32) * 100 / 256;
let cursor = *self.last_key.get(0).and_then(|last| last.get(0)).unwrap_or(&0u8);
let percent_done = cursor as u32 * 100 / 256;
StateDownloadProgress {
percentage: percent_done,
size: self.imported_bytes,
}
}
}

19 changes: 14 additions & 5 deletions client/network/src/schema/api.v1.proto
Original file line number Diff line number Diff line change
Expand Up @@ -72,22 +72,31 @@ message BlockData {
message StateRequest {
// Block header hash.
bytes block = 1;
// Start from this key. Equivalent to <empty bytes> if omitted.
bytes start = 2; // optional
// Start from this key.
// Multiple keys used for nested state start.
repeated bytes start = 2; // optional
// if 'true' indicates that response should contain raw key-values, rather than proof.
bool no_proof = 3;
}

message StateResponse {
// A collection of keys-values. Only populated if `no_proof` is `true`
repeated StateEntry entries = 1;
// A collection of keys-values states. Only populated if `no_proof` is `true`
repeated KeyValueStateEntry entries = 1;
// If `no_proof` is false in request, this contains proof nodes.
bytes proof = 2;
}

// A key value state.
message KeyValueStateEntry {
// Parent nested storage location, empty for top state.
repeated bytes parent_storages = 1;
// A collection of keys-values.
repeated StateEntry entries = 2;
// Set to true when there are no more keys to return.
bool complete = 3;
}

// A key-value pair
// A key-value pair.
message StateEntry {
bytes key = 1;
bytes value = 2;
Expand Down
38 changes: 20 additions & 18 deletions client/network/src/state_request_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use codec::{Encode, Decode};
use crate::chain::Client;
use crate::config::ProtocolId;
use crate::request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig};
use crate::schema::v1::{StateResponse, StateRequest, StateEntry};
use crate::schema::v1::{StateResponse, StateRequest, StateEntry, KeyValueStateEntry};
use crate::{PeerId, ReputationChange};
use futures::channel::{mpsc, oneshot};
use futures::stream::StreamExt;
Expand Down Expand Up @@ -70,7 +70,7 @@ fn generate_protocol_name(protocol_id: &ProtocolId) -> String {
struct SeenRequestsKey<B: BlockT> {
peer: PeerId,
block: B::Hash,
start: Vec<u8>,
start: Vec<Vec<u8>>,
}

impl<B: BlockT> Hash for SeenRequestsKey<B> {
Expand Down Expand Up @@ -168,46 +168,48 @@ impl<B: BlockT> StateRequestHandler<B> {

log::trace!(
target: LOG_TARGET,
"Handling state request from {}: Block {:?}, Starting at {:?}, no_proof={}",
"Handling state request from {}: Block {:?}, Starting at {:x?}, no_proof={}",
peer,
request.block,
sp_core::hexdisplay::HexDisplay::from(&request.start),
&request.start,
request.no_proof,
);

let result = if reputation_changes.is_empty() {
let mut response = StateResponse::default();

if !request.no_proof {
let (proof, count) = self.client.read_proof_collection(
let (proof, _count) = self.client.read_proof_collection(
&BlockId::hash(block),
&request.start,
request.start.as_slice(),
MAX_RESPONSE_BYTES,
)?;
response.proof = proof.encode();
if count == 0 {
response.complete = true;
}
} else {
let entries = self.client.storage_collection(
&BlockId::hash(block),
&request.start,
request.start.as_slice(),
MAX_RESPONSE_BYTES,
)?;
response.entries = entries.into_iter().map(|(key, value)| StateEntry { key, value }).collect();
if response.entries.is_empty() {
response.complete = true;
}
response.entries = entries.into_iter().map(|(state, complete)| {
KeyValueStateEntry {
parent_storages: state.parent_storages,
entries: state.key_values.into_iter()
.map(|(key, value)| StateEntry { key, value }).collect(),
complete,
}
}).collect();
}

log::trace!(
target: LOG_TARGET,
"StateResponse contains {} keys, {}, proof nodes, complete={}, from {:?} to {:?}",
"StateResponse contains {} keys, {}, proof nodes, from {:?} to {:?}",
response.entries.len(),
response.proof.len(),
response.complete,
response.entries.first().map(|e| sp_core::hexdisplay::HexDisplay::from(&e.key)),
response.entries.last().map(|e| sp_core::hexdisplay::HexDisplay::from(&e.key)),
response.entries.get(0).and_then(|top| top.entries.first()
.map(|e| sp_core::hexdisplay::HexDisplay::from(&e.key))),
response.entries.get(0).and_then(|top| top.entries.last()
.map(|e| sp_core::hexdisplay::HexDisplay::from(&e.key))),
);
if let Some(value) = self.seen_requests.get_mut(&key) {
// If this is the first time we have processed this request, we need to change
Expand Down
Loading