Skip to content

Commit

Permalink
Notification-based block pinning (paritytech#13157)
Browse files Browse the repository at this point in the history
* Worker

* Reorganize and unpin onnotification drop

* Pin in state-db, pass block number

* Pin blocks in blockchain db

* Switch to reference counted LRU

* Disable pinning when we keep all blocks

* Fix pinning hint for state-db

* Remove pinning from backend layer

* Improve readability

* Add justifications to test

* Fix justification behaviour

* Remove debug prints

* Convert channels to tracing_unbounded

* Add comments to the test

* Documentation and Cleanup

* Move task start to client

* Simplify cache

* Improve test, remove unwanted log

* Add tracing logs, remove expect for block number

* Cleanup

* Add conversion method for unpin handle to Finalitynotification

* Revert unwanted changes

* Improve naming

* Make clippy happy

* Fix docs

Co-authored-by: Michal Kucharczyk <[email protected]>

* Use `NumberFor` instead of u64 in API

* Hand over weak reference to unpin worker task

* Unwanted

* &Hash -> Hash

* Remove number from interface, rename `_unpin_handle`, LOG_TARGET

* Move RwLock one layer up

* Apply code style suggestions

* Improve comments

* Replace lru crate by schnellru

* Only insert values for pinned items + better docs

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <[email protected]>

* Improve comments, log target and test

Co-authored-by: Michal Kucharczyk <[email protected]>
Co-authored-by: Bastian Köcher <[email protected]>
  • Loading branch information
3 people authored and ltfschoen committed Feb 22, 2023
1 parent e329033 commit e08ac02
Show file tree
Hide file tree
Showing 11 changed files with 954 additions and 99 deletions.
52 changes: 41 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 21 additions & 1 deletion client/api/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,12 +436,24 @@ pub trait StorageProvider<Block: BlockT, B: Backend<Block>> {
///
/// Manages the data layer.
///
/// Note on state pruning: while an object from `state_at` is alive, the state
/// # State Pruning
///
/// While an object from `state_at` is alive, the state
/// should not be pruned. The backend should internally reference-count
/// its state objects.
///
/// The same applies for live `BlockImportOperation`s: while an import operation building on a
/// parent `P` is alive, the state for `P` should not be pruned.
///
/// # Block Pruning
///
/// Users can pin blocks in memory by calling `pin_block`. When
/// a block would be pruned, its value is kept in an in-memory cache
/// until it is unpinned via `unpin_block`.
///
/// While a block is pinned, its state is also preserved.
///
/// The backend should internally reference count the number of pin / unpin calls.
pub trait Backend<Block: BlockT>: AuxStore + Send + Sync {
/// Associated block insertion operation type.
type BlockImportOperation: BlockImportOperation<Block, State = Self::State>;
Expand Down Expand Up @@ -502,6 +514,14 @@ pub trait Backend<Block: BlockT>: AuxStore + Send + Sync {
/// Returns a handle to offchain storage.
fn offchain_storage(&self) -> Option<Self::OffchainStorage>;

/// Pin the block to keep body, justification and state available after pruning.
/// Number of pins are reference counted. Users need to make sure to perform
/// one call to [`Self::unpin_block`] per call to [`Self::pin_block`].
fn pin_block(&self, hash: Block::Hash) -> sp_blockchain::Result<()>;

/// Unpin the block to allow pruning.
fn unpin_block(&self, hash: Block::Hash);

/// Returns true if state for given block is available.
fn have_state_at(&self, hash: Block::Hash, _number: NumberFor<Block>) -> bool {
self.state_at(hash).is_ok()
Expand Down
109 changes: 103 additions & 6 deletions client/api/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use std::{collections::HashSet, fmt, sync::Arc};
use crate::{blockchain::Info, notifications::StorageEventStream, FinalizeSummary, ImportSummary};

use sc_transaction_pool_api::ChainEvent;
use sc_utils::mpsc::TracingUnboundedReceiver;
use sc_utils::mpsc::{TracingUnboundedReceiver, TracingUnboundedSender};
use sp_blockchain;

/// Type that implements `futures::Stream` of block import events.
Expand Down Expand Up @@ -264,6 +264,53 @@ impl fmt::Display for UsageInfo {
}
}

/// Sends a message to the pinning-worker once dropped to unpin a block in the backend.
#[derive(Debug)]
pub struct UnpinHandleInner<Block: BlockT> {
/// Hash of the block pinned by this handle
hash: Block::Hash,
unpin_worker_sender: TracingUnboundedSender<Block::Hash>,
}

impl<Block: BlockT> UnpinHandleInner<Block> {
/// Create a new [`UnpinHandleInner`]
pub fn new(
hash: Block::Hash,
unpin_worker_sender: TracingUnboundedSender<Block::Hash>,
) -> Self {
Self { hash, unpin_worker_sender }
}
}

impl<Block: BlockT> Drop for UnpinHandleInner<Block> {
fn drop(&mut self) {
if let Err(err) = self.unpin_worker_sender.unbounded_send(self.hash) {
log::debug!(target: "db", "Unable to unpin block with hash: {}, error: {:?}", self.hash, err);
};
}
}

/// Keeps a specific block pinned while the handle is alive.
/// Once the last handle instance for a given block is dropped, the
/// block is unpinned in the [`Backend`](crate::backend::Backend::unpin_block).
#[derive(Debug, Clone)]
pub struct UnpinHandle<Block: BlockT>(Arc<UnpinHandleInner<Block>>);

impl<Block: BlockT> UnpinHandle<Block> {
/// Create a new [`UnpinHandle`]
pub fn new(
hash: Block::Hash,
unpin_worker_sender: TracingUnboundedSender<Block::Hash>,
) -> UnpinHandle<Block> {
UnpinHandle(Arc::new(UnpinHandleInner::new(hash, unpin_worker_sender)))
}

/// Hash of the block this handle is unpinning on drop
pub fn hash(&self) -> Block::Hash {
self.0.hash
}
}

/// Summary of an imported block
#[derive(Clone, Debug)]
pub struct BlockImportNotification<Block: BlockT> {
Expand All @@ -279,6 +326,36 @@ pub struct BlockImportNotification<Block: BlockT> {
///
/// If `None`, there was no re-org while importing.
pub tree_route: Option<Arc<sp_blockchain::TreeRoute<Block>>>,
/// Handle to unpin the block this notification is for
unpin_handle: UnpinHandle<Block>,
}

impl<Block: BlockT> BlockImportNotification<Block> {
/// Create new notification
pub fn new(
hash: Block::Hash,
origin: BlockOrigin,
header: Block::Header,
is_new_best: bool,
tree_route: Option<Arc<sp_blockchain::TreeRoute<Block>>>,
unpin_worker_sender: TracingUnboundedSender<Block::Hash>,
) -> Self {
Self {
hash,
origin,
header,
is_new_best,
tree_route,
unpin_handle: UnpinHandle::new(hash, unpin_worker_sender),
}
}

/// Consume this notification and extract the unpin handle.
///
/// Note: Only use this if you want to keep the block pinned in the backend.
pub fn into_unpin_handle(self) -> UnpinHandle<Block> {
self.unpin_handle
}
}

/// Summary of a finalized block.
Expand All @@ -294,6 +371,8 @@ pub struct FinalityNotification<Block: BlockT> {
pub tree_route: Arc<[Block::Hash]>,
/// Stale branches heads.
pub stale_heads: Arc<[Block::Hash]>,
/// Handle to unpin the block this notification is for
unpin_handle: UnpinHandle<Block>,
}

impl<B: BlockT> TryFrom<BlockImportNotification<B>> for ChainEvent<B> {
Expand All @@ -314,26 +393,44 @@ impl<B: BlockT> From<FinalityNotification<B>> for ChainEvent<B> {
}
}

impl<B: BlockT> From<FinalizeSummary<B>> for FinalityNotification<B> {
fn from(mut summary: FinalizeSummary<B>) -> Self {
impl<Block: BlockT> FinalityNotification<Block> {
/// Create finality notification from finality summary.
pub fn from_summary(
mut summary: FinalizeSummary<Block>,
unpin_worker_sender: TracingUnboundedSender<Block::Hash>,
) -> FinalityNotification<Block> {
let hash = summary.finalized.pop().unwrap_or_default();
FinalityNotification {
hash,
header: summary.header,
tree_route: Arc::from(summary.finalized),
stale_heads: Arc::from(summary.stale_heads),
unpin_handle: UnpinHandle::new(hash, unpin_worker_sender),
}
}

/// Consume this notification and extract the unpin handle.
///
/// Note: Only use this if you want to keep the block pinned in the backend.
pub fn into_unpin_handle(self) -> UnpinHandle<Block> {
self.unpin_handle
}
}

impl<B: BlockT> From<ImportSummary<B>> for BlockImportNotification<B> {
fn from(summary: ImportSummary<B>) -> Self {
impl<Block: BlockT> BlockImportNotification<Block> {
/// Create finality notification from finality summary.
pub fn from_summary(
summary: ImportSummary<Block>,
unpin_worker_sender: TracingUnboundedSender<Block::Hash>,
) -> BlockImportNotification<Block> {
let hash = summary.hash;
BlockImportNotification {
hash: summary.hash,
hash,
origin: summary.origin,
header: summary.header,
is_new_best: summary.is_new_best,
tree_route: summary.tree_route.map(Arc::new),
unpin_handle: UnpinHandle::new(hash, unpin_worker_sender),
}
}
}
6 changes: 6 additions & 0 deletions client/api/src/in_mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,12 @@ where
fn requires_full_sync(&self) -> bool {
false
}

fn pin_block(&self, _: <Block as BlockT>::Hash) -> blockchain::Result<()> {
Ok(())
}

fn unpin_block(&self, _: <Block as BlockT>::Hash) {}
}

impl<Block: BlockT> backend::LocalBackend<Block> for Backend<Block> where Block::Hash: Ord {}
Expand Down
1 change: 1 addition & 0 deletions client/db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ parity-db = "0.4.2"
parking_lot = "0.12.1"
sc-client-api = { version = "4.0.0-dev", path = "../api" }
sc-state-db = { version = "0.10.0-dev", path = "../state-db" }
schnellru = "0.2.1"
sp-arithmetic = { version = "6.0.0", path = "../../primitives/arithmetic" }
sp-blockchain = { version = "4.0.0-dev", path = "../../primitives/blockchain" }
sp-core = { version = "7.0.0", path = "../../primitives/core" }
Expand Down
Loading

0 comments on commit e08ac02

Please sign in to comment.