diff --git a/store/postgres/src/catalog.rs b/store/postgres/src/catalog.rs
index 22bf3d941ef..99e1fbb604d 100644
--- a/store/postgres/src/catalog.rs
+++ b/store/postgres/src/catalog.rs
@@ -11,6 +11,7 @@ use std::collections::{HashMap, HashSet};
 use std::fmt::Write;
 use std::iter::FromIterator;
 use std::sync::Arc;
+use std::time::Duration;
 
 use graph::prelude::anyhow::anyhow;
 use graph::{
@@ -616,3 +617,27 @@ pub fn stats(conn: &PgConnection, namespace: &Namespace) -> Result
+
+/// Return how far the most lagging replica is behind, based on
+/// `pg_stat_replication`; returns zero when there are no replicas
+pub(crate) fn replication_lag(conn: &PgConnection) -> Result<Duration, StoreError> {
+    #[derive(Queryable, QueryableByName)]
+    struct Lag {
+        #[sql_type = "Nullable<Integer>"]
+        ms: Option<i32>,
+    }
+
+    let lag = sql_query(
+        "select extract(milliseconds from max(greatest(write_lag, flush_lag, replay_lag)))::int as ms \
+         from pg_stat_replication",
+    )
+    .get_result::<Lag>(conn)?;
+
+    let lag = lag
+        .ms
+        .map(|ms| if ms <= 0 { 0 } else { ms as u64 })
+        .unwrap_or(0);
+
+    Ok(Duration::from_millis(lag))
+}
diff --git a/store/postgres/src/copy.rs b/store/postgres/src/copy.rs
index 6c69664acfb..0d5e2716170 100644
--- a/store/postgres/src/copy.rs
+++ b/store/postgres/src/copy.rs
@@ -38,7 +38,7 @@ use graph::{
 };
 
 use crate::{
-    advisory_lock,
+    advisory_lock, catalog,
     dynds::DataSourcesTable,
     primary::{DeploymentId, Site},
 };
@@ -54,6 +54,16 @@ const INITIAL_BATCH_SIZE_LIST: i64 = 100;
 const TARGET_DURATION: Duration = Duration::from_secs(5 * 60);
 const LOG_INTERVAL: Duration = Duration::from_secs(3 * 60);
 
+/// If replicas are lagging by more than this, the copying code will pause
+/// for a while to allow replicas to catch up
+const MAX_REPLICATION_LAG: Duration = Duration::from_secs(60);
+/// If replicas need to catch up, do not resume copying until the lag is
+/// less than this
+const ACCEPTABLE_REPLICATION_LAG: Duration = Duration::from_secs(30);
+/// When replicas are lagging too much, sleep for this long before checking
+/// the lag again
+const REPLICATION_SLEEP: Duration = Duration::from_secs(30);
+
 table! {
     subgraphs.copy_state(dst) {
         // deployment_schemas.id
@@ -744,6 +754,24 @@ impl Connection {
             if table.is_cancelled(&self.conn)? {
                 return Ok(Status::Cancelled);
             }
+
+            // Pause copying if replication is lagging behind to avoid
+            // overloading replicas
+            let mut lag = catalog::replication_lag(&self.conn)?;
+            if lag > MAX_REPLICATION_LAG {
+                loop {
+                    info!(&self.logger,
+                     "Replicas are lagging too much; pausing copying for {}s to allow them to catch up",
+                     REPLICATION_SLEEP.as_secs();
+                     "lag_s" => lag.as_secs());
+                    std::thread::sleep(REPLICATION_SLEEP);
+                    lag = catalog::replication_lag(&self.conn)?;
+                    if lag <= ACCEPTABLE_REPLICATION_LAG {
+                        break;
+                    }
+                }
+            }
+
             let status = self.transaction(|conn| table.copy_batch(conn))?;
             if status == Status::Cancelled {
                 return Ok(status);
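
Illustrative sketch (not part of the patch): the pause logic in copy.rs uses two thresholds, so copying pauses only once replicas fall more than MAX_REPLICATION_LAG behind and resumes only once they are back within ACCEPTABLE_REPLICATION_LAG, which keeps a single noisy lag reading from toggling copying on and off around one cutoff. Below is a minimal, self-contained Rust version of that shape with the lag source stubbed out; the names pause_if_replicas_lag and current_lag are hypothetical and do not exist in graph-node, and running it sleeps for real since it reuses the patch's 30s REPLICATION_SLEEP.

// Sketch only: same pause/resume shape as the patch, lag source stubbed out.
use std::thread;
use std::time::Duration;

const MAX_REPLICATION_LAG: Duration = Duration::from_secs(60);
const ACCEPTABLE_REPLICATION_LAG: Duration = Duration::from_secs(30);
const REPLICATION_SLEEP: Duration = Duration::from_secs(30);

// Hypothetical helper: block until the reported lag is acceptable again.
fn pause_if_replicas_lag(mut current_lag: impl FnMut() -> Duration) {
    let mut lag = current_lag();
    // Only start pausing above MAX_REPLICATION_LAG, but keep sleeping
    // until the lag drops to ACCEPTABLE_REPLICATION_LAG or below.
    if lag > MAX_REPLICATION_LAG {
        loop {
            println!(
                "replicas lag by {}s; sleeping {}s before checking again",
                lag.as_secs(),
                REPLICATION_SLEEP.as_secs()
            );
            thread::sleep(REPLICATION_SLEEP);
            lag = current_lag();
            if lag <= ACCEPTABLE_REPLICATION_LAG {
                break;
            }
        }
    }
}

fn main() {
    // Pretend the reported lag shrinks by 20s on every check: the loop
    // starts at 90s (> 60s) and only exits once the lag is <= 30s.
    let mut fake_lag = Duration::from_secs(90);
    pause_if_replicas_lag(|| {
        let lag = fake_lag;
        fake_lag = fake_lag.saturating_sub(Duration::from_secs(20));
        lag
    });
}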