Skip to content

Commit

Permalink
node, store: Expose migrating and remapping schemas in graphman
Browse files Browse the repository at this point in the history
  • Loading branch information
lutter committed Oct 4, 2022
1 parent 9ffc739 commit ef75f31
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 6 deletions.
32 changes: 31 additions & 1 deletion node/src/bin/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,10 @@ pub enum Command {
#[clap(long, short, default_value = "10000")]
history: usize,
},

/// General database management
#[clap(subcommand)]
Database(DatabaseCommand),
}

impl Command {
Expand Down Expand Up @@ -495,6 +499,18 @@ pub enum IndexCommand {
},
}

// Subcommands of `graphman database`.
//
// NOTE(review): clap's `Subcommand` derive turns the `///` doc comments
// below into the CLI help text (`graphman database --help`), so they are
// user-visible strings — edit with care.
#[derive(Clone, Debug, Subcommand)]
pub enum DatabaseCommand {
    /// Apply any pending migrations to the database schema in all shards
    Migrate,
    /// Refresh the mapping of tables into different shards
    ///
    /// This command rebuilds the mappings of tables from one shard into all
    /// other shards. It makes it possible to fix these mappings when a
    /// database migration was interrupted before it could rebuild the
    /// mappings
    Remap,
}
#[derive(Clone, Debug, Subcommand)]
pub enum CheckBlockMethod {
/// The number of the target block
Expand Down Expand Up @@ -644,7 +660,7 @@ impl Context {
}

fn store_and_pools(self) -> (Arc<Store>, HashMap<Shard, ConnectionPool>) {
let (subgraph_store, pools) = StoreBuilder::make_subgraph_store_and_pools(
let (subgraph_store, pools, _) = StoreBuilder::make_subgraph_store_and_pools(
&self.logger,
&self.node_id,
&self.config,
Expand Down Expand Up @@ -1059,6 +1075,20 @@ async fn main() -> anyhow::Result<()> {
}
}
}
Database(cmd) => {
match cmd {
DatabaseCommand::Migrate => {
/* creating the store builder runs migrations */
let _store_builder = ctx.store_builder().await;
println!("All database migrations have been applied");
Ok(())
}
DatabaseCommand::Remap => {
let store_builder = ctx.store_builder().await;
commands::database::remap(&store_builder.coord).await
}
}
}
Prune {
deployment,
history,
Expand Down
22 changes: 22 additions & 0 deletions node/src/manager/commands/database.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use std::time::Instant;

use graph::prelude::anyhow;
use graph_store_postgres::connection_pool::PoolCoordinator;

/// Rebuild the foreign-table mappings ("imports") of every server into every
/// shard known to `coord`, printing per-pair progress and timing to stdout.
///
/// Intended for `graphman database remap`, e.g. to repair mappings left
/// inconsistent when a database migration was interrupted.
///
/// # Errors
///
/// Returns the first error reported by `PoolInner::remap`; pairs after the
/// failing one are not attempted.
pub async fn remap(coord: &PoolCoordinator) -> Result<(), anyhow::Error> {
    use std::io::Write as _;

    let pools = coord.pools();
    let servers = coord.servers();

    for server in servers.iter() {
        for pool in pools.iter() {
            let start = Instant::now();
            print!(
                "Remapping imports from {} in shard {}",
                server.shard, pool.shard
            );
            // `print!` leaves the line in the stdout buffer; flush so the
            // progress message is visible while the (potentially slow) remap
            // runs instead of appearing only together with the "done" suffix.
            // A flush failure is not worth aborting the remap for.
            std::io::stdout().flush().ok();
            pool.remap(server)?;
            println!(" (done in {}s)", start.elapsed().as_secs());
        }
    }
    Ok(())
}
1 change: 1 addition & 0 deletions node/src/manager/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub mod check_blocks;
pub mod config;
pub mod copy;
pub mod create;
pub mod database;
pub mod index;
pub mod info;
pub mod listen;
Expand Down
12 changes: 9 additions & 3 deletions node/src/store_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub struct StoreBuilder {
chain_head_update_listener: Arc<PostgresChainHeadUpdateListener>,
/// Map network names to the shards where they are/should be stored
chains: HashMap<String, ShardName>,
pub coord: Arc<PoolCoordinator>,
}

impl StoreBuilder {
Expand All @@ -49,7 +50,7 @@ impl StoreBuilder {
registry.clone(),
));

let (store, pools) = Self::make_subgraph_store_and_pools(
let (store, pools, coord) = Self::make_subgraph_store_and_pools(
logger,
node,
config,
Expand Down Expand Up @@ -82,6 +83,7 @@ impl StoreBuilder {
subscription_manager,
chain_head_update_listener,
chains,
coord,
}
}

Expand All @@ -94,7 +96,11 @@ impl StoreBuilder {
config: &Config,
fork_base: Option<Url>,
registry: Arc<dyn MetricsRegistry>,
) -> (Arc<SubgraphStore>, HashMap<ShardName, ConnectionPool>) {
) -> (
Arc<SubgraphStore>,
HashMap<ShardName, ConnectionPool>,
Arc<PoolCoordinator>,
) {
let notification_sender = Arc::new(NotificationSender::new(registry.cheap_clone()));

let servers = config
Expand Down Expand Up @@ -150,7 +156,7 @@ impl StoreBuilder {
registry,
));

(store, pools)
(store, pools, coord)
}

pub fn make_store(
Expand Down
17 changes: 15 additions & 2 deletions store/postgres/src/connection_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,7 @@ impl HandleEvent for EventHandler {
#[derive(Clone)]
pub struct PoolInner {
logger: Logger,
shard: Shard,
pub shard: Shard,
pool: Pool<ConnectionManager<PgConnection>>,
// A separate pool for connections that will use foreign data wrappers.
// Once such a connection accesses a foreign table, Postgres keeps a
Expand Down Expand Up @@ -1053,7 +1053,7 @@ impl PoolInner {
// The foreign server `server` had schema changes, and we therefore need
// to remap anything that we are importing via fdw to make sure we are
// using this updated schema
fn remap(&self, server: &ForeignServer) -> Result<(), StoreError> {
pub fn remap(&self, server: &ForeignServer) -> Result<(), StoreError> {
if &server.shard == &*PRIMARY_SHARD {
info!(&self.logger, "Mapping primary");
let conn = self.get()?;
Expand Down Expand Up @@ -1176,4 +1176,17 @@ impl PoolCoordinator {
}
Ok(())
}

/// Return a snapshot of all connection pools currently registered with this
/// coordinator, one `Arc` handle per pool. Cloning the `Arc`s is cheap
/// (refcount bump); the underlying pools are shared, not copied.
pub fn pools(&self) -> Vec<Arc<PoolInner>> {
    // `.cloned()` is the idiomatic form of `.map(|pool| pool.clone())`
    // (clippy::map_clone). The lock is held only while collecting; `unwrap`
    // panics only if another thread panicked while holding the mutex, which
    // would indicate a bug elsewhere.
    self.pools.lock().unwrap().values().cloned().collect()
}

/// All foreign servers known to this coordinator, as a shared handle.
pub fn servers(&self) -> Arc<Vec<ForeignServer>> {
    // Explicit `Arc::clone` makes the cheap, refcount-only copy obvious.
    Arc::clone(&self.servers)
}
}

0 comments on commit ef75f31

Please sign in to comment.