/// Parses a command line value expressed in days into a [`Duration`].
///
/// Fractional values are accepted, e.g. `"0.5"` yields twelve hours.
///
/// # Errors
///
/// Returns the [`ParseFloatError`] produced by `str::parse::<f64>` when `s` is
/// not a valid floating point literal.
///
/// # Panics
///
/// `Duration::from_secs_f64` panics when handed a negative, non-finite or
/// overflowing number of seconds, so inputs such as `"-1"`, `"NaN"` or `"inf"`
/// abort argument parsing instead of producing an error value.
fn duration_from_days(s: &str) -> Result<Duration, ParseFloatError> {
    const SECONDS_PER_DAY: f64 = 86_400.0;

    let days = s.parse::<f64>()?;
    Ok(Duration::from_secs_f64(days * SECONDS_PER_DAY))
}
impl super::Postgres { @@ -19,6 +20,11 @@ impl super::Postgres { tracing::warn!(?err, "failed to insert order events"); } } + + /// Deletes events before the provided timestamp. + pub async fn delete_events_before(&self, timestamp: DateTime) -> Result { + order_events::delete_order_events_before(&self.0, timestamp).await + } } async fn store_order_events( diff --git a/crates/autopilot/src/lib.rs b/crates/autopilot/src/lib.rs index b08311a43f..40f0c148a4 100644 --- a/crates/autopilot/src/lib.rs +++ b/crates/autopilot/src/lib.rs @@ -5,6 +5,7 @@ pub mod driver_api; pub mod driver_model; pub mod event_updater; pub mod on_settlement_event_updater; +pub mod periodic_db_cleanup; pub mod protocol; pub mod run; pub mod run_loop; diff --git a/crates/autopilot/src/periodic_db_cleanup.rs b/crates/autopilot/src/periodic_db_cleanup.rs new file mode 100644 index 0000000000..bd18ccd6d1 --- /dev/null +++ b/crates/autopilot/src/periodic_db_cleanup.rs @@ -0,0 +1,67 @@ +use { + crate::database::Postgres, + chrono::{DateTime, Utc}, + std::time::Duration, + tokio::time, +}; + +pub struct OrderEventsCleanerConfig { + cleanup_interval: Duration, + event_age_threshold: chrono::Duration, +} + +impl OrderEventsCleanerConfig { + pub fn new(cleanup_interval: Duration, event_age_threshold: Duration) -> Self { + OrderEventsCleanerConfig { + cleanup_interval, + event_age_threshold: chrono::Duration::from_std(event_age_threshold).unwrap(), + } + } +} + +pub struct OrderEventsCleaner { + config: OrderEventsCleanerConfig, + db: Postgres, +} + +impl OrderEventsCleaner { + pub fn new(config: OrderEventsCleanerConfig, db: Postgres) -> Self { + OrderEventsCleaner { config, db } + } + + pub async fn run_forever(self) -> ! 
{ + let mut interval = time::interval(self.config.cleanup_interval); + loop { + let timestamp: DateTime = Utc::now() - self.config.event_age_threshold; + match self.db.delete_events_before(timestamp).await { + Ok(affected_rows_count) => { + tracing::debug!( + "deleted {:?} order events before {}", + affected_rows_count, + timestamp + ); + Metrics::get() + .last_order_events_cleanup_run + .set(Utc::now().timestamp()) + } + Err(err) => { + tracing::warn!(?err, "failed to delete order events before {}", timestamp) + } + } + interval.tick().await; + } + } +} + +#[derive(prometheus_metric_storage::MetricStorage)] +struct Metrics { + /// Timestamp of the last successful `order_events` table cleanup. + #[metric(name = "periodic_db_cleanup", labels("type"))] + last_order_events_cleanup_run: prometheus::IntGauge, +} + +impl Metrics { + fn get() -> &'static Self { + Metrics::instance(observe::metrics::get_storage_registry()).unwrap() + } +} diff --git a/crates/autopilot/src/run.rs b/crates/autopilot/src/run.rs index 6e90f8d3de..d29745b45d 100644 --- a/crates/autopilot/src/run.rs +++ b/crates/autopilot/src/run.rs @@ -591,6 +591,21 @@ pub async fn run(args: Arguments) { .instrument(tracing::info_span!("on_settlement_event_updater")), ); + let order_events_cleaner_config = crate::periodic_db_cleanup::OrderEventsCleanerConfig::new( + args.order_events_cleanup_interval, + args.order_events_cleanup_threshold, + ); + let order_events_cleaner = crate::periodic_db_cleanup::OrderEventsCleaner::new( + order_events_cleaner_config, + db.clone(), + ); + + tokio::task::spawn( + order_events_cleaner + .run_forever() + .instrument(tracing::info_span!("order_events_cleaner")), + ); + if args.enable_colocation { if args.drivers.is_empty() { panic!("colocation is enabled but no drivers are configured"); diff --git a/crates/database/src/order_events.rs b/crates/database/src/order_events.rs index c31b001559..080551fa5f 100644 --- a/crates/database/src/order_events.rs +++ 
b/crates/database/src/order_events.rs @@ -4,7 +4,7 @@ use { crate::OrderUid, chrono::Utc, - sqlx::{types::chrono::DateTime, PgConnection}, + sqlx::{types::chrono::DateTime, PgConnection, PgPool}, }; /// Describes what kind of event was registered for an order. @@ -66,3 +66,19 @@ VALUES ($1, $2, $3) .await .map(|_| ()) } + +/// Deletes rows before the provided timestamp from the `order_events` table. +pub async fn delete_order_events_before( + pool: &PgPool, + timestamp: DateTime, +) -> Result { + const QUERY: &str = r#" +DELETE FROM order_events +WHERE timestamp < $1 +"#; + sqlx::query(QUERY) + .bind(timestamp) + .execute(pool) + .await + .map(|result| result.rows_affected()) +} diff --git a/database/sql/V057__add_timestamp_index_to_order_events.sql b/database/sql/V057__add_timestamp_index_to_order_events.sql new file mode 100644 index 0000000000..35d5a17b88 --- /dev/null +++ b/database/sql/V057__add_timestamp_index_to_order_events.sql @@ -0,0 +1,2 @@ +-- Adds an index on the 'timestamp' column of 'order_events' table to facilitate efficient periodic cleanups. +CREATE INDEX order_events_by_timestamp ON order_events (timestamp);