diff --git a/Cargo.lock b/Cargo.lock index 4be3e2622bc..700302fe7aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8153,6 +8153,7 @@ dependencies = [ "http-body", "http-range-header", "pin-project-lite 0.2.13", + "tokio", "tower", "tower-layer", "tower-service", diff --git a/bin/fuel-core/src/cli/run.rs b/bin/fuel-core/src/cli/run.rs index 1e48901c967..701b502c744 100644 --- a/bin/fuel-core/src/cli/run.rs +++ b/bin/fuel-core/src/cli/run.rs @@ -202,6 +202,10 @@ pub struct Command { #[clap(long = "query-log-threshold-time", default_value = "2s", env)] pub query_log_threshold_time: humantime::Duration, + /// Timeout before drop the request. + #[clap(long = "api-request-timeout", default_value = "30m", env)] + pub api_request_timeout: humantime::Duration, + #[clap(flatten)] pub profiling: profiling::ProfilingArgs, } @@ -240,6 +244,7 @@ impl Command { min_connected_reserved_peers, time_until_synced, query_log_threshold_time, + api_request_timeout, profiling: _, } = self; @@ -297,6 +302,7 @@ impl Command { let config = Config { addr, + api_request_timeout: api_request_timeout.into(), max_database_cache_size, database_path, database_type, diff --git a/crates/fuel-core/Cargo.toml b/crates/fuel-core/Cargo.toml index 10aab388a4c..01b7b7de5b3 100644 --- a/crates/fuel-core/Cargo.toml +++ b/crates/fuel-core/Cargo.toml @@ -57,7 +57,7 @@ tempfile = { workspace = true, optional = true } thiserror = "1.0" tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } tokio-stream = { workspace = true, features = ["sync"] } -tower-http = { version = "0.3", features = ["set-header", "trace"] } +tower-http = { version = "0.3", features = ["set-header", "trace", "timeout"] } tracing = { workspace = true } uuid = { version = "1.1", features = ["v4"] } diff --git a/crates/fuel-core/src/graphql_api/ports.rs b/crates/fuel-core/src/graphql_api/ports.rs index d270e424843..8edfcb42f35 100644 --- a/crates/fuel-core/src/graphql_api/ports.rs +++ b/crates/fuel-core/src/graphql_api/ports.rs @@ -1,8 +1,5 @@ use async_trait::async_trait; -use fuel_core_services::stream::{ - BoxFuture, - BoxStream, -}; +use fuel_core_services::stream::BoxStream; use fuel_core_storage::{ iter::{ BoxedIter, @@ -172,7 +169,7 @@ pub trait TxPoolPort: Send + Sync { fn tx_update_subscribe( &self, tx_id: TxId, - ) -> BoxFuture<'_, BoxStream>; + ) -> anyhow::Result>; } #[async_trait] diff --git a/crates/fuel-core/src/graphql_api/service.rs b/crates/fuel-core/src/graphql_api/service.rs index 6900a7b6d80..2ecb2c1457f 100644 --- a/crates/fuel-core/src/graphql_api/service.rs +++ b/crates/fuel-core/src/graphql_api/service.rs @@ -68,6 +68,7 @@ use std::{ use tokio_stream::StreamExt; use tower_http::{ set_header::SetResponseHeaderLayer, + timeout::TimeoutLayer, trace::TraceLayer, }; @@ -155,7 +156,8 @@ impl RunnableTask for Task { } } -// Need a separate Data Object for each Query endpoint, cannot be avoided +// Need a seperate Data Object for each Query endpoint, cannot be avoided +#[allow(clippy::too_many_arguments)] pub fn new_service( config: Config, schema: CoreSchemaBuilder, @@ -163,7 +165,8 @@ pub fn new_service( txpool: TxPool, producer: BlockProducer, consensus_module: ConsensusModule, - _log_threshold_ms: Duration, + log_threshold_ms: Duration, + request_timeout: Duration, ) -> anyhow::Result { let network_addr = config.addr; @@ -174,7 +177,7 @@ pub fn new_service( .data(producer) .data(consensus_module) .extension(async_graphql::extensions::Tracing) - .extension(MetricsExtension::new(_log_threshold_ms)) + .extension(MetricsExtension::new(log_threshold_ms)) .finish(); let router = Router::new() @@ -188,6 +191,7 @@ pub fn new_service( .route("/health", get(health)) .layer(Extension(schema)) .layer(TraceLayer::new_for_http()) + .layer(TimeoutLayer::new(request_timeout)) .layer(SetResponseHeaderLayer::<_>::overriding( ACCESS_CONTROL_ALLOW_ORIGIN, HeaderValue::from_static("*"), diff --git a/crates/fuel-core/src/query/subscriptions.rs b/crates/fuel-core/src/query/subscriptions.rs index 87dfbf300fa..e3be265aa31 100644 --- a/crates/fuel-core/src/query/subscriptions.rs +++ b/crates/fuel-core/src/query/subscriptions.rs @@ -30,7 +30,7 @@ where } #[tracing::instrument(skip(state, stream), fields(transaction_id = %transaction_id))] -pub(crate) async fn transaction_status_change<'a, State>( +pub(crate) fn transaction_status_change<'a, State>( state: State, stream: BoxStream<'a, TxStatusMessage>, transaction_id: Bytes32, diff --git a/crates/fuel-core/src/query/subscriptions/test.rs b/crates/fuel-core/src/query/subscriptions/test.rs index 939952416b0..135919fa7e9 100644 --- a/crates/fuel-core/src/query/subscriptions/test.rs +++ b/crates/fuel-core/src/query/subscriptions/test.rs @@ -246,7 +246,6 @@ fn test_tsc_inner( let stream = futures::stream::iter(stream).boxed(); super::transaction_status_change(mock_state, stream, txn_id(0)) - .await .collect::>() .await }) diff --git a/crates/fuel-core/src/schema/tx.rs b/crates/fuel-core/src/schema/tx.rs index b8a6b4d151e..248e72769f2 100644 --- a/crates/fuel-core/src/schema/tx.rs +++ b/crates/fuel-core/src/schema/tx.rs @@ -302,12 +302,13 @@ impl TxStatusSubscription { &self, ctx: &Context<'a>, #[graphql(desc = "The ID of the transaction")] id: TransactionId, - ) -> impl Stream> + 'a { + ) -> anyhow::Result> + 'a> + { let txpool = ctx.data_unchecked::(); let db = ctx.data_unchecked::(); - let rx = txpool.tx_update_subscribe(id.into()).await; + let rx = txpool.tx_update_subscribe(id.into())?; - transaction_status_change( + Ok(transaction_status_change( move |id| match db.tx_status(&id) { Ok(status) => Ok(Some(status)), Err(StorageError::NotFound(_, _)) => { @@ -322,8 +323,7 @@ impl TxStatusSubscription { rx, id.into(), ) - .await - .map_err(async_graphql::Error::from) + .map_err(async_graphql::Error::from)) } /// Submits transaction to the `TxPool` and await either confirmation or failure. @@ -338,7 +338,7 @@ impl TxStatusSubscription { let config = ctx.data_unchecked::(); let tx = FuelTx::from_bytes(&tx.0)?; let tx_id = tx.id(&config.consensus_parameters.chain_id); - let subscription = txpool.tx_update_subscribe(tx_id).await; + let subscription = txpool.tx_update_subscribe(tx_id)?; let _: Vec<_> = txpool .insert(vec![Arc::new(tx)]) diff --git a/crates/fuel-core/src/service/adapters/graphql_api.rs b/crates/fuel-core/src/service/adapters/graphql_api.rs index ffbd649fb10..e55caf2c8c1 100644 --- a/crates/fuel-core/src/service/adapters/graphql_api.rs +++ b/crates/fuel-core/src/service/adapters/graphql_api.rs @@ -19,10 +19,7 @@ use crate::{ service::adapters::TxPoolAdapter, }; use async_trait::async_trait; -use fuel_core_services::stream::{ - BoxFuture, - BoxStream, -}; +use fuel_core_services::stream::BoxStream; use fuel_core_storage::{ iter::{ BoxedIter, @@ -233,8 +230,11 @@ impl TxPoolPort for TxPoolAdapter { self.service.insert(txs).await } - fn tx_update_subscribe(&self, id: TxId) -> BoxFuture> { - Box::pin(self.service.tx_update_subscribe(id)) + fn tx_update_subscribe( + &self, + id: TxId, + ) -> anyhow::Result> { + self.service.tx_update_subscribe(id) } } diff --git a/crates/fuel-core/src/service/config.rs b/crates/fuel-core/src/service/config.rs index 3343232918d..f0cabfda032 100644 --- a/crates/fuel-core/src/service/config.rs +++ b/crates/fuel-core/src/service/config.rs @@ -35,6 +35,7 @@ pub use fuel_core_poa::Trigger; #[derive(Clone, Debug)] pub struct Config { pub addr: SocketAddr, + pub api_request_timeout: Duration, pub max_database_cache_size: usize, pub database_path: PathBuf, pub database_type: DbType, @@ -77,6 +78,7 @@ impl Config { Self { addr: SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 0), + api_request_timeout: Duration::from_secs(60), // Set the cache for tests = 10MB max_database_cache_size: 10 * 1024 * 1024, database_path: Default::default(), diff --git a/crates/fuel-core/src/service/query.rs b/crates/fuel-core/src/service/query.rs index 5586693e70c..b53d2db0fea 100644 --- a/crates/fuel-core/src/service/query.rs +++ b/crates/fuel-core/src/service/query.rs @@ -1,5 +1,4 @@ //! Queries we can run directly on `FuelService`. - use std::sync::Arc; use fuel_core_types::{ @@ -44,7 +43,7 @@ impl FuelService { tx: Transaction, ) -> anyhow::Result>> { let id = tx.id(&self.shared.config.chain_conf.consensus_parameters.chain_id); - let stream = self.transaction_status_change(id).await; + let stream = self.transaction_status_change(id)?; self.submit(tx).await?; Ok(stream) } @@ -55,7 +54,7 @@ impl FuelService { tx: Transaction, ) -> anyhow::Result { let id = tx.id(&self.shared.config.chain_conf.consensus_parameters.chain_id); - let stream = self.transaction_status_change(id).await.filter(|status| { + let stream = self.transaction_status_change(id)?.filter(|status| { futures::future::ready(!matches!(status, Ok(TransactionStatus::Submitted(_)))) }); futures::pin_mut!(stream); @@ -67,21 +66,20 @@ impl FuelService { } /// Return a stream of status changes for a transaction. - pub async fn transaction_status_change( + pub fn transaction_status_change( &self, id: Bytes32, - ) -> impl Stream> { + ) -> anyhow::Result>> { let txpool = self.shared.txpool.clone(); let db = self.shared.database.clone(); - let rx = Box::pin(txpool.tx_update_subscribe(id).await); - transaction_status_change( + let rx = txpool.tx_update_subscribe(id)?; + Ok(transaction_status_change( move |id| match db.get_tx_status(&id)? { Some(status) => Ok(Some(status)), None => Ok(txpool.find_one(id).map(Into::into)), }, rx, id, - ) - .await + )) } } diff --git a/crates/fuel-core/src/service/sub_services.rs b/crates/fuel-core/src/service/sub_services.rs index f5fb716f51a..56238449416 100644 --- a/crates/fuel-core/src/service/sub_services.rs +++ b/crates/fuel-core/src/service/sub_services.rs @@ -208,6 +208,7 @@ pub fn init_sub_services( Box::new(producer_adapter), Box::new(poa_adapter), config.query_log_threshold_time, + config.api_request_timeout, )?; let shared = SharedState { diff --git a/crates/services/txpool/src/service.rs b/crates/services/txpool/src/service.rs index fdb7f4d0dca..d93ec166e04 100644 --- a/crates/services/txpool/src/service.rs +++ b/crates/services/txpool/src/service.rs @@ -51,8 +51,12 @@ use fuel_core_types::{ tai64::Tai64, }; +use anyhow::anyhow; use parking_lot::Mutex as ParkingMutex; -use std::sync::Arc; +use std::{ + sync::Arc, + time::Duration, +}; use tokio::{ sync::broadcast, time::MissedTickBehavior, @@ -76,9 +80,9 @@ pub struct TxStatusChange { } impl TxStatusChange { - pub fn new(capacity: usize) -> Self { + pub fn new(capacity: usize, ttl: Duration) -> Self { let (new_tx_notification_sender, _) = broadcast::channel(capacity); - let update_sender = UpdateSender::new(capacity); + let update_sender = UpdateSender::new(capacity, ttl); Self { new_tx_notification_sender, update_sender, @@ -326,11 +330,11 @@ where self.tx_status_sender.new_tx_notification_sender.subscribe() } - pub async fn tx_update_subscribe(&self, tx_id: Bytes32) -> TxStatusStream { + pub fn tx_update_subscribe(&self, tx_id: Bytes32) -> anyhow::Result { self.tx_status_sender .update_sender - .subscribe::(tx_id) - .await + .try_subscribe::(tx_id) + .ok_or(anyhow!("Maximum number of subscriptions reached")) } } @@ -450,7 +454,14 @@ where gossiped_tx_stream, committed_block_stream, shared: SharedState { - tx_status_sender: TxStatusChange::new(number_of_active_subscription), + tx_status_sender: TxStatusChange::new( + number_of_active_subscription, + // The connection should be closed automatically after the `SqueezedOut` event. + // But because of slow/malicious consumers, the subscriber can still be occupied. + // We allow the subscriber to receive the event produced by TxPool's TTL. + // But we still want to drop subscribers after `2 * TxPool_TTL`. + 2 * config.transaction_ttl, + ), txpool, p2p, consensus_params, diff --git a/crates/services/txpool/src/service/tests.rs b/crates/services/txpool/src/service/tests.rs index e5d047d4425..85da7a46c70 100644 --- a/crates/services/txpool/src/service/tests.rs +++ b/crates/services/txpool/src/service/tests.rs @@ -198,11 +198,11 @@ async fn simple_insert_removal_subscription() { let mut tx1_subscribe_updates = service .shared .tx_update_subscribe(tx1.cached_id().unwrap()) - .await; + .unwrap(); let mut tx2_subscribe_updates = service .shared .tx_update_subscribe(tx2.cached_id().unwrap()) - .await; + .unwrap(); let out = service.shared.insert(vec![tx1.clone(), tx2.clone()]).await; diff --git a/crates/services/txpool/src/service/tests_p2p.rs b/crates/services/txpool/src/service/tests_p2p.rs index 43e82631a2f..3a21e5156f1 100644 --- a/crates/services/txpool/src/service/tests_p2p.rs +++ b/crates/services/txpool/src/service/tests_p2p.rs @@ -26,7 +26,7 @@ async fn can_insert_from_p2p() { let mut receiver = service .shared .tx_update_subscribe(tx1.id(&Default::default())) - .await; + .unwrap(); service.start_and_await().await.unwrap(); @@ -66,7 +66,7 @@ async fn insert_from_local_broadcasts_to_p2p() { let mut subscribe_update = service .shared .tx_update_subscribe(tx1.cached_id().unwrap()) - .await; + .unwrap(); let out = service.shared.insert(vec![Arc::new(tx1.clone())]).await; @@ -115,7 +115,7 @@ async fn test_insert_from_p2p_does_not_broadcast_to_p2p() { let mut receiver = service .shared .tx_update_subscribe(tx1.id(&Default::default())) - .await; + .unwrap(); service.start_and_await().await.unwrap(); diff --git a/crates/services/txpool/src/service/update_sender.rs b/crates/services/txpool/src/service/update_sender.rs index 349e34b8fcb..588df172772 100644 --- a/crates/services/txpool/src/service/update_sender.rs +++ b/crates/services/txpool/src/service/update_sender.rs @@ -2,17 +2,21 @@ use std::{ collections::HashMap, future::Future, pin::Pin, + time::Duration, }; use super::*; use parking_lot::Mutex; -use tokio::sync::{ - mpsc::{ - self, - error::TrySendError, +use tokio::{ + sync::{ + mpsc::{ + self, + error::TrySendError, + }, + OwnedSemaphorePermit, + Semaphore, }, - OwnedSemaphorePermit, - Semaphore, + time::Instant, }; use tokio_stream::{ wrappers::ReceiverStream, @@ -40,6 +44,8 @@ pub struct UpdateSender { senders: Arc>>, /// Semaphore used to limit the number of concurrent subscribers. permits: GetPermit, + /// TTL for senders + ttl: Duration, } /// Error returned when a transaction status update cannot be sent. @@ -78,6 +84,8 @@ struct Sender

> { stream: TxUpdateStream, /// The sending end of the subscriber channel. tx: Tx, + /// time that this sender was created + created: Instant, } /// A trait for sending transaction status updates. @@ -205,21 +213,21 @@ impl SendStatus for mpsc::Sender { impl UpdateSender { /// Create a new UpdateSender with a specified capacity for the semaphore - pub fn new(capacity: usize) -> UpdateSender { + pub fn new(capacity: usize, ttl: Duration) -> UpdateSender { UpdateSender { senders: Default::default(), permits: Arc::new(Semaphore::new(capacity)), + ttl, } } /// Try to subscribe for updates, returns a TxStatusStream if successful - #[cfg(test)] pub fn try_subscribe(&self, tx_id: Bytes32) -> Option where C: CreateChannel, { // Remove closed senders from the list - remove_closed(&mut self.senders.lock()); + remove_closed_and_expired(&mut self.senders.lock(), self.ttl); // Try to acquire a permit from the semaphore let permit = Arc::clone(&self.permits).try_acquire()?; @@ -228,25 +236,6 @@ impl UpdateSender { Some(self.subscribe_inner::(tx_id, permit)) } - /// Asynchronously await a permit to subscribe - /// returns a TxStatusStream on completion - pub async fn subscribe(&self, tx_id: Bytes32) -> TxStatusStream - where - C: CreateChannel, - { - // Remove closed senders from the list. - // Careful not to hold the lock while awaiting. - { - remove_closed(&mut self.senders.lock()); - } - - // Acquire a permit from the semaphore asynchronously. - let permit = Arc::clone(&self.permits).acquire().await; - - // Call subscribe_inner with the acquired permit. - self.subscribe_inner::(tx_id, permit) - } - /// Subscribe to updates with the given transaction id and a permit. fn subscribe_inner(&self, tx_id: Bytes32, permit: Permit) -> TxStatusStream where @@ -256,7 +245,7 @@ impl UpdateSender { let mut senders = self.senders.lock(); // Remove closed senders from the list - remove_closed(&mut senders); + remove_closed_and_expired(&mut senders, self.ttl); // Call the subscribe function with the tx_id, senders, and permit subscribe::<_, C>(tx_id, &mut (*senders), permit) @@ -268,7 +257,7 @@ impl UpdateSender { let mut senders = self.senders.lock(); // Remove closed senders from the list - remove_closed(&mut senders); + remove_closed_and_expired(&mut senders, self.ttl); // Initialize a flag to check if there are no senders // left for a given tx_id. @@ -308,6 +297,7 @@ where _permit: permit, stream: TxUpdateStream::new(), tx, + created: Instant::now(), }); // Return the receiver part of the channel @@ -315,13 +305,13 @@ where } // Remove closed senders from the senders map -fn remove_closed(senders: &mut SenderMap) +fn remove_closed_and_expired(senders: &mut SenderMap, ttl: Duration) where Tx: SendStatus, { // Iterate over the senders map, retaining only the senders that are not closed senders.retain(|_, senders| { - senders.retain(|sender| !sender.is_closed()); + senders.retain(|sender| !sender.is_closed() && sender.created.elapsed() < ttl); // Continue retaining if the senders list is not empty !senders.is_empty() }); @@ -349,6 +339,7 @@ impl Clone for UpdateSender { Self { senders: self.senders.clone(), permits: self.permits.clone(), + ttl: self.ttl, } } } diff --git a/crates/services/txpool/src/service/update_sender/tests/test_e2e.rs b/crates/services/txpool/src/service/update_sender/tests/test_e2e.rs index f178f11d0f7..b3871b06e86 100644 --- a/crates/services/txpool/src/service/update_sender/tests/test_e2e.rs +++ b/crates/services/txpool/src/service/update_sender/tests/test_e2e.rs @@ -74,7 +74,7 @@ fn test_update_sender_inner(ops: Vec) { } // Initialize test variables - let update = UpdateSender::new(CAPACITY); + let update = UpdateSender::new(CAPACITY, Duration::from_secs(5)); let mut receivers: Vec = Vec::new(); let mut model_receivers: Vec<(u8, usize, [Option; 2])> = Vec::new(); let mut sender_id = 0usize; diff --git a/crates/services/txpool/src/service/update_sender/tests/test_permits.rs b/crates/services/txpool/src/service/update_sender/tests/test_permits.rs index 491cf4ee8c8..e819a12083e 100644 --- a/crates/services/txpool/src/service/update_sender/tests/test_permits.rs +++ b/crates/services/txpool/src/service/update_sender/tests/test_permits.rs @@ -68,6 +68,7 @@ fn test_try_subscribe_inner(senders: HashMap>(), )