
Prefer fetching small PoVs from backing group #7173

Merged: 7 commits, May 5, 2023

Changes from 3 commits
19 changes: 19 additions & 0 deletions node/core/av-store/src/lib.rs
@@ -1052,6 +1052,25 @@ fn process_message(
let _ =
tx.send(load_chunk(&subsystem.db, &subsystem.config, &candidate, validator_index)?);
},
AvailabilityStoreMessage::QueryChunkSize(candidate, tx) => {
let meta = load_meta(&subsystem.db, &subsystem.config, &candidate)?;

let validator_index = meta.map_or(None, |meta| meta.chunks_stored.first_one());

let maybe_chunk_size = if let Some(validator_index) = validator_index {
load_chunk(
&subsystem.db,
&subsystem.config,
&candidate,
ValidatorIndex(validator_index as u32),
)?
.map(|erasure_chunk| erasure_chunk.chunk.len())
} else {
None
};

let _ = tx.send(maybe_chunk_size);
},
AvailabilityStoreMessage::QueryAllChunks(candidate, tx) => {
match load_meta(&subsystem.db, &subsystem.config, &candidate)? {
None => {
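Note on the handler above: every erasure chunk of a candidate has the same length, so the store can answer `QueryChunkSize` by reading whichever chunk happens to be stored locally; `chunks_stored.first_one()` simply picks the index of any stored chunk. A minimal standalone sketch of that bitfield lookup, using the `bitvec` crate directly rather than the subsystem types (illustrative only, not part of the diff):

```rust
// Standalone sketch of the bitfield lookup used by the `QueryChunkSize` handler:
// `chunks_stored` has one bit per validator index and `first_one()` returns the
// index of any chunk we hold. Because all erasure chunks of a candidate have the
// same length, reading that one chunk is enough to report the chunk size.
use bitvec::prelude::*;

fn first_stored_chunk(chunks_stored: &BitSlice<u8, Lsb0>) -> Option<usize> {
    chunks_stored.first_one()
}

fn main() {
    let mut chunks_stored = bitvec![u8, Lsb0; 0; 10];
    assert_eq!(first_stored_chunk(&chunks_stored), None);

    // Pretend the chunk for validator index 5 was stored.
    chunks_stored.set(5, true);
    assert_eq!(first_stored_chunk(&chunks_stored), Some(5));
}
```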
48 changes: 48 additions & 0 deletions node/core/av-store/src/tests.rs
@@ -1153,3 +1153,51 @@ async fn import_leaf(

new_leaf
}

#[test]
fn query_chunk_size_works() {
let store = test_store();

test_harness(TestState::default(), store.clone(), |mut virtual_overseer| async move {
let candidate_hash = CandidateHash(Hash::repeat_byte(33));
let validator_index = ValidatorIndex(5);
let n_validators = 10;

let chunk = ErasureChunk {
chunk: vec![1, 2, 3],
index: validator_index,
proof: Proof::try_from(vec![vec![3, 4, 5]]).unwrap(),
};

// Ensure an entry already exists. In reality this would come from watching
// chain events.
with_tx(&store, |tx| {
super::write_meta(
tx,
&TEST_CONFIG,
&candidate_hash,
&CandidateMeta {
data_available: false,
chunks_stored: bitvec::bitvec![u8, BitOrderLsb0; 0; n_validators],
state: State::Unavailable(BETimestamp(0)),
},
);
});

let (tx, rx) = oneshot::channel();

let chunk_msg =
AvailabilityStoreMessage::StoreChunk { candidate_hash, chunk: chunk.clone(), tx };

overseer_send(&mut virtual_overseer, chunk_msg).await;
assert_eq!(rx.await.unwrap(), Ok(()));

let (tx, rx) = oneshot::channel();
let query_chunk_size = AvailabilityStoreMessage::QueryChunkSize(candidate_hash, tx);

overseer_send(&mut virtual_overseer, query_chunk_size).await;

assert_eq!(rx.await.unwrap().unwrap(), chunk.chunk.len());
virtual_overseer
});
}
97 changes: 90 additions & 7 deletions node/network/availability-recovery/src/lib.rs
@@ -99,15 +99,20 @@ const TIMEOUT_START_NEW_REQUESTS: Duration = CHUNK_REQUEST_TIMEOUT;
#[cfg(test)]
const TIMEOUT_START_NEW_REQUESTS: Duration = Duration::from_millis(100);

/// PoV size limit in bytes below which we prefer fetching from backers.
const SMALL_POV_LIMIT: usize = 128 * 1024;

/// The Availability Recovery Subsystem.
pub struct AvailabilityRecoverySubsystem {
/// Do not request data from the availability store.
/// This is useful for nodes where the
/// availability-store subsystem is not expected to run,
/// such as collators.
bypass_availability_store: bool,

/// If true, we first try backers.
fast_path: bool,
/// Large PoV threshold. Only used when `fast_path` is `false`.
small_pov_limit: Option<usize>,
/// Receiver for available data requests.
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
/// Metrics for this subsystem.
@@ -863,10 +868,11 @@ async fn launch_recovery_task<Context>(
ctx: &mut Context,
session_info: SessionInfo,
receipt: CandidateReceipt,
backing_group: Option<GroupIndex>,
mut backing_group: Option<GroupIndex>,
response_sender: oneshot::Sender<Result<AvailableData, RecoveryError>>,
bypass_availability_store: bool,
metrics: &Metrics,
small_pov_limit: Option<usize>,
) -> error::Result<()> {
let candidate_hash = receipt.hash();

@@ -880,6 +886,29 @@
bypass_availability_store,
};

if let Some(small_pov_limit) = small_pov_limit {
// Get our own chunk size to get an estimate of the PoV size.
let chunk_size: Result<Option<usize>, error::Error> =
query_chunk_size(ctx, candidate_hash).await;
if let Ok(Some(chunk_size)) = chunk_size {
let pov_size_estimate = chunk_size.saturating_mul(session_info.validators.len() / 3);
let prefer_backing_group = pov_size_estimate < small_pov_limit;

gum::debug!(
target: LOG_TARGET,
pov_size_estimate,
small_pov_limit,
enabled = prefer_backing_group,
"Prefer fetch from backing group",
);

backing_group = backing_group.filter(|_| {
// We keep the backing group only if `1/3` of chunks sum up to less than `small_pov_limit`.
prefer_backing_group
});
}
}

let phase = backing_group
.and_then(|g| session_info.validator_groups.get(g))
.map(|group| Source::RequestFromBackers(RequestFromBackers::new(group.clone())))
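For intuition on the estimate above: availability recovery needs roughly a third of the erasure chunks to reconstruct the data, so multiplying our own chunk size by `n_validators / 3` approximates the full PoV size. A self-contained sketch with made-up numbers (the 128 KiB limit mirrors `SMALL_POV_LIMIT`; everything else is illustrative, not code from the diff):

```rust
// Standalone sketch: recovery needs roughly a third of the erasure chunks, so
// the full PoV is approximated as `chunk_size * (n_validators / 3)` and
// compared against the 128 KiB limit.
const SMALL_POV_LIMIT: usize = 128 * 1024;

fn prefer_backing_group(chunk_size: usize, n_validators: usize) -> bool {
    let pov_size_estimate = chunk_size.saturating_mul(n_validators / 3);
    pov_size_estimate < SMALL_POV_LIMIT
}

fn main() {
    // 300 validators, 1 KiB chunk => ~100 KiB estimated PoV: prefer the backers.
    assert!(prefer_backing_group(1024, 300));
    // 300 validators, 2 KiB chunk => ~200 KiB estimated PoV: fall back to chunks.
    assert!(!prefer_backing_group(2 * 1024, 300));
}
```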
@@ -919,6 +948,7 @@ async fn handle_recover<Context>(
response_sender: oneshot::Sender<Result<AvailableData, RecoveryError>>,
bypass_availability_store: bool,
metrics: &Metrics,
small_pov_limit: Option<usize>,
) -> error::Result<()> {
let candidate_hash = receipt.hash();

Expand Down Expand Up @@ -963,6 +993,7 @@ async fn handle_recover<Context>(
response_sender,
bypass_availability_store,
metrics,
small_pov_limit,
)
.await,
None => {
@@ -988,6 +1019,18 @@ async fn query_full_data<Context>(
rx.await.map_err(error::Error::CanceledQueryFullData)
}

/// Queries the size of our stored chunk for a candidate from the av-store.
#[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)]
async fn query_chunk_size<Context>(
ctx: &mut Context,
candidate_hash: CandidateHash,
) -> error::Result<Option<usize>> {
let (tx, rx) = oneshot::channel();
ctx.send_message(AvailabilityStoreMessage::QueryChunkSize(candidate_hash, tx))
.await;

rx.await.map_err(error::Error::CanceledQueryFullData)
}
#[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)]
impl AvailabilityRecoverySubsystem {
/// Create a new instance of `AvailabilityRecoverySubsystem` which never requests the
@@ -996,7 +1039,13 @@ impl AvailabilityRecoverySubsystem {
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
metrics: Metrics,
) -> Self {
Self { fast_path: false, bypass_availability_store: true, req_receiver, metrics }
Self {
fast_path: false,
small_pov_limit: None,
bypass_availability_store: true,
req_receiver,
metrics,
}
}

/// Create a new instance of `AvailabilityRecoverySubsystem` which starts with a fast path to
@@ -1005,20 +1054,53 @@
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
metrics: Metrics,
) -> Self {
Self { fast_path: true, bypass_availability_store: false, req_receiver, metrics }
Self {
fast_path: true,
small_pov_limit: None,
bypass_availability_store: false,
req_receiver,
metrics,
}
}

/// Create a new instance of `AvailabilityRecoverySubsystem` which requests only chunks
pub fn with_chunks_only(
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
metrics: Metrics,
) -> Self {
Self { fast_path: false, bypass_availability_store: false, req_receiver, metrics }
Self {
fast_path: false,
small_pov_limit: None,
bypass_availability_store: false,
req_receiver,
metrics,
}
}

/// Create a new instance of `AvailabilityRecoverySubsystem` which requests chunks if PoV is
/// above a threshold.
pub fn with_chunks_if_pov_large(
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
metrics: Metrics,
) -> Self {
Self {
fast_path: false,
small_pov_limit: Some(SMALL_POV_LIMIT),
bypass_availability_store: false,
req_receiver,
metrics,
}
}

async fn run<Context>(self, mut ctx: Context) -> SubsystemResult<()> {
let mut state = State::default();
let Self { fast_path, mut req_receiver, metrics, bypass_availability_store } = self;
let Self {
fast_path,
small_pov_limit,
mut req_receiver,
metrics,
bypass_availability_store,
} = self;

loop {
let recv_req = req_receiver.recv(|| vec![COST_INVALID_REQUEST]).fuse();
@@ -1045,10 +1127,11 @@ impl AvailabilityRecoverySubsystem {
&mut ctx,
receipt,
session_index,
maybe_backing_group.filter(|_| fast_path),
maybe_backing_group.filter(|_| fast_path || small_pov_limit.is_some()),
response_sender,
bypass_availability_store,
&metrics,
small_pov_limit,
).await {
gum::warn!(
target: LOG_TARGET,
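Taken together, the diff selects the recovery source roughly as follows: the backing group is forwarded to `launch_recovery_task` when `fast_path` is set or a `small_pov_limit` is configured, and in the latter case it is dropped again unless the estimated PoV is below the limit (or no estimate could be obtained). A standalone sketch of that decision, assuming a backing group is known for the candidate (illustrative only, not code from the diff):

```rust
// Standalone sketch of the resulting source selection. `pov_size_estimate` is
// `None` when our own chunk size could not be queried from the av-store.
fn try_backers_first(
    fast_path: bool,
    small_pov_limit: Option<usize>,
    pov_size_estimate: Option<usize>,
) -> bool {
    if fast_path {
        return true;
    }
    match (small_pov_limit, pov_size_estimate) {
        // Estimate available: keep the backing group only for small PoVs.
        (Some(limit), Some(estimate)) => estimate < limit,
        // Limit configured but no estimate: the group already passed the
        // `fast_path || small_pov_limit.is_some()` filter and is not dropped,
        // so backers are still tried first.
        (Some(_), None) => true,
        // No limit configured (chunks-only mode): never try backers first.
        (None, _) => false,
    }
}

fn main() {
    // Fast path always goes to the backers first.
    assert!(try_backers_first(true, None, None));
    // Chunks-only mode never does.
    assert!(!try_backers_first(false, None, Some(1024)));
    // `with_chunks_if_pov_large`: depends on the estimate.
    assert!(try_backers_first(false, Some(128 * 1024), Some(100 * 1024)));
    assert!(!try_backers_first(false, Some(128 * 1024), Some(200 * 1024)));
}
```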