Skip to content

Commit

Permalink
Periodic order events table clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
squadgazzz committed Nov 27, 2023
1 parent fa8796c commit c98939a
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 2 deletions.
31 changes: 30 additions & 1 deletion crates/autopilot/src/arguments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ use {
http_client,
price_estimation::{self, NativePriceEstimators},
},
std::{net::SocketAddr, num::NonZeroUsize, time::Duration},
std::{
net::SocketAddr,
num::{NonZeroUsize, ParseFloatError},
time::Duration,
},
url::Url,
};

Expand Down Expand Up @@ -203,6 +207,16 @@ pub struct Arguments {
value_parser = shared::arguments::duration_from_seconds,
)]
pub solve_deadline: Duration,

/// Time interval in days between each cleanup operation of the
/// `order_events` database table.
#[clap(long, env, default_value = "1", value_parser = duration_from_days)]
pub order_events_cleanup_interval: Duration,

/// Age threshold in days for order events to be eligible for cleanup in the
/// `order_events` database table.
#[clap(long, env, default_value = "30", value_parser = duration_from_days)]
pub order_events_cleanup_threshold: Duration,
}

impl std::fmt::Display for Arguments {
Expand Down Expand Up @@ -270,6 +284,21 @@ impl std::fmt::Display for Arguments {
writeln!(f, "score_cap: {}", self.score_cap)?;
display_option(f, "shadow", &self.shadow)?;
writeln!(f, "solve_deadline: {:?}", self.solve_deadline)?;
writeln!(
f,
"order_events_cleanup_interval: {:?}",
self.order_events_cleanup_interval
)?;
writeln!(
f,
"order_events_cleanup_threshold: {:?}",
self.order_events_cleanup_threshold
)?;
Ok(())
}
}

fn duration_from_days(s: &str) -> Result<Duration, ParseFloatError> {
let days = s.parse::<f64>()?;
Ok(Duration::from_secs_f64(days * 86_400.0))
}
6 changes: 6 additions & 0 deletions crates/autopilot/src/database/order_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use {
order_events::{self, OrderEvent},
},
model::order::OrderUid,
sqlx::Error,
};

impl super::Postgres {
Expand All @@ -19,6 +20,11 @@ impl super::Postgres {
tracing::warn!(?err, "failed to insert order events");
}
}

/// Deletes events before the provided timestamp.
pub async fn delete_events_before(&self, timestamp: DateTime<Utc>) -> Result<u64, Error> {
order_events::delete_order_events_before(&self.0, timestamp).await
}
}

async fn store_order_events(
Expand Down
1 change: 1 addition & 0 deletions crates/autopilot/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub mod driver_api;
pub mod driver_model;
pub mod event_updater;
pub mod on_settlement_event_updater;
pub mod periodic_db_cleanup;
pub mod protocol;
pub mod run;
pub mod run_loop;
Expand Down
67 changes: 67 additions & 0 deletions crates/autopilot/src/periodic_db_cleanup.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use {
crate::database::Postgres,
chrono::{DateTime, Utc},
std::time::Duration,
tokio::time,
};

pub struct OrderEventsCleanerConfig {
cleanup_interval: Duration,
event_age_threshold: chrono::Duration,
}

impl OrderEventsCleanerConfig {
pub fn new(cleanup_interval: Duration, event_age_threshold: Duration) -> Self {
OrderEventsCleanerConfig {
cleanup_interval,
event_age_threshold: chrono::Duration::from_std(event_age_threshold).unwrap(),
}
}
}

pub struct OrderEventsCleaner {
config: OrderEventsCleanerConfig,
db: Postgres,
}

impl OrderEventsCleaner {
pub fn new(config: OrderEventsCleanerConfig, db: Postgres) -> Self {
OrderEventsCleaner { config, db }
}

pub async fn run_forever(self) -> ! {
let mut interval = time::interval(self.config.cleanup_interval);
loop {
let timestamp: DateTime<Utc> = Utc::now() - self.config.event_age_threshold;
match self.db.delete_events_before(timestamp).await {
Ok(affected_rows_count) => {
tracing::debug!(
"deleted {:?} order events before {}",
affected_rows_count,
timestamp
);
Metrics::get()
.last_order_events_cleanup_run
.set(Utc::now().timestamp())
}
Err(err) => {
tracing::warn!(?err, "failed to delete order events before {}", timestamp)
}
}
interval.tick().await;
}
}
}

#[derive(prometheus_metric_storage::MetricStorage)]
struct Metrics {
/// Timestamp of the last successful `order_events` table cleanup.
#[metric(name = "periodic_db_cleanup", labels("type"))]
last_order_events_cleanup_run: prometheus::IntGauge,
}

impl Metrics {
fn get() -> &'static Self {
Metrics::instance(observe::metrics::get_storage_registry()).unwrap()
}
}
15 changes: 15 additions & 0 deletions crates/autopilot/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,21 @@ pub async fn run(args: Arguments) {
.instrument(tracing::info_span!("on_settlement_event_updater")),
);

let order_events_cleaner_config = crate::periodic_db_cleanup::OrderEventsCleanerConfig::new(
args.order_events_cleanup_interval,
args.order_events_cleanup_threshold,
);
let order_events_cleaner = crate::periodic_db_cleanup::OrderEventsCleaner::new(
order_events_cleaner_config,
db.clone(),
);

tokio::task::spawn(
order_events_cleaner
.run_forever()
.instrument(tracing::info_span!("order_events_cleaner")),
);

if args.enable_colocation {
if args.drivers.is_empty() {
panic!("colocation is enabled but no drivers are configured");
Expand Down
18 changes: 17 additions & 1 deletion crates/database/src/order_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use {
crate::OrderUid,
chrono::Utc,
sqlx::{types::chrono::DateTime, PgConnection},
sqlx::{types::chrono::DateTime, PgConnection, PgPool},
};

/// Describes what kind of event was registered for an order.
Expand Down Expand Up @@ -66,3 +66,19 @@ VALUES ($1, $2, $3)
.await
.map(|_| ())
}

/// Deletes rows before the provided timestamp from the `order_events` table.
pub async fn delete_order_events_before(
pool: &PgPool,
timestamp: DateTime<Utc>,
) -> Result<u64, sqlx::Error> {
const QUERY: &str = r#"
DELETE FROM order_events
WHERE timestamp < $1
"#;
sqlx::query(QUERY)
.bind(timestamp)
.execute(pool)
.await
.map(|result| result.rows_affected())
}
2 changes: 2 additions & 0 deletions database/sql/V057__add_timestamp_index_to_order_events.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Adds an index on the 'timestamp' column of 'order_events' table to facilitate efficient periodic cleanups.
CREATE INDEX order_events_by_timestamp ON order_events (timestamp);

0 comments on commit c98939a

Please sign in to comment.