Merge pull request #1764 from subspace/improve-plotting-performance
Improve plotting performance
nazar-pc authored Aug 7, 2023
2 parents 6207815 + 59a77cc commit 5016f44
Showing 7 changed files with 146 additions and 70 deletions.
29 changes: 29 additions & 0 deletions crates/subspace-core-primitives/src/pieces.rs
@@ -412,6 +412,35 @@ impl Record {
unsafe { Box::new_zeroed().assume_init() }
}

/// Create vector filled with zeroed records without hitting stack overflow
#[inline]
pub fn new_zero_vec(length: usize) -> Vec<Self> {
// TODO: Should have been just `::new()`, but https://github.com/rust-lang/rust/issues/53827
let mut records = Vec::with_capacity(length);
{
let slice = records.spare_capacity_mut();
// SAFETY: Same memory layout due to `#[repr(transparent)]` on `Record` and
// `MaybeUninit<[[T; M]; N]>` is guaranteed to have the same layout as
// `[[MaybeUninit<T>; M]; N]`
let slice = unsafe {
slice::from_raw_parts_mut(
slice.as_mut_ptr()
as *mut [[mem::MaybeUninit<u8>; Scalar::FULL_BYTES]; Self::NUM_CHUNKS],
length,
)
};
for byte in slice.flatten_mut().flatten_mut() {
byte.write(0);
}
}
// SAFETY: All values are initialized above.
unsafe {
records.set_len(records.capacity());
}

records
}

/// Convenient conversion from slice of record to underlying representation for efficiency
/// purposes.
#[inline]
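The new `Record::new_zero_vec` constructor is what the farmer components below use to preallocate whole sectors. A minimal usage sketch (hypothetical caller; the import path is assumed and may differ if `Record` is re-exported at the crate root):

```rust
// Sketch only: allocate all records for a sector directly on the heap.
// Records are large, so building even one of them on the stack first
// (e.g. via a `vec![record; n]`-style construction) risks a stack
// overflow; `new_zero_vec` zeroes the bytes in place instead.
use subspace_core_primitives::pieces::Record; // assumed path

fn allocate_sector_records(pieces_in_sector: u16) -> Vec<Record> {
    Record::new_zero_vec(usize::from(pieces_in_sector))
}
```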
145 changes: 85 additions & 60 deletions crates/subspace-farmer-components/src/plotting.rs
@@ -7,7 +7,7 @@ use crate::FarmerProtocolInfo;
use async_trait::async_trait;
use backoff::future::retry;
use backoff::{Error as BackoffError, ExponentialBackoff};
use futures::stream::FuturesOrdered;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use parity_scale_codec::Encode;
use parking_lot::Mutex;
@@ -25,7 +25,7 @@ use subspace_erasure_coding::ErasureCoding;
use subspace_proof_of_space::{Quality, Table, TableGenerator};
use thiserror::Error;
use tokio::sync::Semaphore;
use tracing::{debug, warn};
use tracing::{debug, trace, warn};

const RECONSTRUCTION_CONCURRENCY_LIMIT: usize = 1;

@@ -212,34 +212,45 @@

let raw_sector = Mutex::new(RawSector::new(pieces_in_sector));

retry(default_backoff(), || async {
let mut raw_sector = raw_sector.lock();
raw_sector.records.clear();
raw_sector.metadata.clear();

if let Err(error) = download_sector(
&mut raw_sector,
piece_getter,
piece_getter_retry_policy,
kzg,
&piece_indexes,
)
.await
{
warn!(
%sector_index,
%error,
"Sector plotting attempt failed, will retry later"
);

return Err(BackoffError::transient(error));
}
{
// This list will be mutated, replacing pieces we have already processed with `None`
let incremental_piece_indices =
Mutex::new(piece_indexes.iter().copied().map(Some).collect::<Vec<_>>());

retry(default_backoff(), || async {
let mut raw_sector = raw_sector.lock();
let mut incremental_piece_indices = incremental_piece_indices.lock();

if let Err(error) = download_sector(
&mut raw_sector,
piece_getter,
piece_getter_retry_policy,
kzg,
&mut incremental_piece_indices,
)
.await
{
let retrieved_pieces = incremental_piece_indices
.iter()
.filter(|maybe_piece_index| maybe_piece_index.is_some())
.count();
warn!(
%sector_index,
%error,
%pieces_in_sector,
%retrieved_pieces,
"Sector plotting attempt failed, will retry later"
);

return Err(BackoffError::transient(error));
}

debug!(%sector_index, "Sector downloaded successfully");
debug!(%sector_index, "Sector downloaded successfully");

Ok(())
})
.await?;
Ok(())
})
.await?;
}

let mut raw_sector = raw_sector.into_inner();

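The retry loop above keeps per-piece progress between attempts: indices are wrapped in `Option` and cleared once a piece lands in `raw_sector`, so a failed attempt only re-downloads what is still missing. A simplified, self-contained sketch of that pattern (hypothetical synchronous `fetch`, not the crate's API):

```rust
// Sketch of incremental retries: entries set to `None` are treated as done,
// so each new attempt only touches the still-missing items.
fn fetch_all_with_retries<F>(
    indices: &mut [Option<u64>],
    out: &mut [Vec<u8>],
    max_attempts: usize,
    fetch: F,
) -> Result<(), String>
where
    F: Fn(u64) -> Result<Vec<u8>, String>,
{
    for _ in 0..max_attempts {
        let mut last_error = None;
        for (slot, data) in indices.iter_mut().zip(out.iter_mut()) {
            // Skip items that an earlier attempt already fetched.
            let Some(index) = *slot else { continue };
            match fetch(index) {
                Ok(bytes) => {
                    *data = bytes;
                    // Mark as done so later attempts skip it.
                    slot.take();
                }
                Err(error) => last_error = Some(error),
            }
        }
        match last_error {
            None => return Ok(()),
            Some(_) => continue,
        }
    }
    Err("some items could not be fetched".to_string())
}
```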
@@ -263,7 +274,7 @@ where
.map(|scalar_bytes| {
Scalar::try_from(scalar_bytes).expect(
"Piece getter must returns valid pieces of history that contain \
proper scalar bytes; qed",
proper scalar bytes; qed",
)
})
.collect::<Vec<_>>();
@@ -401,18 +412,22 @@ async fn download_sector<PG: PieceGetter>(
piece_getter: &PG,
piece_getter_retry_policy: PieceGetterRetryPolicy,
kzg: &Kzg,
piece_indexes: &[PieceIndex],
piece_indexes: &mut [Option<PieceIndex>],
) -> Result<(), PlottingError> {
// TODO: Make configurable, likely allowing user to specify RAM usage expectations and inferring
// concurrency from there
let recovery_semaphore = Semaphore::new(RECONSTRUCTION_CONCURRENCY_LIMIT);

let mut pieces_receiving_futures = piece_indexes
.iter()
.map(|piece_index| async {
let piece_index = *piece_index;

let piece_result = piece_getter
.iter_mut()
.zip(raw_sector.records.iter_mut().zip(&mut raw_sector.metadata))
.map(|(maybe_piece_index, (record, metadata))| async {
// We skip pieces that we have already processed previously
let Some(piece_index) = *maybe_piece_index else {
return Ok(());
};

let mut piece_result = piece_getter
.get_piece(piece_index, piece_getter_retry_policy)
.await;

@@ -421,43 +436,53 @@
.map(|piece| piece.is_some())
.unwrap_or_default();

// all retries failed
// All retries failed
if !succeeded {
let _permit = match recovery_semaphore.acquire().await {
Ok(permit) => permit,
Err(error) => {
return (
piece_index,
Err(format!("Recovery semaphore was closed: {error}").into()),
);
let error = format!("Recovery semaphore was closed: {error}").into();
return Err(PlottingError::FailedToRetrievePiece { piece_index, error });
}
};
let recovered_piece =
recover_missing_piece(piece_getter, kzg.clone(), piece_index).await;

return (piece_index, recovered_piece.map(Some).map_err(Into::into));
piece_result = recovered_piece.map(Some).map_err(Into::into);
}

(piece_index, piece_result)
let piece = piece_result
.map_err(|error| PlottingError::FailedToRetrievePiece { piece_index, error })?
.ok_or(PlottingError::PieceNotFound { piece_index })?;

// Fancy way to insert value in order to avoid going through stack (if naive de-referencing
// is used) and potentially causing stack overflow as the result
record
.flatten_mut()
.copy_from_slice(piece.record().flatten());
*metadata = RecordMetadata {
commitment: *piece.commitment(),
witness: *piece.witness(),
};

// We have processed this piece index, clear it
maybe_piece_index.take();

Ok(())
})
.collect::<FuturesOrdered<_>>();

while let Some((piece_index, piece_result)) = pieces_receiving_futures.next().await {
let piece = piece_result
.map_err(|error| PlottingError::FailedToRetrievePiece { piece_index, error })?
.ok_or(PlottingError::PieceNotFound { piece_index })?;

let (record, commitment, witness) = piece.split();
// Fancy way to insert value in order to avoid going through stack (if naive de-referencing
// is used) and potentially causing stack overflow as the result
raw_sector
.records
.extend_from_slice(std::slice::from_ref(record));
raw_sector.metadata.push(RecordMetadata {
commitment: *commitment,
witness: *witness,
});
.collect::<FuturesUnordered<_>>();

let mut final_result = Ok(());

while let Some(result) = pieces_receiving_futures.next().await {
if let Err(error) = result {
trace!(%error, "Failed to download piece");

if final_result.is_ok() {
final_result = Err(error);
}
}
}

Ok(())
final_result
}
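Switching from `FuturesOrdered` to `FuturesUnordered` lets piece downloads complete in any order, and the loop above records the first error while still driving every remaining download. A minimal, self-contained sketch of that aggregation pattern (hypothetical downloader; only the `futures` crate is assumed):

```rust
use futures::stream::{FuturesUnordered, StreamExt};

// Sketch: run all downloads concurrently, log each failure, and return the
// first error only after every download has been attempted.
async fn download_all(indices: Vec<u64>) -> Result<(), String> {
    let mut downloads = indices
        .into_iter()
        .map(|index| async move {
            // Placeholder for a real piece download; here odd indices fail.
            if index % 2 == 0 {
                Ok(())
            } else {
                Err(format!("piece {index} failed"))
            }
        })
        .collect::<FuturesUnordered<_>>();

    let mut final_result = Ok(());
    while let Some(result) = downloads.next().await {
        if let Err(error) = result {
            eprintln!("Failed to download piece: {error}"); // `trace!` in the real code
            if final_result.is_ok() {
                final_result = Err(error);
            }
        }
    }
    final_result
}
```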
9 changes: 4 additions & 5 deletions crates/subspace-farmer-components/src/sector.rs
@@ -91,7 +91,7 @@ impl SectorMetadata {
}

/// Commitment and witness corresponding to the same record
#[derive(Debug, Clone, Encode, Decode)]
#[derive(Debug, Default, Clone, Encode, Decode)]
pub struct RecordMetadata {
/// Record commitment
pub commitment: RecordCommitment,
@@ -109,12 +109,11 @@ pub struct RawSector {
}

impl RawSector {
/// Create new raw sector with internal vectors being allocated (but not filled) to be able to
/// store data for specified number of pieces in sector
/// Create new raw sector with internal vectors being allocated and filled with default values
pub fn new(pieces_in_sector: u16) -> Self {
Self {
records: Vec::with_capacity(usize::from(pieces_in_sector)),
metadata: Vec::with_capacity(usize::from(pieces_in_sector)),
records: Record::new_zero_vec(usize::from(pieces_in_sector)),
metadata: vec![RecordMetadata::default(); usize::from(pieces_in_sector)],
}
}
}
@@ -106,7 +106,7 @@
base_path,
keypair,
dsn,
&readers_and_pieces,
Arc::downgrade(&readers_and_pieces),
node_client.clone(),
archival_storage_pieces.clone(),
archival_storage_info.clone(),
@@ -137,6 +137,7 @@
piece_provider,
piece_cache.clone(),
archival_storage_info,
Arc::clone(&readers_and_pieces),
));

let _piece_cache_worker = run_future_in_dedicated_thread(
Expand Down
@@ -3,7 +3,7 @@ use futures::StreamExt;
use parking_lot::Mutex;
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Weak};
use subspace_core_primitives::SegmentIndex;
use subspace_farmer::piece_cache::PieceCache;
use subspace_farmer::utils::archival_storage_info::ArchivalStorageInfo;
@@ -47,7 +47,7 @@ pub(super) fn configure_dsn(
target_connections,
external_addresses,
}: DsnArgs,
readers_and_pieces: &Arc<Mutex<Option<ReadersAndPieces>>>,
weak_readers_and_pieces: Weak<Mutex<Option<ReadersAndPieces>>>,
node_client: NodeRpcClient,
archival_storage_pieces: ArchivalStoragePieces,
archival_storage_info: ArchivalStorageInfo,
@@ -61,8 +61,6 @@
NetworkingParametersManager::new(&known_addresses_db_path).map(|manager| manager.boxed())?
};

let weak_readers_and_pieces = Arc::downgrade(readers_and_pieces);

let provider_db_path = base_path.join("providers_db");

info!(
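`configure_dsn` now takes a pre-downgraded `Weak` handle (the caller performs `Arc::downgrade`), while `FarmerPieceGetter` is handed a strong `Arc` clone of the same state. An illustrative sketch of that ownership split, using `std::sync::Mutex` for self-containment where the real code uses `parking_lot::Mutex`:

```rust
use std::sync::{Arc, Mutex, Weak};

// Stand-in for the farmer's `ReadersAndPieces` state.
struct ReadersAndPieces;

fn wire_up(readers_and_pieces: &Arc<Mutex<Option<ReadersAndPieces>>>) {
    // Handed to the networking/DSN side: it must not keep the farmer state
    // alive, so it only gets a weak handle and upgrades it on demand.
    let weak: Weak<Mutex<Option<ReadersAndPieces>>> = Arc::downgrade(readers_and_pieces);

    // Handed to the piece getter: it needs guaranteed access for local reads,
    // so it holds a strong clone.
    let strong: Arc<Mutex<Option<ReadersAndPieces>>> = Arc::clone(readers_and_pieces);

    // Upgrading succeeds only while at least one strong reference exists.
    assert!(weak.upgrade().is_some());
    drop(strong);
}
```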
6 changes: 6 additions & 0 deletions crates/subspace-farmer/src/single_disk_plot/plotting.rs
@@ -143,6 +143,12 @@

if let Some(old_sector_metadata) = &maybe_old_sector_metadata {
if farmer_app_info.protocol_info.history_size <= old_sector_metadata.history_size {
debug!(
current_history_size = %farmer_app_info.protocol_info.history_size,
old_sector_history_size = %old_sector_metadata.history_size,
"Latest protocol history size is not yet newer than old sector history \
size, wait for a bit and try again"
);
tokio::time::sleep(FARMER_APP_INFO_RETRY_INTERVAL).await;
continue;
}
18 changes: 18 additions & 0 deletions crates/subspace-farmer/src/utils/farmer_piece_getter.rs
@@ -1,8 +1,11 @@
use crate::piece_cache::PieceCache;
use crate::utils::archival_storage_info::ArchivalStorageInfo;
use crate::utils::readers_and_pieces::ReadersAndPieces;
use async_trait::async_trait;
use parking_lot::Mutex;
use std::collections::HashSet;
use std::error::Error;
use std::sync::Arc;
use subspace_core_primitives::{Piece, PieceIndex};
use subspace_farmer_components::plotting::{PieceGetter, PieceGetterRetryPolicy};
use subspace_networking::libp2p::kad::RecordKey;
@@ -16,6 +19,7 @@ pub struct FarmerPieceGetter<PV> {
piece_provider: PieceProvider<PV>,
piece_cache: PieceCache,
archival_storage_info: ArchivalStorageInfo,
readers_and_pieces: Arc<Mutex<Option<ReadersAndPieces>>>,
}

impl<PV> FarmerPieceGetter<PV> {
@@ -24,12 +28,14 @@ impl<PV> FarmerPieceGetter<PV> {
piece_provider: PieceProvider<PV>,
piece_cache: PieceCache,
archival_storage_info: ArchivalStorageInfo,
readers_and_pieces: Arc<Mutex<Option<ReadersAndPieces>>>,
) -> Self {
Self {
node,
piece_provider,
piece_cache,
archival_storage_info,
readers_and_pieces,
}
}

@@ -68,6 +74,18 @@ where
return Ok(maybe_piece);
}

let maybe_read_piece_fut = self
.readers_and_pieces
.lock()
.as_ref()
.and_then(|readers_and_pieces| readers_and_pieces.read_piece(&piece_index_hash));

if let Some(read_piece_fut) = maybe_read_piece_fut {
if let Some(piece) = read_piece_fut.await {
return Ok(Some(piece));
}
}

// L1 piece acquisition
// TODO: consider using retry policy for L1 lookups as well.
let connected_peers = HashSet::<PeerId>::from_iter(self.node.connected_peers().await?);
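With the `readers_and_pieces` handle available, `get_piece` can now serve pieces from locally plotted sectors before falling back to network (L1) retrieval. A simplified sketch of that lookup order with hypothetical future-typed sources (not the crate's real signatures):

```rust
use std::future::Future;

// Sketch of the fallback order: piece cache → locally plotted sectors → network.
async fn get_piece_with_fallbacks<C, L, N>(
    from_cache: C,
    from_local_plots: Option<L>,
    from_network: N,
) -> Option<Vec<u8>>
where
    C: Future<Output = Option<Vec<u8>>>,
    L: Future<Output = Option<Vec<u8>>>,
    N: Future<Output = Option<Vec<u8>>>,
{
    if let Some(piece) = from_cache.await {
        return Some(piece);
    }
    // New in this commit: check sectors this farmer has already plotted
    // before reaching out to the network.
    if let Some(read_piece_fut) = from_local_plots {
        if let Some(piece) = read_piece_fut.await {
            return Some(piece);
        }
    }
    from_network.await
}
```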
