From d52c5e63b6d160c06ebc280397b77065a9c3eebe Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Fri, 23 Sep 2022 16:01:05 -0700 Subject: [PATCH 1/4] store: Accept &str in catalog::table_exists as the namespace --- store/postgres/src/catalog.rs | 4 ++-- store/postgres/src/relational/prune.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/store/postgres/src/catalog.rs b/store/postgres/src/catalog.rs index 4a0a8d043c6..bb67ef00d88 100644 --- a/store/postgres/src/catalog.rs +++ b/store/postgres/src/catalog.rs @@ -161,7 +161,7 @@ fn get_text_columns( pub fn table_exists( conn: &PgConnection, - namespace: &Namespace, + namespace: &str, table: &SqlName, ) -> Result { #[derive(Debug, QueryableByName)] @@ -186,7 +186,7 @@ pub fn supports_proof_of_indexing( lazy_static! { static ref POI_TABLE_NAME: SqlName = SqlName::verbatim(POI_TABLE.to_owned()); } - table_exists(conn, namespace, &POI_TABLE_NAME) + table_exists(conn, namespace.as_str(), &POI_TABLE_NAME) } /// Whether the given table has an exclusion constraint. When we create diff --git a/store/postgres/src/relational/prune.rs b/store/postgres/src/relational/prune.rs index fcc04b76567..ed7d83a63cf 100644 --- a/store/postgres/src/relational/prune.rs +++ b/store/postgres/src/relational/prune.rs @@ -41,7 +41,7 @@ impl TablePair { let dst = src.new_like(&layout.site.namespace, &new_name); let mut query = String::new(); - if catalog::table_exists(conn, &layout.site.namespace, &dst.name)? { + if catalog::table_exists(conn, layout.site.namespace.as_str(), &dst.name)? { writeln!(query, "truncate table {nsp}.{new_name};")?; } else { dst.create_table(&mut query, layout)?; From f93b0d2d0688efb27c2955af91f3205961646d98 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Mon, 19 Sep 2022 09:50:50 -0700 Subject: [PATCH 2/4] store: Detect if there were any migrations to actually run --- store/postgres/src/catalog.rs | 25 ++++++++++++++++++++++++- store/postgres/src/connection_pool.rs | 21 +++++++++++---------- 2 files changed, 35 insertions(+), 11 deletions(-) diff --git a/store/postgres/src/catalog.rs b/store/postgres/src/catalog.rs index bb67ef00d88..3bf19c8d0b5 100644 --- a/store/postgres/src/catalog.rs +++ b/store/postgres/src/catalog.rs @@ -21,7 +21,7 @@ use graph::{ use crate::connection_pool::ForeignServer; use crate::{ - primary::{Namespace, Site}, + primary::{Namespace, Site, NAMESPACE_PUBLIC}, relational::SqlName, }; @@ -52,6 +52,19 @@ table! { } } +table! { + __diesel_schema_migrations(version) { + version -> Text, + run_on -> Timestamp, + } +} + +lazy_static! { + /// The name of the table in which Diesel records migrations + static ref MIGRATIONS_TABLE: SqlName = + SqlName::verbatim("__diesel_schema_migrations".to_string()); +} + // In debug builds (for testing etc.) create exclusion constraints, in // release builds for production, skip them #[cfg(debug_assertions)] @@ -310,6 +323,16 @@ pub fn recreate_schema(conn: &PgConnection, nsp: &str) -> Result<(), StoreError> Ok(conn.batch_execute(&query)?) } +pub fn migration_count(conn: &PgConnection) -> Result { + use __diesel_schema_migrations as m; + + if !table_exists(conn, NAMESPACE_PUBLIC, &*MIGRATIONS_TABLE)? 
{ + return Ok(0); + } + + m::table.count().get_result(conn).map_err(StoreError::from) +} + pub fn account_like(conn: &PgConnection, site: &Site) -> Result, StoreError> { use table_stats as ts; let names = ts::table diff --git a/store/postgres/src/connection_pool.rs b/store/postgres/src/connection_pool.rs index 3e665a1abd9..45ec255c1c6 100644 --- a/store/postgres/src/connection_pool.rs +++ b/store/postgres/src/connection_pool.rs @@ -983,7 +983,7 @@ impl PoolInner { let result = pool .configure_fdw(servers.as_ref()) .and_then(|()| migrate_schema(&pool.logger, &conn)) - .and_then(|()| pool.map_primary()) + .and_then(|had_migrations| pool.map_primary()) .and_then(|()| pool.map_metadata(servers.as_ref())); debug!(&pool.logger, "Release migration lock"); advisory_lock::unlock_migration(&conn).unwrap_or_else(|err| { @@ -1068,19 +1068,22 @@ embed_migrations!("./migrations"); /// When multiple `graph-node` processes start up at the same time, we ensure /// that they do not run migrations in parallel by using `blocking_conn` to /// serialize them. The `conn` is used to run the actual migration. -fn migrate_schema(logger: &Logger, conn: &PgConnection) -> Result<(), StoreError> { +fn migrate_schema(logger: &Logger, conn: &PgConnection) -> Result { // Collect migration logging output let mut output = vec![]; + let old_count = catalog::migration_count(conn)?; + info!(logger, "Running migrations"); let result = embedded_migrations::run_with_output(conn, &mut output); info!(logger, "Migrations finished"); + let had_migrations = catalog::migration_count(conn)? != old_count; + // If there was any migration output, log it now let msg = String::from_utf8(output).unwrap_or_else(|_| String::from("")); let msg = msg.trim(); - let has_output = !msg.is_empty(); - if has_output { + if !msg.is_empty() { let msg = msg.replace('\n', " "); if let Err(e) = result { error!(logger, "Postgres migration error"; "output" => msg); @@ -1090,13 +1093,11 @@ fn migrate_schema(logger: &Logger, conn: &PgConnection) -> Result<(), StoreError } } - if has_output { - // We take getting output as a signal that a migration was actually - // run, which is not easy to tell from the Diesel API, and reset the - // query statistics since a schema change makes them not all that - // useful. An error here is not serious and can be ignored. + if had_migrations { + // Reset the query statistics since a schema change makes them not + // all that useful. An error here is not serious and can be ignored. 
conn.batch_execute("select pg_stat_statements_reset()").ok(); } - Ok(()) + Ok(had_migrations) } From 9ffc739c9eba5b427d8cc177d77d23415c3ec8f7 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Fri, 23 Sep 2022 15:33:09 -0700 Subject: [PATCH 3/4] node, store: Properly propagate schema changes in a shard --- node/src/bin/manager.rs | 4 +- node/src/store_builder.rs | 24 +++-- store/postgres/src/connection_pool.rs | 146 ++++++++++++++++++++------ 3 files changed, 127 insertions(+), 47 deletions(-) diff --git a/node/src/bin/manager.rs b/node/src/bin/manager.rs index f68a2dbdfed..7715c8c5c1d 100644 --- a/node/src/bin/manager.rs +++ b/node/src/bin/manager.rs @@ -21,6 +21,7 @@ use graph_node::{ store_builder::StoreBuilder, MetricsContext, }; +use graph_store_postgres::connection_pool::PoolCoordinator; use graph_store_postgres::ChainStore; use graph_store_postgres::{ connection_pool::ConnectionPool, BlockStore, NotificationSender, Shard, Store, SubgraphStore, @@ -587,13 +588,14 @@ impl Context { fn primary_pool(self) -> ConnectionPool { let primary = self.config.primary_store(); + let coord = Arc::new(PoolCoordinator::new(Arc::new(vec![]))); let pool = StoreBuilder::main_pool( &self.logger, &self.node_id, PRIMARY_SHARD.as_str(), primary, self.metrics_registry(), - Arc::new(vec![]), + coord, ); pool.skip_setup(); pool diff --git a/node/src/store_builder.rs b/node/src/store_builder.rs index 205f182d06f..786c1ff6e73 100644 --- a/node/src/store_builder.rs +++ b/node/src/store_builder.rs @@ -9,7 +9,9 @@ use graph::{ prelude::{info, CheapClone, Logger}, util::security::SafeDisplay, }; -use graph_store_postgres::connection_pool::{ConnectionPool, ForeignServer, PoolName}; +use graph_store_postgres::connection_pool::{ + ConnectionPool, ForeignServer, PoolCoordinator, PoolName, +}; use graph_store_postgres::{ BlockStore as DieselBlockStore, ChainHeadUpdateListener as PostgresChainHeadUpdateListener, NotificationSender, Shard as ShardName, Store as DieselStore, SubgraphStore, @@ -102,6 +104,7 @@ impl StoreBuilder { .collect::, _>>() .expect("connection url's contain enough detail"); let servers = Arc::new(servers); + let coord = Arc::new(PoolCoordinator::new(servers)); let shards: Vec<_> = config .stores @@ -114,7 +117,7 @@ impl StoreBuilder { name, shard, registry.cheap_clone(), - servers.clone(), + coord.clone(), ); let (read_only_conn_pools, weights) = Self::replica_pools( @@ -123,7 +126,7 @@ impl StoreBuilder { name, shard, registry.cheap_clone(), - servers.clone(), + coord.clone(), ); let name = @@ -191,7 +194,7 @@ impl StoreBuilder { name: &str, shard: &Shard, registry: Arc, - servers: Arc>, + coord: Arc, ) -> ConnectionPool { let logger = logger.new(o!("pool" => "main")); let pool_size = shard.pool_size.size_for(node, name).expect(&format!( @@ -209,15 +212,14 @@ impl StoreBuilder { "conn_pool_size" => pool_size, "weight" => shard.weight ); - ConnectionPool::create( + coord.create_pool( + &logger, name, PoolName::Main, shard.connection.to_owned(), pool_size, Some(fdw_pool_size), - &logger, registry.cheap_clone(), - servers, ) } @@ -228,7 +230,7 @@ impl StoreBuilder { name: &str, shard: &Shard, registry: Arc, - servers: Arc>, + coord: Arc, ) -> (Vec, Vec) { let mut weights: Vec<_> = vec![shard.weight]; ( @@ -250,15 +252,15 @@ impl StoreBuilder { "we can determine the pool size for replica {}", name )); - ConnectionPool::create( + + coord.clone().create_pool( + &logger, name, PoolName::Replica(pool), replica.connection.clone(), pool_size, None, - &logger, registry.cheap_clone(), - servers.clone(), ) }) 
.collect(), diff --git a/store/postgres/src/connection_pool.rs b/store/postgres/src/connection_pool.rs index 45ec255c1c6..5137e408c02 100644 --- a/store/postgres/src/connection_pool.rs +++ b/store/postgres/src/connection_pool.rs @@ -25,7 +25,7 @@ use graph::{ use std::fmt::{self, Write}; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::time::Duration; use std::{collections::HashMap, sync::RwLock}; @@ -158,9 +158,6 @@ impl ForeignServer { /// Map key tables from the primary into our local schema. If we are the /// primary, set them up as views. - /// - /// We recreate this mapping on every server start so that migrations that - /// change one of the mapped tables actually show up in the imported tables fn map_primary(conn: &PgConnection, shard: &Shard) -> Result<(), StoreError> { catalog::recreate_schema(conn, Self::PRIMARY_PUBLIC)?; @@ -226,7 +223,7 @@ const FDW_IDLE_TIMEOUT: Duration = Duration::from_secs(60); enum PoolState { /// A connection pool, and all the servers for which we need to /// establish fdw mappings when we call `setup` on the pool - Created(Arc, Arc>), + Created(Arc, Arc), /// The pool has been successfully set up Ready(Arc), /// The pool has been disabled by setting its size to 0 @@ -300,7 +297,7 @@ impl PoolStateTracker { } impl ConnectionPool { - pub fn create( + fn create( shard_name: &str, pool_name: PoolName, postgres_url: String, @@ -308,7 +305,7 @@ impl ConnectionPool { fdw_pool_size: Option, logger: &Logger, registry: Arc, - servers: Arc>, + coord: Arc, ) -> ConnectionPool { let state_tracker = PoolStateTracker::new(); let shard = @@ -330,7 +327,7 @@ impl ConnectionPool { if pool_name.is_replica() { PoolState::Ready(Arc::new(pool)) } else { - PoolState::Created(Arc::new(pool), servers) + PoolState::Created(Arc::new(pool), coord) } } }; @@ -968,7 +965,7 @@ impl PoolInner { /// # Panics /// /// If any errors happen during the migration, the process panics - pub fn setup(&self, servers: Arc>) -> Result<(), StoreError> { + fn setup(&self, coord: Arc) -> Result<(), StoreError> { fn die(logger: &Logger, msg: &'static str, err: &dyn std::fmt::Display) -> ! { crit!(logger, "{}", msg; "error" => format!("{:#}", err)); panic!("{}: {}", msg, err); @@ -980,11 +977,29 @@ impl PoolInner { let start = Instant::now(); advisory_lock::lock_migration(&conn) .unwrap_or_else(|err| die(&pool.logger, "failed to get migration lock", &err)); + // This code can cause a race in database setup: if pool A has had + // schema changes and pool B then tries to map tables from pool A, + // but does so before the concurrent thread running this code for + // pool B has at least finished `configure_fdw`, mapping tables will + // fail. In that case, the node must be restarted. The restart is + // guaranteed because this failure will lead to a panic in the setup + // for pool A + // + // This code can also leave the table mappings in a state where they + // have not been updated if the process is killed after migrating + // the schema but before finishing remapping in all shards. 
+ // Addressing that would require keeping track of the need to remap + // in the database instead of just in memory let result = pool - .configure_fdw(servers.as_ref()) + .configure_fdw(coord.servers.as_ref()) .and_then(|()| migrate_schema(&pool.logger, &conn)) - .and_then(|had_migrations| pool.map_primary()) - .and_then(|()| pool.map_metadata(servers.as_ref())); + .and_then(|had_migrations| { + if had_migrations { + coord.propagate_schema_change(&self.shard) + } else { + Ok(()) + } + }); debug!(&pool.logger, "Release migration lock"); advisory_lock::unlock_migration(&conn).unwrap_or_else(|err| { die(&pool.logger, "failed to release migration lock", &err); @@ -1021,17 +1036,6 @@ impl PoolInner { }) } - /// Map key tables from the primary into our local schema. If we are the - /// primary, set them up as views. - /// - /// We recreate this mapping on every server start so that migrations that - /// change one of the mapped tables actually show up in the imported tables - fn map_primary(&self) -> Result<(), StoreError> { - info!(&self.logger, "Mapping primary"); - let conn = self.get()?; - conn.transaction(|| ForeignServer::map_primary(&conn, &self.shard)) - } - /// Copy the data from key tables in the primary into our local schema /// so it can be used as a fallback when the primary goes down pub async fn mirror_primary_tables(&self) -> Result<(), StoreError> { @@ -1046,18 +1050,21 @@ impl PoolInner { .await } - // Map some tables from the `subgraphs` metadata schema from foreign - // servers to ourselves. The mapping is recreated on every server start - // so that we pick up possible schema changes in the mappings - fn map_metadata(&self, servers: &[ForeignServer]) -> Result<(), StoreError> { - info!(&self.logger, "Mapping metadata"); - let conn = self.get()?; - conn.transaction(|| { - for server in servers.iter().filter(|server| server.shard != self.shard) { - server.map_metadata(&conn)?; - } - Ok(()) - }) + // The foreign server `server` had schema changes, and we therefore need + // to remap anything that we are importing via fdw to make sure we are + // using this updated schema + fn remap(&self, server: &ForeignServer) -> Result<(), StoreError> { + if &server.shard == &*PRIMARY_SHARD { + info!(&self.logger, "Mapping primary"); + let conn = self.get()?; + conn.transaction(|| ForeignServer::map_primary(&conn, &self.shard))?; + } + if &server.shard != &self.shard { + info!(&self.logger, "Mapping metadata"); + let conn = self.get()?; + conn.transaction(|| server.map_metadata(&conn))?; + } + Ok(()) } } @@ -1101,3 +1108,72 @@ fn migrate_schema(logger: &Logger, conn: &PgConnection) -> Result>>, + servers: Arc>, +} + +impl PoolCoordinator { + pub fn new(servers: Arc>) -> Self { + Self { + pools: Mutex::new(HashMap::new()), + servers, + } + } + + pub fn create_pool( + self: Arc, + logger: &Logger, + name: &str, + pool_name: PoolName, + postgres_url: String, + pool_size: u32, + fdw_pool_size: Option, + registry: Arc, + ) -> ConnectionPool { + let pool = ConnectionPool::create( + name, + pool_name, + postgres_url, + pool_size, + fdw_pool_size, + logger, + registry, + self.cheap_clone(), + ); + // It is safe to take this lock here since nobody has seen the pool + // yet. 
We remember the `PoolInner` so that later, when we have to + // call `remap()`, we do not have to take this lock as that will be + // already held in `get_ready()` + match &*pool.inner.lock(logger) { + PoolState::Created(inner, _) | PoolState::Ready(inner) => { + self.pools + .lock() + .unwrap() + .insert(pool.shard.clone(), inner.clone()); + } + PoolState::Disabled => { /* nothing to do */ } + } + pool + } + + /// Propagate changes to the schema in `shard` to all other pools. Those + /// other pools will then recreate any tables that they imported from + /// `shard` + fn propagate_schema_change(&self, shard: &Shard) -> Result<(), StoreError> { + let server = self + .servers + .iter() + .find(|server| &server.shard == shard) + .ok_or_else(|| constraint_violation!("unknown shard {shard}"))?; + + for pool in self.pools.lock().unwrap().values() { + pool.remap(server)?; + } + Ok(()) + } +} From ef75f314711efb5715a043efe5b11b7db0cac0c1 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Thu, 29 Sep 2022 14:34:52 -0700 Subject: [PATCH 4/4] node, store: Expose migrating and remapping schemas in graphman --- node/src/bin/manager.rs | 32 ++++++++++++++++++++++++++- node/src/manager/commands/database.rs | 22 ++++++++++++++++++ node/src/manager/commands/mod.rs | 1 + node/src/store_builder.rs | 12 +++++++--- store/postgres/src/connection_pool.rs | 17 ++++++++++++-- 5 files changed, 78 insertions(+), 6 deletions(-) create mode 100644 node/src/manager/commands/database.rs diff --git a/node/src/bin/manager.rs b/node/src/bin/manager.rs index 7715c8c5c1d..786164b0476 100644 --- a/node/src/bin/manager.rs +++ b/node/src/bin/manager.rs @@ -233,6 +233,10 @@ pub enum Command { #[clap(long, short, default_value = "10000")] history: usize, }, + + /// General database management + #[clap(subcommand)] + Database(DatabaseCommand), } impl Command { @@ -495,6 +499,18 @@ pub enum IndexCommand { }, } +#[derive(Clone, Debug, Subcommand)] +pub enum DatabaseCommand { + /// Apply any pending migrations to the database schema in all shards + Migrate, + /// Refresh the mapping of tables into different shards + /// + /// This command rebuilds the mappings of tables from one shard into all + /// other shards. 
It makes it possible to fix these mappings when a + /// database migration was interrupted before it could rebuild the + /// mappings + Remap, +} #[derive(Clone, Debug, Subcommand)] pub enum CheckBlockMethod { /// The number of the target block @@ -644,7 +660,7 @@ impl Context { } fn store_and_pools(self) -> (Arc, HashMap) { - let (subgraph_store, pools) = StoreBuilder::make_subgraph_store_and_pools( + let (subgraph_store, pools, _) = StoreBuilder::make_subgraph_store_and_pools( &self.logger, &self.node_id, &self.config, @@ -1059,6 +1075,20 @@ async fn main() -> anyhow::Result<()> { } } } + Database(cmd) => { + match cmd { + DatabaseCommand::Migrate => { + /* creating the store builder runs migrations */ + let _store_builder = ctx.store_builder().await; + println!("All database migrations have been applied"); + Ok(()) + } + DatabaseCommand::Remap => { + let store_builder = ctx.store_builder().await; + commands::database::remap(&store_builder.coord).await + } + } + } Prune { deployment, history, diff --git a/node/src/manager/commands/database.rs b/node/src/manager/commands/database.rs new file mode 100644 index 00000000000..6d335c462b1 --- /dev/null +++ b/node/src/manager/commands/database.rs @@ -0,0 +1,22 @@ +use std::time::Instant; + +use graph::prelude::anyhow; +use graph_store_postgres::connection_pool::PoolCoordinator; + +pub async fn remap(coord: &PoolCoordinator) -> Result<(), anyhow::Error> { + let pools = coord.pools(); + let servers = coord.servers(); + + for server in servers.iter() { + for pool in pools.iter() { + let start = Instant::now(); + print!( + "Remapping imports from {} in shard {}", + server.shard, pool.shard + ); + pool.remap(server)?; + println!(" (done in {}s)", start.elapsed().as_secs()); + } + } + Ok(()) +} diff --git a/node/src/manager/commands/mod.rs b/node/src/manager/commands/mod.rs index eab1f1a0f7a..8c87bf3a68b 100644 --- a/node/src/manager/commands/mod.rs +++ b/node/src/manager/commands/mod.rs @@ -4,6 +4,7 @@ pub mod check_blocks; pub mod config; pub mod copy; pub mod create; +pub mod database; pub mod index; pub mod info; pub mod listen; diff --git a/node/src/store_builder.rs b/node/src/store_builder.rs index 786c1ff6e73..7a7b139c21b 100644 --- a/node/src/store_builder.rs +++ b/node/src/store_builder.rs @@ -28,6 +28,7 @@ pub struct StoreBuilder { chain_head_update_listener: Arc, /// Map network names to the shards where they are/should be stored chains: HashMap, + pub coord: Arc, } impl StoreBuilder { @@ -49,7 +50,7 @@ impl StoreBuilder { registry.clone(), )); - let (store, pools) = Self::make_subgraph_store_and_pools( + let (store, pools, coord) = Self::make_subgraph_store_and_pools( logger, node, config, @@ -82,6 +83,7 @@ impl StoreBuilder { subscription_manager, chain_head_update_listener, chains, + coord, } } @@ -94,7 +96,11 @@ impl StoreBuilder { config: &Config, fork_base: Option, registry: Arc, - ) -> (Arc, HashMap) { + ) -> ( + Arc, + HashMap, + Arc, + ) { let notification_sender = Arc::new(NotificationSender::new(registry.cheap_clone())); let servers = config @@ -150,7 +156,7 @@ impl StoreBuilder { registry, )); - (store, pools) + (store, pools, coord) } pub fn make_store( diff --git a/store/postgres/src/connection_pool.rs b/store/postgres/src/connection_pool.rs index 5137e408c02..130dd5e1ba3 100644 --- a/store/postgres/src/connection_pool.rs +++ b/store/postgres/src/connection_pool.rs @@ -673,7 +673,7 @@ impl HandleEvent for EventHandler { #[derive(Clone)] pub struct PoolInner { logger: Logger, - shard: Shard, + pub shard: Shard, pool: Pool>, // 
A separate pool for connections that will use foreign data wrappers.
     // Once such a connection accesses a foreign table, Postgres keeps a
@@ -1053,7 +1053,7 @@ impl PoolInner {
     // The foreign server `server` had schema changes, and we therefore need
     // to remap anything that we are importing via fdw to make sure we are
     // using this updated schema
-    fn remap(&self, server: &ForeignServer) -> Result<(), StoreError> {
+    pub fn remap(&self, server: &ForeignServer) -> Result<(), StoreError> {
         if &server.shard == &*PRIMARY_SHARD {
             info!(&self.logger, "Mapping primary");
             let conn = self.get()?;
             conn.transaction(|| ForeignServer::map_primary(&conn, &self.shard))?;
         }
@@ -1176,4 +1176,17 @@ impl PoolCoordinator {
         }
         Ok(())
     }
+
+    pub fn pools(&self) -> Vec<Arc<PoolInner>> {
+        self.pools
+            .lock()
+            .unwrap()
+            .values()
+            .map(|pool| pool.clone())
+            .collect()
+    }
+
+    pub fn servers(&self) -> Arc<Vec<ForeignServer>> {
+        self.servers.clone()
+    }
 }
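
The detection in PATCH 2 relies on the fact that Diesel records every applied migration as a row in `__diesel_schema_migrations`, so comparing the row count before and after `embedded_migrations::run_with_output` shows whether anything actually changed. The sketch below spells that idea out with plain `sql_query` calls; it is only an illustration against diesel 1.x, and the `table_exists` body here (querying `information_schema.tables`) is an assumption for the sketch, not the query `catalog::table_exists` actually runs.

    use diesel::pg::PgConnection;
    use diesel::prelude::*;
    use diesel::result::QueryResult;
    use diesel::sql_types::{BigInt, Text};

    // Requires the diesel derives, e.g. via `#[macro_use] extern crate diesel`.
    #[derive(QueryableByName)]
    struct Count {
        #[sql_type = "BigInt"]
        count: i64,
    }

    // Hypothetical stand-in for catalog::table_exists; the real query body is
    // elided in the hunk above.
    fn table_exists(conn: &PgConnection, namespace: &str, name: &str) -> QueryResult<bool> {
        let rows: Vec<Count> = diesel::sql_query(
            "select count(*) as count from information_schema.tables \
             where table_schema = $1 and table_name = $2",
        )
        .bind::<Text, _>(namespace)
        .bind::<Text, _>(name)
        .load(conn)?;
        Ok(rows[0].count > 0)
    }

    fn migration_count(conn: &PgConnection) -> QueryResult<i64> {
        // A brand-new database does not have Diesel's bookkeeping table yet;
        // report zero applied migrations instead of failing.
        if !table_exists(conn, "public", "__diesel_schema_migrations")? {
            return Ok(0);
        }
        let rows: Vec<Count> =
            diesel::sql_query("select count(*) as count from __diesel_schema_migrations")
                .load(conn)?;
        Ok(rows[0].count)
    }

A caller then compares `migration_count` taken before and after running the embedded migrations, which is more robust than the previous heuristic of treating any captured log output as evidence that a migration ran.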
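
The reason PATCH 3 has to propagate schema changes at all: shards import one another's `subgraphs` metadata through `postgres_fdw`, and a foreign table is a snapshot of the remote definition at the time of `import foreign schema`; it does not follow later `alter table`s on the remote side. After a migration, every other shard therefore has to drop and re-import whatever it maps from the migrated shard, which is what `remap`/`map_metadata` and the new `graphman database remap` command drive. A rough sketch of the SQL involved, with made-up schema and server names (`shard_a`, `shard_a_subgraphs`) and an illustrative table list rather than the identifiers graph-node actually generates:

    use diesel::connection::SimpleConnection;
    use diesel::pg::PgConnection;
    use diesel::result::QueryResult;

    // Recreate the local schema that mirrors another shard's `subgraphs`
    // metadata. All names here are placeholders for this sketch.
    fn remap_metadata_sketch(conn: &PgConnection) -> QueryResult<()> {
        conn.batch_execute(
            "drop schema if exists shard_a_subgraphs cascade;
             create schema shard_a_subgraphs;
             import foreign schema subgraphs
                 limit to (subgraph_deployment, subgraph_error)
                 from server shard_a into shard_a_subgraphs;",
        )
    }

Because the import is recreated from the current remote catalog, the foreign tables pick up any columns the migration added or changed, which is exactly the situation the `had_migrations` check and `propagate_schema_change` guard against.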