Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Add option to skip av-store requests in availability-recovery-subsyst…
Browse files Browse the repository at this point in the history
…em (#7131)

* Allow to skip availability-store

* Update node/network/availability-recovery/src/lib.rs

Co-authored-by: Michal Kucharczyk <[email protected]>

---------

Co-authored-by: Michal Kucharczyk <[email protected]>
  • Loading branch information
skunert and michalkucharczyk authored Apr 28, 2023
1 parent 3ec08c0 commit 1d8ccbf
Showing 1 changed file with 36 additions and 5 deletions.
41 changes: 36 additions & 5 deletions node/network/availability-recovery/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ const TIMEOUT_START_NEW_REQUESTS: Duration = Duration::from_millis(100);

/// The Availability Recovery Subsystem.
pub struct AvailabilityRecoverySubsystem {
/// Do not request data from the availability store.
/// This is the useful for nodes where the
/// availability-store subsystem is not expected to run,
/// such as collators.
bypass_availability_store: bool,

fast_path: bool,
/// Receiver for available data requests.
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
Expand Down Expand Up @@ -147,6 +153,9 @@ struct RecoveryParams {

/// Metrics to report
metrics: Metrics,

/// Do not request data from availability-store
bypass_availability_store: bool,
}

/// Source the availability data either by means
Expand Down Expand Up @@ -467,7 +476,7 @@ impl RequestChunksFromValidators {
let metrics = &params.metrics;

// First query the store for any chunks we've got.
{
if !params.bypass_availability_store {
let (tx, rx) = oneshot::channel();
sender
.send_message(AvailabilityStoreMessage::QueryAllChunks(params.candidate_hash, tx))
Expand Down Expand Up @@ -668,7 +677,7 @@ where
{
async fn run(mut self) -> Result<AvailableData, RecoveryError> {
// First just see if we have the data available locally.
{
if !self.params.bypass_availability_store {
let (tx, rx) = oneshot::channel();
self.sender
.send_message(AvailabilityStoreMessage::QueryAvailableData(
Expand Down Expand Up @@ -856,6 +865,7 @@ async fn launch_recovery_task<Context>(
receipt: CandidateReceipt,
backing_group: Option<GroupIndex>,
response_sender: oneshot::Sender<Result<AvailableData, RecoveryError>>,
bypass_availability_store: bool,
metrics: &Metrics,
) -> error::Result<()> {
let candidate_hash = receipt.hash();
Expand All @@ -867,6 +877,7 @@ async fn launch_recovery_task<Context>(
candidate_hash,
erasure_root: receipt.descriptor.erasure_root,
metrics: metrics.clone(),
bypass_availability_store,
};

let phase = backing_group
Expand Down Expand Up @@ -906,6 +917,7 @@ async fn handle_recover<Context>(
session_index: SessionIndex,
backing_group: Option<GroupIndex>,
response_sender: oneshot::Sender<Result<AvailableData, RecoveryError>>,
bypass_availability_store: bool,
metrics: &Metrics,
) -> error::Result<()> {
let candidate_hash = receipt.hash();
Expand Down Expand Up @@ -949,6 +961,7 @@ async fn handle_recover<Context>(
receipt,
backing_group,
response_sender,
bypass_availability_store,
metrics,
)
.await,
Expand Down Expand Up @@ -977,26 +990,35 @@ async fn query_full_data<Context>(

#[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)]
impl AvailabilityRecoverySubsystem {
/// Create a new instance of `AvailabilityRecoverySubsystem` which never requests the
/// `AvailabilityStoreSubsystem` subsystem.
pub fn with_availability_store_skip(
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
metrics: Metrics,
) -> Self {
Self { fast_path: false, bypass_availability_store: true, req_receiver, metrics }
}

/// Create a new instance of `AvailabilityRecoverySubsystem` which starts with a fast path to
/// request data from backers.
pub fn with_fast_path(
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
metrics: Metrics,
) -> Self {
Self { fast_path: true, req_receiver, metrics }
Self { fast_path: true, bypass_availability_store: false, req_receiver, metrics }
}

/// Create a new instance of `AvailabilityRecoverySubsystem` which requests only chunks
pub fn with_chunks_only(
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
metrics: Metrics,
) -> Self {
Self { fast_path: false, req_receiver, metrics }
Self { fast_path: false, bypass_availability_store: false, req_receiver, metrics }
}

async fn run<Context>(self, mut ctx: Context) -> SubsystemResult<()> {
let mut state = State::default();
let Self { fast_path, mut req_receiver, metrics } = self;
let Self { fast_path, mut req_receiver, metrics, bypass_availability_store } = self;

loop {
let recv_req = req_receiver.recv(|| vec![COST_INVALID_REQUEST]).fuse();
Expand Down Expand Up @@ -1025,6 +1047,7 @@ impl AvailabilityRecoverySubsystem {
session_index,
maybe_backing_group.filter(|_| fast_path),
response_sender,
bypass_availability_store,
&metrics,
).await {
gum::warn!(
Expand All @@ -1041,6 +1064,14 @@ impl AvailabilityRecoverySubsystem {
in_req = recv_req => {
match in_req.into_nested().map_err(|fatal| SubsystemError::with_origin("availability-recovery", fatal))? {
Ok(req) => {
if bypass_availability_store {
gum::debug!(
target: LOG_TARGET,
"Skipping request to availability-store.",
);
let _ = req.send_response(None.into());
continue
}
match query_full_data(&mut ctx, req.payload.candidate_hash).await {
Ok(res) => {
let _ = req.send_response(res.into());
Expand Down

0 comments on commit 1d8ccbf

Please sign in to comment.