Skip to content

Commit

Permalink
Pruning of the transaction by TTL (#1033)
Browse files Browse the repository at this point in the history
Added basic pruning of the transaction by the TTL.

It is done periodically for constant period of time. We can do it
often(based on the remaining time to the next timeout), but I tried to
avoid to often locking.

---------

Co-authored-by: Voxelot <[email protected]>
  • Loading branch information
crypto523 and Voxelot committed Feb 21, 2023
1 parent bb31294 commit 05367d6
Show file tree
Hide file tree
Showing 20 changed files with 463 additions and 127 deletions.
12 changes: 11 additions & 1 deletion bin/fuel-core/src/cli/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ pub struct Command {
pub max_da_lag: u64,
#[clap(long = "verify_max_relayer_wait", default_value = "30s", env)]
pub max_wait_time: humantime::Duration,

#[clap(long = "tx-pool-ttl", default_value = "5m", env)]
pub tx_pool_ttl: humantime::Duration,
}

impl Command {
Expand Down Expand Up @@ -177,6 +180,7 @@ impl Command {
metrics,
max_da_lag,
max_wait_time,
tx_pool_ttl,
} = self;

let addr = net::SocketAddr::new(ip, port);
Expand Down Expand Up @@ -242,7 +246,13 @@ impl Command {
vm: VMConfig {
backtrace: vm_backtrace,
},
txpool: TxPoolConfig::new(chain_conf, min_gas_price, utxo_validation),
txpool: TxPoolConfig::new(
chain_conf,
min_gas_price,
utxo_validation,
metrics,
tx_pool_ttl.into(),
),
block_producer: ProducerConfig {
utxo_validation,
coinbase_recipient,
Expand Down
5 changes: 3 additions & 2 deletions crates/fuel-core/src/graphql_api/ports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ use fuel_core_types::{
txpool::{
InsertionResult,
TransactionStatus,
TxInfo,
},
},
tai64::Tai64,
Expand Down Expand Up @@ -148,7 +147,9 @@ pub trait DatabaseChain {
}

pub trait TxPoolPort: Send + Sync {
fn find_one(&self, id: TxId) -> Option<TxInfo>;
fn transaction(&self, id: TxId) -> Option<Transaction>;

fn submission_time(&self, id: TxId) -> Option<Tai64>;

fn insert(&self, txs: Vec<Arc<Transaction>>) -> Vec<anyhow::Result<InsertionResult>>;

Expand Down
5 changes: 2 additions & 3 deletions crates/fuel-core/src/schema/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ use futures::{
use itertools::Itertools;
use std::{
iter,
ops::Deref,
sync::Arc,
};
use types::Transaction;
Expand All @@ -74,8 +73,8 @@ impl TxQuery {
let id = id.0;
let txpool = ctx.data_unchecked::<TxPool>();

if let Some(transaction) = txpool.find_one(id) {
Ok(Some(Transaction(transaction.tx().clone().deref().into())))
if let Some(transaction) = txpool.transaction(id) {
Ok(Some(Transaction(transaction)))
} else {
query.transaction(&id).into_api_result()
}
Expand Down
9 changes: 4 additions & 5 deletions crates/fuel-core/src/schema/tx/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -500,11 +500,10 @@ pub(super) async fn get_tx_status(
.into_api_result::<txpool::TransactionStatus, StorageError>()?
{
Some(status) => Ok(Some(status.into())),
None => match txpool.find_one(id) {
Some(transaction_in_pool) => {
let time = transaction_in_pool.submitted_time();
Ok(Some(TransactionStatus::Submitted(SubmittedStatus(time))))
}
None => match txpool.submission_time(id) {
Some(submitted_time) => Ok(Some(TransactionStatus::Submitted(
SubmittedStatus(submitted_time),
))),
_ => Ok(None),
},
}
Expand Down
22 changes: 17 additions & 5 deletions crates/fuel-core/src/service/adapters/graphql_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,12 @@ use fuel_core_types::{
TransactionStatus,
},
},
tai64::Tai64,
};
use std::{
ops::Deref,
sync::Arc,
};
use std::sync::Arc;
use tokio_stream::wrappers::{
errors::BroadcastStreamRecvError,
BroadcastStream,
Expand Down Expand Up @@ -202,6 +206,18 @@ impl DatabaseChain for Database {
impl DatabasePort for Database {}

impl TxPoolPort for TxPoolAdapter {
fn transaction(&self, id: TxId) -> Option<Transaction> {
self.service
.find_one(id)
.map(|info| info.tx().clone().deref().into())
}

fn submission_time(&self, id: TxId) -> Option<Tai64> {
self.service
.find_one(id)
.map(|info| Tai64::from_unix(info.submitted_time().as_secs() as i64))
}

fn insert(&self, txs: Vec<Arc<Transaction>>) -> Vec<anyhow::Result<InsertionResult>> {
self.service.insert(txs)
}
Expand All @@ -211,10 +227,6 @@ impl TxPoolPort for TxPoolAdapter {
) -> BoxStream<Result<TxUpdate, BroadcastStreamRecvError>> {
Box::pin(BroadcastStream::new(self.service.tx_update_subscribe()))
}

fn find_one(&self, id: TxId) -> Option<fuel_core_types::services::txpool::TxInfo> {
self.service.find_one(id)
}
}

#[async_trait]
Expand Down
9 changes: 6 additions & 3 deletions crates/fuel-core/src/service/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::{
SocketAddr,
},
path::PathBuf,
time::Duration,
};
use strum_macros::{
Display,
Expand Down Expand Up @@ -71,11 +72,13 @@ impl Config {
block_production: Trigger::Instant,
vm: Default::default(),
utxo_validation,
txpool: fuel_core_txpool::Config::new(
chain_conf,
txpool: fuel_core_txpool::Config {
chain_config: chain_conf,
min_gas_price,
utxo_validation,
),
transaction_ttl: Duration::from_secs(60 * 100000000),
..fuel_core_txpool::Config::default()
},
block_producer: Default::default(),
block_executor: Default::default(),
block_importer: Default::default(),
Expand Down
4 changes: 4 additions & 0 deletions crates/services/txpool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ fuel-core-txpool = { path = "", features = ["test-helpers"] }
itertools = { workspace = true }
mockall = { workspace = true }
rstest = "0.15"
tokio = { workspace = true, features = [
"sync",
"test-util",
] }

[features]
test-helpers = [
Expand Down
19 changes: 17 additions & 2 deletions crates/services/txpool/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use fuel_core_chain_config::ChainConfig;
use std::time::Duration;

#[derive(Debug, Clone)]
pub struct Config {
Expand All @@ -14,13 +15,24 @@ pub struct Config {
pub chain_config: ChainConfig,
/// Enables prometheus metrics for this fuel-service
pub metrics: bool,
/// Transaction TTL
pub transaction_ttl: Duration,
}

impl Default for Config {
fn default() -> Self {
let min_gas_price = 0;
let utxo_validation = true;
Self::new(ChainConfig::default(), min_gas_price, utxo_validation)
let metrics = false;
// 5 minute TTL
let transaction_ttl = Duration::from_secs(60 * 5);
Self::new(
ChainConfig::default(),
min_gas_price,
utxo_validation,
metrics,
transaction_ttl,
)
}
}

Expand All @@ -29,6 +41,8 @@ impl Config {
chain_config: ChainConfig,
min_gas_price: u64,
utxo_validation: bool,
metrics: bool,
transaction_ttl: Duration,
) -> Self {
// # Dev-note: If you add a new field, be sure that this field is propagated correctly
// in all places where `new` is used.
Expand All @@ -38,7 +52,8 @@ impl Config {
min_gas_price,
utxo_validation,
chain_config,
metrics: false,
metrics,
transaction_ttl,
}
}
}
2 changes: 2 additions & 0 deletions crates/services/txpool/src/containers.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
pub mod dependency;
pub mod price_sort;
pub mod sort;
pub mod time_sort;
6 changes: 2 additions & 4 deletions crates/services/txpool/src/containers/dependency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::{
ports::TxPoolDb,
types::*,
Error,
TxInfo,
};
use anyhow::anyhow;
use fuel_core_types::{
Expand All @@ -15,10 +16,7 @@ use fuel_core_types::{
UtxoId,
},
fuel_types::MessageId,
services::txpool::{
ArcPoolTx,
TxInfo,
},
services::txpool::ArcPoolTx,
};
use std::collections::{
HashMap,
Expand Down
61 changes: 24 additions & 37 deletions crates/services/txpool/src/containers/price_sort.rs
Original file line number Diff line number Diff line change
@@ -1,52 +1,39 @@
use crate::types::*;
use fuel_core_types::services::txpool::ArcPoolTx;
use std::{
cmp,
collections::BTreeMap,
use crate::{
containers::sort::{
Sort,
SortableKey,
},
types::*,
TxInfo,
};
use std::cmp;

#[derive(Debug, Default, Clone)]
pub struct PriceSort {
/// all transactions sorted by min/max value
pub sort: BTreeMap<PriceSortKey, ArcPoolTx>,
}

impl PriceSort {
pub fn remove(&mut self, tx: &ArcPoolTx) {
self.sort.remove(&PriceSortKey::new(tx));
}

// get last transaction. It has lowest gas price.
pub fn last(&self) -> Option<ArcPoolTx> {
self.sort.iter().next().map(|(_, tx)| tx.clone())
}

pub fn lowest_price(&self) -> GasPrice {
self.sort
.iter()
.next()
.map(|(price, _)| price.price)
.unwrap_or_default()
}

pub fn insert(&mut self, tx: &ArcPoolTx) {
self.sort.insert(PriceSortKey::new(tx), tx.clone());
}
}
/// all transactions sorted by min/max price
pub type PriceSort = Sort<PriceSortKey>;

#[derive(Clone, Debug)]
pub struct PriceSortKey {
price: GasPrice,
tx_id: TxId,
}

impl PriceSortKey {
pub fn new(tx: &ArcPoolTx) -> Self {
impl SortableKey for PriceSortKey {
type Value = GasPrice;

fn new(info: &TxInfo) -> Self {
Self {
price: tx.price(),
tx_id: tx.id(),
price: info.tx().price(),
tx_id: info.tx().id(),
}
}

fn value(&self) -> &Self::Value {
&self.price
}

fn tx_id(&self) -> &TxId {
&self.tx_id
}
}

impl PartialEq for PriceSortKey {
Expand Down
56 changes: 56 additions & 0 deletions crates/services/txpool/src/containers/sort.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
use crate::{
types::*,
TxInfo,
};
use fuel_core_types::services::txpool::ArcPoolTx;
use std::collections::BTreeMap;

#[derive(Debug, Clone)]
pub struct Sort<Key> {
/// all transactions sorted by min/max value
pub sort: BTreeMap<Key, ArcPoolTx>,
}

impl<Key> Default for Sort<Key> {
fn default() -> Self {
Self {
sort: Default::default(),
}
}
}

impl<Key> Sort<Key>
where
Key: SortableKey,
{
pub fn remove(&mut self, info: &TxInfo) {
self.sort.remove(&Key::new(info));
}

pub fn lowest_tx(&self) -> Option<ArcPoolTx> {
self.sort.iter().next().map(|(_, tx)| tx.clone())
}

pub fn lowest_value(&self) -> Option<Key::Value> {
self.sort.iter().next().map(|(key, _)| key.value().clone())
}

pub fn lowest(&self) -> Option<(&Key, &ArcPoolTx)> {
self.sort.iter().next()
}

pub fn insert(&mut self, info: &TxInfo) {
let tx = info.tx().clone();
self.sort.insert(Key::new(info), tx);
}
}

pub trait SortableKey: Ord {
type Value: Clone;

fn new(info: &TxInfo) -> Self;

fn value(&self) -> &Self::Value;

fn tx_id(&self) -> &TxId;
}
Loading

0 comments on commit 05367d6

Please sign in to comment.