Improve how we apply database migrations #4009

Merged: 4 commits, Oct 4, 2022
36 changes: 34 additions & 2 deletions node/src/bin/manager.rs
@@ -21,6 +21,7 @@ use graph_node::{
store_builder::StoreBuilder,
MetricsContext,
};
use graph_store_postgres::connection_pool::PoolCoordinator;
use graph_store_postgres::ChainStore;
use graph_store_postgres::{
connection_pool::ConnectionPool, BlockStore, NotificationSender, Shard, Store, SubgraphStore,
@@ -232,6 +233,10 @@ pub enum Command {
#[clap(long, short, default_value = "10000")]
history: usize,
},

/// General database management
#[clap(subcommand)]
Database(DatabaseCommand),
}

impl Command {
@@ -494,6 +499,18 @@ pub enum IndexCommand {
},
}

#[derive(Clone, Debug, Subcommand)]
pub enum DatabaseCommand {
/// Apply any pending migrations to the database schema in all shards
Migrate,
/// Refresh the mapping of tables into different shards
///
/// This command rebuilds the mappings of tables from one shard into all
/// other shards. It makes it possible to fix these mappings when a
/// database migration was interrupted before it could rebuild them.
Remap,
}
#[derive(Clone, Debug, Subcommand)]
pub enum CheckBlockMethod {
/// The number of the target block
@@ -587,13 +604,14 @@ impl Context {

fn primary_pool(self) -> ConnectionPool {
let primary = self.config.primary_store();
let coord = Arc::new(PoolCoordinator::new(Arc::new(vec![])));
let pool = StoreBuilder::main_pool(
&self.logger,
&self.node_id,
PRIMARY_SHARD.as_str(),
primary,
self.metrics_registry(),
- Arc::new(vec![]),
+ coord,
);
pool.skip_setup();
pool
@@ -642,7 +660,7 @@ impl Context {
}

fn store_and_pools(self) -> (Arc<Store>, HashMap<Shard, ConnectionPool>) {
- let (subgraph_store, pools) = StoreBuilder::make_subgraph_store_and_pools(
+ let (subgraph_store, pools, _) = StoreBuilder::make_subgraph_store_and_pools(
&self.logger,
&self.node_id,
&self.config,
@@ -1057,6 +1075,20 @@ async fn main() -> anyhow::Result<()> {
}
}
}
Database(cmd) => {
match cmd {
DatabaseCommand::Migrate => {
/* creating the store builder runs migrations */
let _store_builder = ctx.store_builder().await;
println!("All database migrations have been applied");
Ok(())
}
DatabaseCommand::Remap => {
let store_builder = ctx.store_builder().await;
commands::database::remap(&store_builder.coord).await
}
}
}
Prune {
deployment,
history,
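With the Database variant added to Command, the new functionality surfaces on the command line as "graphman database migrate" and "graphman database remap" (graph-node installs the manager binary as graphman). Below is a minimal, self-contained sketch of the same clap 3 derive wiring; the Opt struct and main here are simplified stand-ins for illustration, not the actual manager.rs:

    use clap::{Parser, Subcommand};

    #[derive(Parser)]
    struct Opt {
        #[clap(subcommand)]
        cmd: Command,
    }

    #[derive(Subcommand)]
    enum Command {
        /// General database management
        #[clap(subcommand)]
        Database(DatabaseCommand),
    }

    #[derive(Clone, Debug, Subcommand)]
    enum DatabaseCommand {
        /// Apply any pending migrations to the database schema in all shards
        Migrate,
        /// Refresh the mapping of tables into different shards
        Remap,
    }

    fn main() {
        // "database migrate" parses to Command::Database(DatabaseCommand::Migrate);
        // the real binary dispatches it exactly as in the match arm further up.
        match Opt::parse().cmd {
            Command::Database(DatabaseCommand::Migrate) => println!("would apply migrations"),
            Command::Database(DatabaseCommand::Remap) => println!("would rebuild mappings"),
        }
    }
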
22 changes: 22 additions & 0 deletions node/src/manager/commands/database.rs
@@ -0,0 +1,22 @@
use std::time::Instant;

use graph::prelude::anyhow;
use graph_store_postgres::connection_pool::PoolCoordinator;

pub async fn remap(coord: &PoolCoordinator) -> Result<(), anyhow::Error> {
let pools = coord.pools();
let servers = coord.servers();

for server in servers.iter() {
for pool in pools.iter() {
let start = Instant::now();
print!(
"Remapping imports from {} in shard {}",
server.shard, pool.shard
);
pool.remap(server)?;
println!(" (done in {}s)", start.elapsed().as_secs());
}
}
Ok(())
}
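One caveat in the loop above: print! does not flush stdout, so the "Remapping imports from ... in shard ..." message may only become visible once the matching println! runs, after pool.remap(server) has already finished. A standalone sketch of the flushing pattern that would make progress visible while a long remap is still running (illustrative only, not part of this PR):

    use std::io::{self, Write};
    use std::time::Instant;

    fn main() -> io::Result<()> {
        let start = Instant::now();
        print!("Remapping imports from shard_a in shard_b");
        // Flush so the message appears before the long-running work, not after it.
        io::stdout().flush()?;
        // ... the equivalent of pool.remap(server) would run here ...
        println!(" (done in {}s)", start.elapsed().as_secs());
        Ok(())
    }
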
1 change: 1 addition & 0 deletions node/src/manager/commands/mod.rs
@@ -4,6 +4,7 @@ pub mod check_blocks;
pub mod config;
pub mod copy;
pub mod create;
pub mod database;
pub mod index;
pub mod info;
pub mod listen;
36 changes: 22 additions & 14 deletions node/src/store_builder.rs
@@ -9,7 +9,9 @@ use graph::{
prelude::{info, CheapClone, Logger},
util::security::SafeDisplay,
};
use graph_store_postgres::connection_pool::{ConnectionPool, ForeignServer, PoolName};
use graph_store_postgres::connection_pool::{
ConnectionPool, ForeignServer, PoolCoordinator, PoolName,
};
use graph_store_postgres::{
BlockStore as DieselBlockStore, ChainHeadUpdateListener as PostgresChainHeadUpdateListener,
NotificationSender, Shard as ShardName, Store as DieselStore, SubgraphStore,
@@ -26,6 +28,7 @@ pub struct StoreBuilder {
chain_head_update_listener: Arc<PostgresChainHeadUpdateListener>,
/// Map network names to the shards where they are/should be stored
chains: HashMap<String, ShardName>,
pub coord: Arc<PoolCoordinator>,
}

impl StoreBuilder {
@@ -47,7 +50,7 @@ impl StoreBuilder {
registry.clone(),
));

- let (store, pools) = Self::make_subgraph_store_and_pools(
+ let (store, pools, coord) = Self::make_subgraph_store_and_pools(
logger,
node,
config,
@@ -80,6 +83,7 @@
subscription_manager,
chain_head_update_listener,
chains,
coord,
}
}

@@ -92,7 +96,11 @@ impl StoreBuilder {
config: &Config,
fork_base: Option<Url>,
registry: Arc<dyn MetricsRegistry>,
- ) -> (Arc<SubgraphStore>, HashMap<ShardName, ConnectionPool>) {
+ ) -> (
+ Arc<SubgraphStore>,
+ HashMap<ShardName, ConnectionPool>,
+ Arc<PoolCoordinator>,
+ ) {
let notification_sender = Arc::new(NotificationSender::new(registry.cheap_clone()));

let servers = config
@@ -102,6 +110,7 @@
.collect::<Result<Vec<_>, _>>()
.expect("connection urls contain enough detail");
let servers = Arc::new(servers);
let coord = Arc::new(PoolCoordinator::new(servers));

let shards: Vec<_> = config
.stores
@@ -114,7 +123,7 @@
name,
shard,
registry.cheap_clone(),
- servers.clone(),
+ coord.clone(),
);

let (read_only_conn_pools, weights) = Self::replica_pools(
@@ -123,7 +132,7 @@
name,
shard,
registry.cheap_clone(),
- servers.clone(),
+ coord.clone(),
);

let name =
@@ -147,7 +156,7 @@
registry,
));

- (store, pools)
+ (store, pools, coord)
}

pub fn make_store(
@@ -191,7 +200,7 @@
name: &str,
shard: &Shard,
registry: Arc<dyn MetricsRegistry>,
- servers: Arc<Vec<ForeignServer>>,
+ coord: Arc<PoolCoordinator>,
) -> ConnectionPool {
let logger = logger.new(o!("pool" => "main"));
let pool_size = shard.pool_size.size_for(node, name).expect(&format!(
@@ -209,15 +218,14 @@
"conn_pool_size" => pool_size,
"weight" => shard.weight
);
- ConnectionPool::create(
+ coord.create_pool(
+ &logger,
name,
PoolName::Main,
shard.connection.to_owned(),
pool_size,
Some(fdw_pool_size),
- &logger,
registry.cheap_clone(),
- servers,
)
}

@@ -228,7 +236,7 @@
name: &str,
shard: &Shard,
registry: Arc<dyn MetricsRegistry>,
- servers: Arc<Vec<ForeignServer>>,
+ coord: Arc<PoolCoordinator>,
) -> (Vec<ConnectionPool>, Vec<usize>) {
let mut weights: Vec<_> = vec![shard.weight];
(
@@ -250,15 +258,15 @@
"we can determine the pool size for replica {}",
name
));
- ConnectionPool::create(
+
+ coord.clone().create_pool(
+ &logger,
name,
PoolName::Replica(pool),
replica.connection.clone(),
pool_size,
None,
- &logger,
registry.cheap_clone(),
- servers.clone(),
)
})
.collect(),
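The pattern running through this file: instead of each call site passing a raw Arc<Vec<ForeignServer>> into ConnectionPool::create, a single PoolCoordinator is built up front from the server list, every pool is created through it, and the coordinator is returned alongside the store so that callers such as the remap command can enumerate all pools and servers. A toy, self-contained sketch of that ownership pattern; Server, Pool, and Coordinator here are stand-ins, not the real graph_store_postgres types:

    use std::sync::{Arc, Mutex};

    #[derive(Clone)]
    struct Server { shard: String }

    #[derive(Clone)]
    struct Pool { shard: String }

    // The coordinator owns the server list and records every pool it creates,
    // so cross-shard operations can later iterate over all of them.
    struct Coordinator {
        servers: Arc<Vec<Server>>,
        pools: Mutex<Vec<Pool>>,
    }

    impl Coordinator {
        fn new(servers: Arc<Vec<Server>>) -> Self {
            Coordinator { servers, pools: Mutex::new(Vec::new()) }
        }

        fn create_pool(&self, shard: &str) -> Pool {
            let pool = Pool { shard: shard.to_string() };
            self.pools.lock().unwrap().push(pool.clone());
            pool
        }

        fn pools(&self) -> Vec<Pool> {
            self.pools.lock().unwrap().clone()
        }

        fn servers(&self) -> Arc<Vec<Server>> {
            self.servers.clone()
        }
    }

    fn main() {
        let servers = Arc::new(vec![Server { shard: "primary".into() }]);
        let coord = Coordinator::new(servers);
        let _main_pool = coord.create_pool("primary");
        // A remap-style walk over every (server, pool) pair:
        for server in coord.servers().iter() {
            for pool in coord.pools() {
                println!("would remap imports from {} in shard {}", server.shard, pool.shard);
            }
        }
    }

Registering pools inside the coordinator is what makes the remap walk possible: the coordinator is the one place that knows every pool it has handed out.
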
29 changes: 26 additions & 3 deletions store/postgres/src/catalog.rs
@@ -21,7 +21,7 @@ use graph::{

use crate::connection_pool::ForeignServer;
use crate::{
- primary::{Namespace, Site},
+ primary::{Namespace, Site, NAMESPACE_PUBLIC},
relational::SqlName,
};

@@ -52,6 +52,19 @@
}
}

table! {
__diesel_schema_migrations(version) {
version -> Text,
run_on -> Timestamp,
}
}

lazy_static! {
/// The name of the table in which Diesel records migrations
static ref MIGRATIONS_TABLE: SqlName =
SqlName::verbatim("__diesel_schema_migrations".to_string());
}

// In debug builds (for testing etc.), create exclusion constraints; in
// release builds for production, skip them
#[cfg(debug_assertions)]
@@ -161,7 +174,7 @@ fn get_text_columns(

pub fn table_exists(
conn: &PgConnection,
- namespace: &Namespace,
+ namespace: &str,
table: &SqlName,
) -> Result<bool, StoreError> {
#[derive(Debug, QueryableByName)]
Expand All @@ -186,7 +199,7 @@ pub fn supports_proof_of_indexing(
lazy_static! {
static ref POI_TABLE_NAME: SqlName = SqlName::verbatim(POI_TABLE.to_owned());
}
- table_exists(conn, namespace, &POI_TABLE_NAME)
+ table_exists(conn, namespace.as_str(), &POI_TABLE_NAME)
}

/// Whether the given table has an exclusion constraint. When we create
@@ -310,6 +323,16 @@ pub fn recreate_schema(conn: &PgConnection, nsp: &str) -> Result<(), StoreError>
Ok(conn.batch_execute(&query)?)
}

pub fn migration_count(conn: &PgConnection) -> Result<i64, StoreError> {
use __diesel_schema_migrations as m;

if !table_exists(conn, NAMESPACE_PUBLIC, &*MIGRATIONS_TABLE)? {
return Ok(0);
}

m::table.count().get_result(conn).map_err(StoreError::from)
}

pub fn account_like(conn: &PgConnection, site: &Site) -> Result<HashSet<String>, StoreError> {
use table_stats as ts;
let names = ts::table
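migration_count reads the table in which Diesel records applied migrations, __diesel_schema_migrations in the public schema, and returns 0 when that table does not exist yet, i.e. on a fresh database. A hedged sketch of how a caller might use it to detect whether a round of migrations changed anything; the report_migrations wrapper and the connection setup are hypothetical, only migration_count itself comes from this diff, and StoreError is assumed to convert into anyhow::Error:

    use diesel::{Connection, PgConnection};
    use graph::prelude::anyhow;

    // Hypothetical caller; migration_count is the function added above and is
    // assumed to be in scope.
    fn report_migrations(postgres_url: &str) -> Result<(), anyhow::Error> {
        let conn = PgConnection::establish(postgres_url)?;
        let before = migration_count(&conn)?;
        // ... run any pending Diesel migrations here ...
        let after = migration_count(&conn)?;
        if after > before {
            println!("applied {} new migration(s)", after - before);
        }
        Ok(())
    }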