Skip to content

Commit

Permalink
store: Be considerate of replicas when copying
Browse files Browse the repository at this point in the history
Copying of subgraphs can create a lot of data to be replicated, which can
make replicas fall hopelessly behind. With this change, the copy code
checks whether there are replicas that are too far behind and backs off of
copying for a bit to give them a chance to catch up.
  • Loading branch information
lutter committed Sep 20, 2022
1 parent 27c5348 commit 17f471f
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 1 deletion.
25 changes: 25 additions & 0 deletions store/postgres/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::collections::{HashMap, HashSet};
use std::fmt::Write;
use std::iter::FromIterator;
use std::sync::Arc;
use std::time::Duration;

use graph::prelude::anyhow::anyhow;
use graph::{
Expand Down Expand Up @@ -616,3 +617,27 @@ pub fn stats(conn: &PgConnection, namespace: &Namespace) -> Result<Vec<VersionSt

Ok(stats.into_iter().map(|s| s.into()).collect())
}

/// Return by how much the slowest replica connected to the database `conn`
/// is lagging. The returned value has millisecond precision. If the
/// database has no replicas, return `0`
pub(crate) fn replication_lag(conn: &PgConnection) -> Result<Duration, StoreError> {
#[derive(Queryable, QueryableByName)]
struct Lag {
#[sql_type = "Nullable<Integer>"]
ms: Option<i32>,
}

let lag = sql_query(
"select extract(milliseconds from max(greatest(write_lag, flush_lag, replay_lag)))::int as ms \
from pg_stat_replication",
)
.get_result::<Lag>(conn)?;

let lag = lag
.ms
.map(|ms| if ms <= 0 { 0 } else { ms as u64 })
.unwrap_or(0);

Ok(Duration::from_millis(lag))
}
30 changes: 29 additions & 1 deletion store/postgres/src/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use graph::{
};

use crate::{
advisory_lock,
advisory_lock, catalog,
dynds::DataSourcesTable,
primary::{DeploymentId, Site},
};
Expand All @@ -54,6 +54,16 @@ const INITIAL_BATCH_SIZE_LIST: i64 = 100;
const TARGET_DURATION: Duration = Duration::from_secs(5 * 60);
const LOG_INTERVAL: Duration = Duration::from_secs(3 * 60);

/// If replicas are lagging by more than this, the copying code will pause
/// for a while to allow replicas to catch up
const MAX_REPLICATION_LAG: Duration = Duration::from_secs(60);
/// If replicas need to catch up, do not resume copying until the lag is
/// less than this
const ACCEPTABLE_REPLICATION_LAG: Duration = Duration::from_secs(30);
/// When replicas are lagging too much, sleep for this long before checking
/// the lag again
const REPLICATION_SLEEP: Duration = Duration::from_secs(30);

table! {
subgraphs.copy_state(dst) {
// deployment_schemas.id
Expand Down Expand Up @@ -744,6 +754,24 @@ impl Connection {
if table.is_cancelled(&self.conn)? {
return Ok(Status::Cancelled);
}

// Pause copying if replication is lagging behind to avoid
// overloading replicas
let mut lag = catalog::replication_lag(&self.conn)?;
if lag > MAX_REPLICATION_LAG {
loop {
info!(&self.logger,
"Replicas are lagging too much; pausing copying for {}s to allow them to catch up",
REPLICATION_SLEEP.as_secs();
"lag_s" => lag.as_secs());
std::thread::sleep(REPLICATION_SLEEP);
lag = catalog::replication_lag(&self.conn)?;
if lag <= ACCEPTABLE_REPLICATION_LAG {
break;
}
}
}

let status = self.transaction(|conn| table.copy_batch(conn))?;
if status == Status::Cancelled {
return Ok(status);
Expand Down

0 comments on commit 17f471f

Please sign in to comment.