Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve the performance of the shielded sync ledger client (backport #4016) #4017

Merged
merged 7 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- Improve the shielded sync's ledger client performance and user experience.
([\#4016](https://github.com/anoma/namada/pull/4016))
9 changes: 9 additions & 0 deletions crates/apps_lib/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3386,6 +3386,8 @@ pub mod args {
}),
);
pub const BIRTHDAY: ArgOpt<BlockHeight> = arg_opt("birthday");
pub const BLOCK_BATCH: ArgDefault<usize> =
arg_default("block-batch", DefaultFn(|| 10));
pub const BLOCK_HEIGHT: Arg<BlockHeight> = arg("block-height");
pub const BLOCK_HEIGHT_OPT: ArgOpt<BlockHeight> = arg_opt("height");
pub const BLOCK_HEIGHT_TO_OPT: ArgOpt<BlockHeight> = arg_opt("to-height");
Expand Down Expand Up @@ -6803,6 +6805,7 @@ pub mod args {
Some(times) => RetryStrategy::Times(times),
None => RetryStrategy::Forever,
};
let block_batch_size = BLOCK_BATCH.parse(matches);
Self {
ledger_address,
last_query_height,
Expand All @@ -6812,6 +6815,7 @@ pub mod args {
wait_for_last_query_height,
max_concurrent_fetches,
retry_strategy,
block_batch_size,
}
}

Expand Down Expand Up @@ -6849,6 +6853,10 @@ pub mod args {
"Maximum number of times to retry fetching. If no \
argument is provided, defaults to retrying forever."
)))
.arg(BLOCK_BATCH.def().help(wrap!(
"Number of blocks fetched per concurrent fetch job. The \
default is 10."
)))
}
}

Expand All @@ -6862,6 +6870,7 @@ pub mod args {
let chain_ctx = ctx.borrow_mut_chain_or_exit();

Ok(ShieldedSync {
block_batch_size: self.block_batch_size,
max_concurrent_fetches: self.max_concurrent_fetches,
wait_for_last_query_height: self.wait_for_last_query_height,
ledger_address: chain_ctx.get(&self.ledger_address),
Expand Down
2 changes: 2 additions & 0 deletions crates/apps_lib/src/client/masp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ pub async fn syncing<
.shutdown_signal(install_shutdown_signal(false))
.wait_for_last_query_height(args.wait_for_last_query_height)
.retry_strategy(args.retry_strategy)
.block_batch_size(args.block_batch_size)
.build();

let env = MaspLocalTaskEnv::new(500)
Expand Down Expand Up @@ -141,6 +142,7 @@ pub async fn syncing<
dispatch_client!(LedgerMaspClient::new(
client,
args.max_concurrent_fetches,
Duration::from_millis(5),
))?
};

Expand Down
36 changes: 33 additions & 3 deletions crates/core/src/control_flow/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,12 @@ pub trait SleepStrategy {
/// Calculate a duration from a sleep strategy state.
fn backoff(&self, state: &Self::State) -> Duration;

/// Update the state of the sleep strategy.
/// Move to the next state of the sleep strategy.
fn next_state(&self, state: &mut Self::State);

/// Move to the previous state of the sleep strategy.
fn prev_state(&self, state: &mut Self::State);

/// Map a function to the duration returned from a
/// sleep strategy.
fn map<M>(self, map: M) -> Map<Self, M>
Expand Down Expand Up @@ -73,6 +76,11 @@ where
fn next_state(&self, state: &mut S::State) {
self.strategy.next_state(state)
}

#[inline]
fn prev_state(&self, state: &mut S::State) {
self.strategy.prev_state(state)
}
}

/// Constant sleep strategy.
Expand All @@ -93,6 +101,10 @@ impl SleepStrategy for Constant {
fn next_state(&self, _: &mut ()) {
// NOOP
}

fn prev_state(&self, _: &mut ()) {
// NOOP
}
}

/// Linear backoff sleep strategy.
Expand All @@ -114,7 +126,11 @@ impl SleepStrategy for LinearBackoff {
}

fn next_state(&self, state: &mut Duration) {
*state += self.delta;
*state = state.saturating_add(self.delta);
}

fn prev_state(&self, state: &mut Duration) {
*state = state.saturating_sub(self.delta);
}
}

Expand Down Expand Up @@ -144,6 +160,10 @@ where
fn next_state(&self, state: &mut Self::State) {
*state = state.saturating_add(1);
}

fn prev_state(&self, state: &mut Self::State) {
*state = state.saturating_sub(1);
}
}

/// A [`SleepStrategy`] adaptor, to run async tasks with custom
Expand Down Expand Up @@ -236,7 +256,17 @@ impl<S: SleepStrategy> Sleep<S> {
/// Update the sleep strategy state, and sleep for the given backoff.
async fn sleep_update(&self, state: &mut S::State) {
self.strategy.next_state(state);
sleep(self.strategy.backoff(state)).await;
self.sleep_with_current_backoff(state).await;
}

/// Sleep for a [`Duration`] equivalent to the value of
/// the current backoff.
pub fn sleep_with_current_backoff(
&self,
state: &S::State,
) -> impl Future<Output = ()> + 'static {
let backoff_duration = self.strategy.backoff(state);
sleep(backoff_duration)
}

/// Run a future as many times as `iter_times`
Expand Down
1 change: 1 addition & 0 deletions crates/node/src/bench_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1192,6 +1192,7 @@ impl BenchShieldedCtx {
wait_for_last_query_height: false,
max_concurrent_fetches: 100,
retry_strategy: RetryStrategy::Forever,
block_batch_size: 10,
},
&StdIo,
))
Expand Down
2 changes: 2 additions & 0 deletions crates/sdk/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2158,6 +2158,8 @@ pub struct ShieldedSync<C: NamadaTypes = SdkTypes> {
/// Maximum number of fetch jobs that will ever
/// execute concurrently during the shielded sync.
pub max_concurrent_fetches: usize,
/// Number of blocks fetched per concurrent fetch job.
pub block_batch_size: usize,
/// Maximum number of times to retry fetching. If `None`
/// is provided, defaults to "forever".
pub retry_strategy: RetryStrategy,
Expand Down
73 changes: 58 additions & 15 deletions crates/sdk/src/masp/utilities.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
//! Helper functions and types

use std::collections::BTreeMap;
use std::sync::Arc;
use std::sync::{Arc, RwLock};

use borsh::BorshDeserialize;
use masp_primitives::merkle_tree::{CommitmentTree, IncrementalWitness};
use masp_primitives::sapling::Node;
use masp_primitives::transaction::Transaction as MaspTx;
use namada_core::chain::BlockHeight;
use namada_core::collections::HashMap;
use namada_core::control_flow::time::{
Duration, LinearBackoff, Sleep, SleepStrategy,
};
use namada_core::storage::TxIndex;
use namada_events::extend::IndexedMaspData;
use namada_io::Client;
Expand All @@ -24,6 +27,8 @@ use crate::masp::{extract_masp_tx, get_indexed_masp_events_at_height};
struct LedgerMaspClientInner<C> {
client: C,
semaphore: Semaphore,
backoff: RwLock<Duration>,
sleep: Sleep<LinearBackoff>,
}

/// An inefficient MASP client which simply uses a
Expand All @@ -43,36 +48,39 @@ impl<C> Clone for LedgerMaspClient<C> {
impl<C> LedgerMaspClient<C> {
/// Create a new [`MaspClient`] given an rpc client.
#[inline(always)]
pub fn new(client: C, max_concurrent_fetches: usize) -> Self {
pub fn new(
client: C,
max_concurrent_fetches: usize,
linear_backoff_delta: Duration,
) -> Self {
Self {
inner: Arc::new(LedgerMaspClientInner {
client,
semaphore: Semaphore::new(max_concurrent_fetches),
backoff: RwLock::new(Duration::from_secs(0)),
sleep: Sleep {
strategy: LinearBackoff {
delta: linear_backoff_delta,
},
},
}),
}
}
}

impl<C: Client + Send + Sync> MaspClient for LedgerMaspClient<C> {
type Error = Error;

async fn last_block_height(&self) -> Result<Option<BlockHeight>, Error> {
let maybe_block = crate::rpc::query_block(&self.inner.client).await?;
Ok(maybe_block.map(|b| b.height))
}

async fn fetch_shielded_transfers(
impl<C: Client + Send + Sync> LedgerMaspClient<C> {
async fn fetch_shielded_transfers_inner(
&self,
from: BlockHeight,
to: BlockHeight,
) -> Result<Vec<IndexedNoteEntry>, Error> {
let _permit = self.inner.semaphore.acquire().await.unwrap();

// Fetch all the transactions we do not have yet
let mut txs = vec![];

for height in from.0..=to.0 {
let maybe_txs_results = async {
let _permit = self.inner.semaphore.acquire().await.unwrap();

get_indexed_masp_events_at_height(
&self.inner.client,
height.into(),
Expand All @@ -86,8 +94,6 @@ impl<C: Client + Send + Sync> MaspClient for LedgerMaspClient<C> {
};

let block = {
let _permit = self.inner.semaphore.acquire().await.unwrap();

// Query the actual block to get the txs bytes. If we only need
// one tx it might be slightly better to query
// the /tx endpoint to reduce the amount of data
Expand Down Expand Up @@ -127,6 +133,43 @@ impl<C: Client + Send + Sync> MaspClient for LedgerMaspClient<C> {

Ok(txs)
}
}

impl<C: Client + Send + Sync> MaspClient for LedgerMaspClient<C> {
type Error = Error;

async fn last_block_height(&self) -> Result<Option<BlockHeight>, Error> {
let maybe_block = crate::rpc::query_block(&self.inner.client).await?;
Ok(maybe_block.map(|b| b.height))
}

async fn fetch_shielded_transfers(
&self,
from: BlockHeight,
to: BlockHeight,
) -> Result<Vec<IndexedNoteEntry>, Error> {
const ZERO: Duration = Duration::from_secs(0);
let current_backoff = { *self.inner.backoff.read().unwrap() };

if current_backoff > ZERO {
self.inner
.sleep
.sleep_with_current_backoff(&current_backoff)
.await;
}

let result = self.fetch_shielded_transfers_inner(from, to).await;

if result.is_err() {
let mut backoff = self.inner.backoff.write().unwrap();
self.inner.sleep.strategy.next_state(&mut *backoff);
} else if current_backoff > ZERO {
let mut backoff = self.inner.backoff.write().unwrap();
self.inner.sleep.strategy.prev_state(&mut *backoff);
}

result
}

#[inline(always)]
fn capabilities(&self) -> MaspClientCapabilities {
Expand Down
1 change: 0 additions & 1 deletion crates/shielded_token/src/masp/shielded_sync/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -702,7 +702,6 @@ where
}

if self.config.retry_strategy.may_retry() {
self.config.fetched_tracker.message(format!("{error}"));
true
} else {
// NB: store last encountered error
Expand Down
Loading