From d1004e631f3d9d02ffd1712543d791f11926bae3 Mon Sep 17 00:00:00 2001
From: David Lutterkort <lutter@suse.de>
Date: Tue, 20 Sep 2022 09:04:54 -0700
Subject: [PATCH] store: Be considerate of replicas when copying

Copying of subgraphs can create a lot of data to be replicated, which
can make replicas fall hopelessly behind. With this change, the copy
code checks whether there are replicas that are too far behind and
backs off of copying for a bit to give them a chance to catch up.
---
 store/postgres/src/catalog.rs | 25 +++++++++++++++++++++++++
 store/postgres/src/copy.rs    | 30 +++++++++++++++++++++++++++++-
 2 files changed, 54 insertions(+), 1 deletion(-)

diff --git a/store/postgres/src/catalog.rs b/store/postgres/src/catalog.rs
index 22bf3d941ef..99e1fbb604d 100644
--- a/store/postgres/src/catalog.rs
+++ b/store/postgres/src/catalog.rs
@@ -11,6 +11,7 @@ use std::collections::{HashMap, HashSet};
 use std::fmt::Write;
 use std::iter::FromIterator;
 use std::sync::Arc;
+use std::time::Duration;
 
 use graph::prelude::anyhow::anyhow;
 use graph::{
@@ -616,3 +617,27 @@ pub fn stats(conn: &PgConnection, namespace: &Namespace) -> Result<Vec<VersionStats>, StoreError> {
+
+pub(crate) fn replication_lag(conn: &PgConnection) -> Result<Duration, StoreError> {
+    #[derive(Queryable, QueryableByName)]
+    struct Lag {
+        #[sql_type = "Nullable<Integer>"]
+        ms: Option<i32>,
+    }
+
+    let lag = sql_query(
+        "select extract(milliseconds from max(greatest(write_lag, flush_lag, replay_lag)))::int as ms \
+           from pg_stat_replication",
+    )
+    .get_result::<Lag>(conn)?;
+
+    let lag = lag
+        .ms
+        .map(|ms| if ms <= 0 { 0 } else { ms as u64 })
+        .unwrap_or(0);
+
+    Ok(Duration::from_millis(lag))
+}
diff --git a/store/postgres/src/copy.rs b/store/postgres/src/copy.rs
index 6c69664acfb..6454e8c04e0 100644
--- a/store/postgres/src/copy.rs
+++ b/store/postgres/src/copy.rs
@@ -38,7 +38,7 @@ use graph::{
 };
 
 use crate::{
-    advisory_lock,
+    advisory_lock, catalog,
     dynds::DataSourcesTable,
     primary::{DeploymentId, Site},
 };
@@ -54,6 +54,16 @@ const INITIAL_BATCH_SIZE_LIST: i64 = 100;
 const TARGET_DURATION: Duration = Duration::from_secs(5 * 60);
 const LOG_INTERVAL: Duration = Duration::from_secs(3 * 60);
+/// If replicas are lagging by more than this, the copying code will pause
+/// for a while to allow replicas to catch up
+const MAX_REPLICATION_LAG: Duration = Duration::from_secs(60);
+/// If replicas need to catch up, do not resume copying until the lag is
+/// less than this
+const ACCEPTABLE_REPLICATION_LAG: Duration = Duration::from_secs(30);
+/// When replicas are lagging too much, sleep for this long before checking
+/// the lag again
+const REPLICATION_SLEEP: Duration = Duration::from_secs(10);
+
 table! {
     subgraphs.copy_state(dst) {
         // deployment_schemas.id
@@ -744,6 +754,24 @@ impl Connection {
             if table.is_cancelled(&self.conn)? {
                 return Ok(Status::Cancelled);
             }
+
+            // Pause copying if replication is lagging behind to avoid
+            // overloading replicas
+            let mut lag = catalog::replication_lag(&self.conn)?;
+            if lag > MAX_REPLICATION_LAG {
+                loop {
+                    info!(&self.logger,
+                          "Replicas are lagging too much; pausing copying for {}s to allow them to catch up",
+                          REPLICATION_SLEEP.as_secs();
+                          "lag_s" => lag.as_secs());
+                    std::thread::sleep(REPLICATION_SLEEP);
+                    lag = catalog::replication_lag(&self.conn)?;
+                    if lag <= ACCEPTABLE_REPLICATION_LAG {
+                        break;
+                    }
+                }
+            }
+
             let status = self.transaction(|conn| table.copy_batch(conn))?;
             if status == Status::Cancelled {
                 return Ok(status);