Skip to content

Commit

Permalink
Tx subscription cleanup (#1422)
Browse files Browse the repository at this point in the history
fixes: #1421

---------

Co-authored-by: xgreenx <[email protected]>
  • Loading branch information
Voxelot and xgreenx committed Oct 19, 2023
1 parent 70557ab commit 8d60be9
Show file tree
Hide file tree
Showing 20 changed files with 93 additions and 77 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions bin/fuel-core/src/cli/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,10 @@ pub struct Command {
#[clap(long = "query-log-threshold-time", default_value = "2s", env)]
pub query_log_threshold_time: humantime::Duration,

/// Timeout before drop the request.
#[clap(long = "api-request-timeout", default_value = "30m", env)]
pub api_request_timeout: humantime::Duration,

#[clap(flatten)]
pub profiling: profiling::ProfilingArgs,
}
Expand Down Expand Up @@ -240,6 +244,7 @@ impl Command {
min_connected_reserved_peers,
time_until_synced,
query_log_threshold_time,
api_request_timeout,
profiling: _,
} = self;

Expand Down Expand Up @@ -297,6 +302,7 @@ impl Command {

let config = Config {
addr,
api_request_timeout: api_request_timeout.into(),
max_database_cache_size,
database_path,
database_type,
Expand Down
2 changes: 1 addition & 1 deletion crates/fuel-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ tempfile = { workspace = true, optional = true }
thiserror = "1.0"
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
tokio-stream = { workspace = true, features = ["sync"] }
tower-http = { version = "0.3", features = ["set-header", "trace"] }
tower-http = { version = "0.3", features = ["set-header", "trace", "timeout"] }
tracing = { workspace = true }
uuid = { version = "1.1", features = ["v4"] }

Expand Down
7 changes: 2 additions & 5 deletions crates/fuel-core/src/graphql_api/ports.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use async_trait::async_trait;
use fuel_core_services::stream::{
BoxFuture,
BoxStream,
};
use fuel_core_services::stream::BoxStream;
use fuel_core_storage::{
iter::{
BoxedIter,
Expand Down Expand Up @@ -172,7 +169,7 @@ pub trait TxPoolPort: Send + Sync {
fn tx_update_subscribe(
&self,
tx_id: TxId,
) -> BoxFuture<'_, BoxStream<TxStatusMessage>>;
) -> anyhow::Result<BoxStream<TxStatusMessage>>;
}

#[async_trait]
Expand Down
10 changes: 7 additions & 3 deletions crates/fuel-core/src/graphql_api/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ use std::{
use tokio_stream::StreamExt;
use tower_http::{
set_header::SetResponseHeaderLayer,
timeout::TimeoutLayer,
trace::TraceLayer,
};

Expand Down Expand Up @@ -155,15 +156,17 @@ impl RunnableTask for Task {
}
}

// Need a separate Data Object for each Query endpoint, cannot be avoided
// Need a seperate Data Object for each Query endpoint, cannot be avoided
#[allow(clippy::too_many_arguments)]
pub fn new_service(
config: Config,
schema: CoreSchemaBuilder,
database: Database,
txpool: TxPool,
producer: BlockProducer,
consensus_module: ConsensusModule,
_log_threshold_ms: Duration,
log_threshold_ms: Duration,
request_timeout: Duration,
) -> anyhow::Result<Service> {
let network_addr = config.addr;

Expand All @@ -174,7 +177,7 @@ pub fn new_service(
.data(producer)
.data(consensus_module)
.extension(async_graphql::extensions::Tracing)
.extension(MetricsExtension::new(_log_threshold_ms))
.extension(MetricsExtension::new(log_threshold_ms))
.finish();

let router = Router::new()
Expand All @@ -188,6 +191,7 @@ pub fn new_service(
.route("/health", get(health))
.layer(Extension(schema))
.layer(TraceLayer::new_for_http())
.layer(TimeoutLayer::new(request_timeout))
.layer(SetResponseHeaderLayer::<_>::overriding(
ACCESS_CONTROL_ALLOW_ORIGIN,
HeaderValue::from_static("*"),
Expand Down
2 changes: 1 addition & 1 deletion crates/fuel-core/src/query/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ where
}

#[tracing::instrument(skip(state, stream), fields(transaction_id = %transaction_id))]
pub(crate) async fn transaction_status_change<'a, State>(
pub(crate) fn transaction_status_change<'a, State>(
state: State,
stream: BoxStream<'a, TxStatusMessage>,
transaction_id: Bytes32,
Expand Down
1 change: 0 additions & 1 deletion crates/fuel-core/src/query/subscriptions/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,6 @@ fn test_tsc_inner(

let stream = futures::stream::iter(stream).boxed();
super::transaction_status_change(mock_state, stream, txn_id(0))
.await
.collect::<Vec<_>>()
.await
})
Expand Down
12 changes: 6 additions & 6 deletions crates/fuel-core/src/schema/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,12 +302,13 @@ impl TxStatusSubscription {
&self,
ctx: &Context<'a>,
#[graphql(desc = "The ID of the transaction")] id: TransactionId,
) -> impl Stream<Item = async_graphql::Result<TransactionStatus>> + 'a {
) -> anyhow::Result<impl Stream<Item = async_graphql::Result<TransactionStatus>> + 'a>
{
let txpool = ctx.data_unchecked::<TxPool>();
let db = ctx.data_unchecked::<Database>();
let rx = txpool.tx_update_subscribe(id.into()).await;
let rx = txpool.tx_update_subscribe(id.into())?;

transaction_status_change(
Ok(transaction_status_change(
move |id| match db.tx_status(&id) {
Ok(status) => Ok(Some(status)),
Err(StorageError::NotFound(_, _)) => {
Expand All @@ -322,8 +323,7 @@ impl TxStatusSubscription {
rx,
id.into(),
)
.await
.map_err(async_graphql::Error::from)
.map_err(async_graphql::Error::from))
}

/// Submits transaction to the `TxPool` and await either confirmation or failure.
Expand All @@ -338,7 +338,7 @@ impl TxStatusSubscription {
let config = ctx.data_unchecked::<Config>();
let tx = FuelTx::from_bytes(&tx.0)?;
let tx_id = tx.id(&config.consensus_parameters.chain_id);
let subscription = txpool.tx_update_subscribe(tx_id).await;
let subscription = txpool.tx_update_subscribe(tx_id)?;

let _: Vec<_> = txpool
.insert(vec![Arc::new(tx)])
Expand Down
12 changes: 6 additions & 6 deletions crates/fuel-core/src/service/adapters/graphql_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@ use crate::{
service::adapters::TxPoolAdapter,
};
use async_trait::async_trait;
use fuel_core_services::stream::{
BoxFuture,
BoxStream,
};
use fuel_core_services::stream::BoxStream;
use fuel_core_storage::{
iter::{
BoxedIter,
Expand Down Expand Up @@ -233,8 +230,11 @@ impl TxPoolPort for TxPoolAdapter {
self.service.insert(txs).await
}

fn tx_update_subscribe(&self, id: TxId) -> BoxFuture<BoxStream<TxStatusMessage>> {
Box::pin(self.service.tx_update_subscribe(id))
fn tx_update_subscribe(
&self,
id: TxId,
) -> anyhow::Result<BoxStream<TxStatusMessage>> {
self.service.tx_update_subscribe(id)
}
}

Expand Down
2 changes: 2 additions & 0 deletions crates/fuel-core/src/service/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub use fuel_core_poa::Trigger;
#[derive(Clone, Debug)]
pub struct Config {
pub addr: SocketAddr,
pub api_request_timeout: Duration,
pub max_database_cache_size: usize,
pub database_path: PathBuf,
pub database_type: DbType,
Expand Down Expand Up @@ -77,6 +78,7 @@ impl Config {

Self {
addr: SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 0),
api_request_timeout: Duration::from_secs(60),
// Set the cache for tests = 10MB
max_database_cache_size: 10 * 1024 * 1024,
database_path: Default::default(),
Expand Down
16 changes: 7 additions & 9 deletions crates/fuel-core/src/service/query.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
//! Queries we can run directly on `FuelService`.
use std::sync::Arc;

use fuel_core_types::{
Expand Down Expand Up @@ -44,7 +43,7 @@ impl FuelService {
tx: Transaction,
) -> anyhow::Result<impl Stream<Item = anyhow::Result<TransactionStatus>>> {
let id = tx.id(&self.shared.config.chain_conf.consensus_parameters.chain_id);
let stream = self.transaction_status_change(id).await;
let stream = self.transaction_status_change(id)?;
self.submit(tx).await?;
Ok(stream)
}
Expand All @@ -55,7 +54,7 @@ impl FuelService {
tx: Transaction,
) -> anyhow::Result<TransactionStatus> {
let id = tx.id(&self.shared.config.chain_conf.consensus_parameters.chain_id);
let stream = self.transaction_status_change(id).await.filter(|status| {
let stream = self.transaction_status_change(id)?.filter(|status| {
futures::future::ready(!matches!(status, Ok(TransactionStatus::Submitted(_))))
});
futures::pin_mut!(stream);
Expand All @@ -67,21 +66,20 @@ impl FuelService {
}

/// Return a stream of status changes for a transaction.
pub async fn transaction_status_change(
pub fn transaction_status_change(
&self,
id: Bytes32,
) -> impl Stream<Item = anyhow::Result<TransactionStatus>> {
) -> anyhow::Result<impl Stream<Item = anyhow::Result<TransactionStatus>>> {
let txpool = self.shared.txpool.clone();
let db = self.shared.database.clone();
let rx = Box::pin(txpool.tx_update_subscribe(id).await);
transaction_status_change(
let rx = txpool.tx_update_subscribe(id)?;
Ok(transaction_status_change(
move |id| match db.get_tx_status(&id)? {
Some(status) => Ok(Some(status)),
None => Ok(txpool.find_one(id).map(Into::into)),
},
rx,
id,
)
.await
))
}
}
1 change: 1 addition & 0 deletions crates/fuel-core/src/service/sub_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ pub fn init_sub_services(
Box::new(producer_adapter),
Box::new(poa_adapter),
config.query_log_threshold_time,
config.api_request_timeout,
)?;

let shared = SharedState {
Expand Down
25 changes: 18 additions & 7 deletions crates/services/txpool/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,12 @@ use fuel_core_types::{
tai64::Tai64,
};

use anyhow::anyhow;
use parking_lot::Mutex as ParkingMutex;
use std::sync::Arc;
use std::{
sync::Arc,
time::Duration,
};
use tokio::{
sync::broadcast,
time::MissedTickBehavior,
Expand All @@ -76,9 +80,9 @@ pub struct TxStatusChange {
}

impl TxStatusChange {
pub fn new(capacity: usize) -> Self {
pub fn new(capacity: usize, ttl: Duration) -> Self {
let (new_tx_notification_sender, _) = broadcast::channel(capacity);
let update_sender = UpdateSender::new(capacity);
let update_sender = UpdateSender::new(capacity, ttl);
Self {
new_tx_notification_sender,
update_sender,
Expand Down Expand Up @@ -326,11 +330,11 @@ where
self.tx_status_sender.new_tx_notification_sender.subscribe()
}

pub async fn tx_update_subscribe(&self, tx_id: Bytes32) -> TxStatusStream {
pub fn tx_update_subscribe(&self, tx_id: Bytes32) -> anyhow::Result<TxStatusStream> {
self.tx_status_sender
.update_sender
.subscribe::<MpscChannel>(tx_id)
.await
.try_subscribe::<MpscChannel>(tx_id)
.ok_or(anyhow!("Maximum number of subscriptions reached"))
}
}

Expand Down Expand Up @@ -450,7 +454,14 @@ where
gossiped_tx_stream,
committed_block_stream,
shared: SharedState {
tx_status_sender: TxStatusChange::new(number_of_active_subscription),
tx_status_sender: TxStatusChange::new(
number_of_active_subscription,
// The connection should be closed automatically after the `SqueezedOut` event.
// But because of slow/malicious consumers, the subscriber can still be occupied.
// We allow the subscriber to receive the event produced by TxPool's TTL.
// But we still want to drop subscribers after `2 * TxPool_TTL`.
2 * config.transaction_ttl,
),
txpool,
p2p,
consensus_params,
Expand Down
4 changes: 2 additions & 2 deletions crates/services/txpool/src/service/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,11 @@ async fn simple_insert_removal_subscription() {
let mut tx1_subscribe_updates = service
.shared
.tx_update_subscribe(tx1.cached_id().unwrap())
.await;
.unwrap();
let mut tx2_subscribe_updates = service
.shared
.tx_update_subscribe(tx2.cached_id().unwrap())
.await;
.unwrap();

let out = service.shared.insert(vec![tx1.clone(), tx2.clone()]).await;

Expand Down
6 changes: 3 additions & 3 deletions crates/services/txpool/src/service/tests_p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ async fn can_insert_from_p2p() {
let mut receiver = service
.shared
.tx_update_subscribe(tx1.id(&Default::default()))
.await;
.unwrap();

service.start_and_await().await.unwrap();

Expand Down Expand Up @@ -66,7 +66,7 @@ async fn insert_from_local_broadcasts_to_p2p() {
let mut subscribe_update = service
.shared
.tx_update_subscribe(tx1.cached_id().unwrap())
.await;
.unwrap();

let out = service.shared.insert(vec![Arc::new(tx1.clone())]).await;

Expand Down Expand Up @@ -115,7 +115,7 @@ async fn test_insert_from_p2p_does_not_broadcast_to_p2p() {
let mut receiver = service
.shared
.tx_update_subscribe(tx1.id(&Default::default()))
.await;
.unwrap();

service.start_and_await().await.unwrap();

Expand Down
Loading

0 comments on commit 8d60be9

Please sign in to comment.