Skip to content

Commit

Permalink
Switch from async-lock to tokio primitives
Browse files Browse the repository at this point in the history
  • Loading branch information
nazar-pc committed Sep 14, 2024
1 parent 492362a commit fb6de0a
Show file tree
Hide file tree
Showing 25 changed files with 53 additions and 62 deletions.
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion crates/subspace-farmer-components/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ include = [
bench = false

[dependencies]
async-lock = "3.3.0"
async-trait = "0.1.81"
backoff = { version = "0.4.0", features = ["futures", "tokio"] }
bitvec = "1.0.1"
Expand Down
5 changes: 2 additions & 3 deletions crates/subspace-farmer-components/src/plotting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use crate::sector::{
};
use crate::segment_reconstruction::recover_missing_piece;
use crate::{FarmerProtocolInfo, PieceGetter};
use async_lock::Mutex as AsyncMutex;
use backoff::future::retry;
use backoff::{Error as BackoffError, ExponentialBackoff};
use futures::stream::FuturesUnordered;
Expand All @@ -34,7 +33,7 @@ use subspace_core_primitives::{
use subspace_erasure_coding::ErasureCoding;
use subspace_proof_of_space::{Table, TableGenerator};
use thiserror::Error;
use tokio::sync::{AcquireError, Semaphore};
use tokio::sync::{AcquireError, Mutex as AsyncMutex, Semaphore};
use tracing::{debug, trace, warn};

const RECONSTRUCTION_CONCURRENCY_LIMIT: usize = 1;
Expand Down Expand Up @@ -397,7 +396,7 @@ where

loop {
// Take mutex briefly to make sure encoding is allowed right now
global_mutex.lock_blocking();
let _ = global_mutex.blocking_lock();

// This instead of `while` above because otherwise mutex will be held
// for the duration of the loop and will limit concurrency to 1 record
Expand Down
3 changes: 1 addition & 2 deletions crates/subspace-farmer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ required-features = ["binary"]

[dependencies]
anyhow = "1.0.86"
async-lock = "3.3.0"
async-nats = { version = "0.35.1", optional = true }
async-trait = "0.1.81"
backoff = { version = "0.4.0", features = ["futures", "tokio"] }
Expand Down Expand Up @@ -65,7 +64,7 @@ supports-color = { version = "3.0.0", optional = true }
tempfile = "3.12.0"
thiserror = "1.0.63"
thread-priority = "1.1.0"
tokio = { version = "1.39.2", features = ["macros", "parking_lot", "rt-multi-thread", "signal", "time"] }
tokio = { version = "1.39.2", features = ["macros", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }
tokio-stream = { version = "0.1.15", features = ["sync"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"], optional = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use crate::commands::cluster::controller::farms::{maintain_farms, FarmIndex};
use crate::commands::shared::derive_libp2p_keypair;
use crate::commands::shared::network::{configure_network, NetworkArgs};
use anyhow::anyhow;
use async_lock::RwLock as AsyncRwLock;
use backoff::ExponentialBackoff;
use clap::{Parser, ValueHint};
use futures::stream::FuturesUnordered;
Expand All @@ -31,6 +30,7 @@ use subspace_farmer::node_client::NodeClient;
use subspace_farmer::single_disk_farm::identity::Identity;
use subspace_farmer::utils::{run_future_in_dedicated_thread, AsyncJoinOnDrop};
use subspace_networking::utils::piece_provider::PieceProvider;
use tokio::sync::RwLock as AsyncRwLock;
use tracing::info;

/// Get piece retry attempts number.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

use crate::commands::cluster::farmer::FARMER_IDENTIFICATION_BROADCAST_INTERVAL;
use anyhow::anyhow;
use async_lock::RwLock as AsyncRwLock;
use futures::channel::oneshot;
use futures::future::FusedFuture;
use futures::stream::FuturesUnordered;
Expand All @@ -25,6 +24,7 @@ use subspace_farmer::cluster::farmer::{ClusterFarm, ClusterFarmerIdentifyFarmBro
use subspace_farmer::cluster::nats_client::NatsClient;
use subspace_farmer::farm::plotted_pieces::PlottedPieces;
use subspace_farmer::farm::{Farm, FarmId, SectorPlottingDetails, SectorUpdate};
use tokio::sync::RwLock as AsyncRwLock;
use tokio::task;
use tokio::time::MissedTickBehavior;
use tracing::{error, info, trace, warn};
Expand Down Expand Up @@ -178,7 +178,7 @@ pub(super) async fn maintain_farms(
let plotted_pieces = Arc::clone(plotted_pieces);

let delete_farm_fut = task::spawn_blocking(move || {
plotted_pieces.write_blocking().delete_farm(farm_index);
plotted_pieces.blocking_write().delete_farm(farm_index);
});
if let Err(error) = delete_farm_fut.await {
error!(
Expand Down Expand Up @@ -235,7 +235,7 @@ pub(super) async fn maintain_farms(
let plotted_pieces = Arc::clone(plotted_pieces);

let delete_farm_fut = task::spawn_blocking(move || {
plotted_pieces.write_blocking().delete_farm(farm_index);
plotted_pieces.blocking_write().delete_farm(farm_index);
});
if let Err(error) = delete_farm_fut.await {
error!(
Expand Down Expand Up @@ -322,7 +322,7 @@ fn process_farm_identify_message<'a>(
let plotted_pieces = Arc::clone(plotted_pieces);

let delete_farm_fut = task::spawn_blocking(move || {
plotted_pieces.write_blocking().delete_farm(farm_index);
plotted_pieces.blocking_write().delete_farm(farm_index);
});
if let Err(error) = delete_farm_fut.await {
error!(
Expand Down Expand Up @@ -434,7 +434,7 @@ async fn initialize_farm(
drop(sector_update_handler);
let plotted_sectors_buffer = mem::take(&mut *plotted_sectors_buffer.lock());
let add_buffered_sectors_fut = task::spawn_blocking(move || {
let mut plotted_pieces = plotted_pieces.write_blocking();
let mut plotted_pieces = plotted_pieces.blocking_write();

for (plotted_sector, old_plotted_sector) in plotted_sectors_buffer {
if let Some(old_plotted_sector) = old_plotted_sector {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

use crate::commands::shared::DiskFarm;
use anyhow::anyhow;
use async_lock::Mutex as AsyncMutex;
use backoff::ExponentialBackoff;
use bytesize::ByteSize;
use clap::Parser;
Expand Down Expand Up @@ -35,7 +34,7 @@ use subspace_farmer::utils::{
};
use subspace_farmer_components::reading::ReadSectorRecordChunksMode;
use subspace_proof_of_space::Table;
use tokio::sync::{Barrier, Semaphore};
use tokio::sync::{Barrier, Mutex as AsyncMutex, Semaphore};
use tracing::{error, info, info_span, warn, Instrument};

const FARM_ERROR_PRINT_INTERVAL: Duration = Duration::from_secs(30);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::commands::shared::PlottingThreadPriority;
use anyhow::anyhow;
use async_lock::Mutex as AsyncMutex;
use clap::Parser;
use futures::{select, FutureExt};
use prometheus_client::registry::Registry;
Expand All @@ -27,7 +26,7 @@ use subspace_farmer::utils::{
};
use subspace_farmer_components::PieceGetter;
use subspace_proof_of_space::Table;
use tokio::sync::Semaphore;
use tokio::sync::{Mutex as AsyncMutex, Semaphore};
use tracing::info;

const PLOTTING_RETRY_INTERVAL: Duration = Duration::from_secs(5);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use crate::commands::shared::network::{configure_network, NetworkArgs};
use crate::commands::shared::{derive_libp2p_keypair, DiskFarm, PlottingThreadPriority};
use crate::utils::shutdown_signal;
use anyhow::anyhow;
use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
use backoff::ExponentialBackoff;
use bytesize::ByteSize;
use clap::{Parser, ValueHint};
Expand Down Expand Up @@ -51,7 +50,7 @@ use subspace_farmer_components::PieceGetter;
use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter};
use subspace_networking::utils::piece_provider::PieceProvider;
use subspace_proof_of_space::Table;
use tokio::sync::{Barrier, Semaphore};
use tokio::sync::{Barrier, Mutex as AsyncMutex, RwLock as AsyncRwLock, Semaphore};
use tracing::{error, info, info_span, warn, Instrument};

/// Get piece retry attempts number.
Expand Down Expand Up @@ -713,7 +712,7 @@ where
{
let _span_guard = span.enter();

let mut plotted_pieces = plotted_pieces.write_blocking();
let mut plotted_pieces = plotted_pieces.blocking_write();

if let Some(old_plotted_sector) = &old_plotted_sector {
plotted_pieces.delete_sector(farm_index, old_plotted_sector);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use async_lock::RwLock as AsyncRwLock;
use clap::Parser;
use prometheus_client::registry::Registry;
use std::collections::HashSet;
Expand All @@ -22,6 +21,7 @@ use subspace_networking::{
SegmentHeaderBySegmentIndexesRequestHandler, SegmentHeaderRequest, SegmentHeaderResponse,
};
use subspace_rpc_primitives::MAX_SEGMENT_HEADERS_PER_REQUEST;
use tokio::sync::RwLock as AsyncRwLock;
use tracing::{debug, error, info, Instrument};

/// How many segment headers can be requested at a time.
Expand Down Expand Up @@ -139,7 +139,8 @@ where

let read_piece_fut = match weak_plotted_pieces.upgrade() {
Some(plotted_pieces) => plotted_pieces
.try_read()?
.try_read()
.ok()?
.read_piece(piece_index)?
.in_current_span(),
None => {
Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-farmer/src/cluster/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use crate::farm::{PieceCacheId, PieceCacheOffset};
use crate::farmer_cache::FarmerCache;
use crate::node_client::{Error as NodeClientError, NodeClient};
use anyhow::anyhow;
use async_lock::Semaphore;
use async_nats::HeaderValue;
use async_trait::async_trait;
use futures::{select, FutureExt, Stream, StreamExt};
Expand All @@ -29,6 +28,7 @@ use subspace_farmer_components::PieceGetter;
use subspace_rpc_primitives::{
FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse,
};
use tokio::sync::Semaphore;
use tracing::{debug, trace, warn};

/// Broadcast sent by controllers requesting farmers to identify themselves
Expand Down
9 changes: 5 additions & 4 deletions crates/subspace-farmer/src/farmer_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use crate::farmer_cache::metrics::FarmerCacheMetrics;
use crate::farmer_cache::piece_cache_state::PieceCachesState;
use crate::node_client::NodeClient;
use crate::utils::run_future_in_dedicated_thread;
use async_lock::RwLock as AsyncRwLock;
use event_listener_primitives::{Bag, HandlerId};
use futures::stream::{FuturesOrdered, FuturesUnordered};
use futures::{select, FutureExt, StreamExt};
Expand All @@ -32,7 +31,7 @@ use subspace_networking::libp2p::PeerId;
use subspace_networking::utils::multihash::ToMultihash;
use subspace_networking::{KeyWithDistance, LocalRecordProvider};
use tokio::runtime::Handle;
use tokio::sync::mpsc;
use tokio::sync::{mpsc, RwLock as AsyncRwLock};
use tokio::task::{block_in_place, yield_now};
use tracing::{debug, error, info, trace, warn};

Expand Down Expand Up @@ -1208,7 +1207,8 @@ where
let distance_key = KeyWithDistance::new(self.peer_id, key.clone());
if self
.piece_caches
.try_read()?
.try_read()
.ok()?
.contains_stored_piece(&distance_key)
{
// Note: We store our own provider records locally without local addresses
Expand All @@ -1225,7 +1225,8 @@ where
let found_fut = self
.plot_caches
.caches
.try_read()?
.try_read()
.ok()?
.iter()
.map(|plot_cache| {
let plot_cache = Arc::clone(plot_cache);
Expand Down
13 changes: 7 additions & 6 deletions crates/subspace-farmer/src/farmer_piece_getter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@
use crate::farm::plotted_pieces::PlottedPieces;
use crate::farmer_cache::FarmerCache;
use crate::node_client::NodeClient;
use async_lock::{
Mutex as AsyncMutex, MutexGuardArc as AsyncMutexGuardArc, RwLock as AsyncRwLock, Semaphore,
};
use async_trait::async_trait;
use backoff::backoff::Backoff;
use backoff::future::retry;
Expand All @@ -22,6 +19,9 @@ use subspace_core_primitives::{Piece, PieceIndex};
use subspace_farmer_components::PieceGetter;
use subspace_networking::utils::multihash::ToMultihash;
use subspace_networking::utils::piece_provider::{PieceProvider, PieceValidator};
use tokio::sync::{
Mutex as AsyncMutex, OwnedMutexGuard as AsyncOwnedMutexGuard, RwLock as AsyncRwLock, Semaphore,
};
use tracing::{debug, error, trace};

pub mod piece_validator;
Expand All @@ -30,7 +30,7 @@ const MAX_RANDOM_WALK_ROUNDS: usize = 15;

struct InProgressPieceGetting<'a> {
piece_index: PieceIndex,
in_progress_piece: AsyncMutexGuardArc<Option<Piece>>,
in_progress_piece: AsyncOwnedMutexGuard<Option<Piece>>,
in_progress_pieces: &'a Mutex<HashMap<PieceIndex, Arc<AsyncMutex<Option<Piece>>>>>,
}

Expand Down Expand Up @@ -63,8 +63,8 @@ impl<'a> InProgressPiece<'a> {
// Take lock before anything else, set to `None` when another piece getting is already in
// progress
let mut local_in_progress_piece_guard = Some(
in_progress_piece_mutex
.try_lock_arc()
Arc::clone(&in_progress_piece_mutex)
.try_lock_owned()
.expect("Just created; qed"),
);
let in_progress_piece_mutex = in_progress_pieces
Expand Down Expand Up @@ -282,6 +282,7 @@ where
let maybe_read_piece_fut = inner
.plotted_pieces
.try_read()
.ok()
.and_then(|plotted_pieces| plotted_pieces.read_piece(piece_index));

if let Some(read_piece_fut) = maybe_read_piece_fut {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

use crate::node_client::{Error as RpcError, Error, NodeClient, NodeClientExt};
use crate::utils::AsyncJoinOnDrop;
use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
use async_trait::async_trait;
use futures::{select, FutureExt, Stream, StreamExt};
use std::pin::Pin;
Expand All @@ -14,7 +13,7 @@ use subspace_rpc_primitives::{
FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse,
MAX_SEGMENT_HEADERS_PER_REQUEST,
};
use tokio::sync::watch;
use tokio::sync::{watch, Mutex as AsyncMutex, RwLock as AsyncRwLock};
use tokio_stream::wrappers::WatchStream;
use tracing::{info, trace, warn};

Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-farmer/src/node_client/rpc_node_client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
//! Node client implementation that connects to node via RPC (WebSockets)

use crate::node_client::{Error as RpcError, Error, NodeClient, NodeClientExt};
use async_lock::Semaphore;
use async_trait::async_trait;
use futures::{Stream, StreamExt};
use jsonrpsee::core::client::{ClientT, Error as JsonError, SubscriptionClientT};
Expand All @@ -13,6 +12,7 @@ use subspace_core_primitives::{Piece, PieceIndex, SegmentHeader, SegmentIndex};
use subspace_rpc_primitives::{
FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse,
};
use tokio::sync::Semaphore;

/// TODO: Node is having a hard time responding for many piece requests, specifically this results
/// in subscriptions become broken on the node: https://github.com/paritytech/jsonrpsee/issues/1409
Expand Down
5 changes: 2 additions & 3 deletions crates/subspace-farmer/src/plotter/cpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use crate::plotter::cpu::metrics::CpuPlotterMetrics;
use crate::plotter::{Plotter, SectorPlottingProgress};
use crate::thread_pool_manager::PlottingThreadPoolManager;
use crate::utils::AsyncJoinOnDrop;
use async_lock::Mutex as AsyncMutex;
use async_trait::async_trait;
use event_listener_primitives::{Bag, HandlerId};
use futures::channel::mpsc;
Expand All @@ -32,7 +31,7 @@ use subspace_farmer_components::plotting::{
};
use subspace_farmer_components::{FarmerProtocolInfo, PieceGetter};
use subspace_proof_of_space::Table;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio::sync::{Mutex as AsyncMutex, OwnedSemaphorePermit, Semaphore};
use tokio::task::yield_now;
use tracing::{warn, Instrument};

Expand Down Expand Up @@ -302,7 +301,7 @@ where
}

// Take mutex briefly to make sure plotting is allowed right now
global_mutex.lock().await;
let _ = global_mutex.lock().await;

let downloading_start = Instant::now();

Expand Down
Loading

0 comments on commit fb6de0a

Please sign in to comment.