diff --git a/core/src/subgraph/instance_manager.rs b/core/src/subgraph/instance_manager.rs index 0f05f5359d1..a1a464bf1aa 100644 --- a/core/src/subgraph/instance_manager.rs +++ b/core/src/subgraph/instance_manager.rs @@ -119,15 +119,23 @@ impl SubgraphInstanceManagerTrait for SubgraphInstanceManager< }); } - fn stop_subgraph(&self, loc: DeploymentLocator) { + async fn stop_subgraph(&self, loc: DeploymentLocator) { let logger = self.logger_factory.subgraph_logger(&loc); - info!(logger, "Stop subgraph"); + + match self.subgraph_store.stop_subgraph(&loc).await { + Ok(()) => debug!(logger, "Stopped subgraph writer"), + Err(err) => { + error!(logger, "Error stopping subgraph writer"; "error" => format!("{:#}", err)) + } + } // Drop the cancel guard to shut down the subgraph now let mut instances = self.instances.write().unwrap(); instances.remove(&loc.id); self.manager_metrics.subgraph_count.dec(); + + info!(logger, "Stopped subgraph"); } } diff --git a/core/src/subgraph/provider.rs b/core/src/subgraph/provider.rs index c513cb3144e..11b325af3f9 100644 --- a/core/src/subgraph/provider.rs +++ b/core/src/subgraph/provider.rs @@ -81,7 +81,7 @@ impl SubgraphAssignmentProviderTrait for SubgraphAss .remove(&deployment.id) { // Shut down subgraph processing - self.instance_manager.stop_subgraph(deployment); + self.instance_manager.stop_subgraph(deployment).await; Ok(()) } else { Err(SubgraphAssignmentProviderError::NotRunning(deployment)) diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index 087bde3c32a..a3f75a3aa51 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -142,6 +142,10 @@ pub trait SubgraphStore: Send + Sync + 'static { deployment: DeploymentId, ) -> Result, StoreError>; + /// Initiate a graceful shutdown of the writable that a previous call to + /// `writable` might have started + async fn stop_subgraph(&self, deployment: &DeploymentLocator) -> Result<(), StoreError>; + /// Return the minimum block pointer of all deployments with this `id` /// that we would use to query or copy from; in particular, this will /// ignore any instances of this deployment that are in the process of diff --git a/graph/src/components/subgraph/instance_manager.rs b/graph/src/components/subgraph/instance_manager.rs index 3b1777e3df8..c04fd5237b4 100644 --- a/graph/src/components/subgraph/instance_manager.rs +++ b/graph/src/components/subgraph/instance_manager.rs @@ -16,5 +16,5 @@ pub trait SubgraphInstanceManager: Send + Sync + 'static { manifest: serde_yaml::Mapping, stop_block: Option, ); - fn stop_subgraph(&self, deployment: DeploymentLocator); + async fn stop_subgraph(&self, deployment: DeploymentLocator); } diff --git a/graph/src/util/timed_cache.rs b/graph/src/util/timed_cache.rs index 1d2e6c7f373..20ac7ba49fd 100644 --- a/graph/src/util/timed_cache.rs +++ b/graph/src/util/timed_cache.rs @@ -90,6 +90,21 @@ impl TimedCache { .find(move |entry| pred(entry.value.as_ref())) .map(|entry| entry.value.clone()) } + + /// Remove an entry from the cache. If there was an entry for `key`, + /// return the value associated with it and whether the entry is still + /// live + pub fn remove(&self, key: &Q) -> Option<(Arc, bool)> + where + K: Borrow + Eq + Hash, + Q: Hash + Eq, + { + self.entries + .write() + .unwrap() + .remove(key) + .map(|CacheEntry { value, expires }| (value, expires >= Instant::now())) + } } #[test] diff --git a/store/postgres/src/deployment.rs b/store/postgres/src/deployment.rs index 6078286dc1b..c73f4999ae4 100644 --- a/store/postgres/src/deployment.rs +++ b/store/postgres/src/deployment.rs @@ -470,11 +470,15 @@ pub fn initialize_block_ptr(conn: &PgConnection, site: &Site) -> Result<(), Stor let needs_init = d::table .filter(d::id.eq(site.id)) - .filter(d::latest_ethereum_block_hash.is_null()) - .select(d::id) - .first::(conn) - .optional()? - .is_some(); + .select(d::latest_ethereum_block_hash) + .first::>>(conn) + .map_err(|e| { + constraint_violation!( + "deployment sgd{} must have been created before calling initialize_block_ptr but we got {}", + site.id, e + ) + })? + .is_none(); if needs_init { if let (Some(hash), Some(number)) = m::table diff --git a/store/postgres/src/relational.rs b/store/postgres/src/relational.rs index 95383d4d229..6846835bfe4 100644 --- a/store/postgres/src/relational.rs +++ b/store/postgres/src/relational.rs @@ -1444,6 +1444,14 @@ impl LayoutCache { } } + pub(crate) fn remove(&self, site: &Site) -> Option> { + self.entries + .lock() + .unwrap() + .remove(&site.deployment) + .map(|CacheEntry { value, expires: _ }| value.clone()) + } + // Only needed for tests #[cfg(debug_assertions)] pub(crate) fn clear(&self) { diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs index f771dbd7e1c..dfa208fde78 100644 --- a/store/postgres/src/subgraph_store.rs +++ b/store/postgres/src/subgraph_store.rs @@ -373,6 +373,20 @@ impl SubgraphStoreInner { Ok(site) } + fn evict(&self, id: &DeploymentHash) -> Result<(), StoreError> { + if let Some((site, _)) = self.sites.remove(id) { + let store = self.stores.get(&site.shard).ok_or_else(|| { + constraint_violation!( + "shard {} for deployment sgd{} not found when evicting", + site.shard, + site.id + ) + })?; + store.layout_cache.remove(&site); + } + Ok(()) + } + fn find_site(&self, id: DeploymentId) -> Result, StoreError> { if let Some(site) = self.sites.find(|site| site.id == id) { return Ok(site); @@ -499,6 +513,8 @@ impl SubgraphStoreInner { #[cfg(not(debug_assertions))] assert!(!replace); + self.evict(&schema.id)?; + let graft_base = deployment .graft_base .as_ref() @@ -1264,6 +1280,18 @@ impl SubgraphStoreTrait for SubgraphStore { Ok(writable) } + async fn stop_subgraph(&self, loc: &DeploymentLocator) -> Result<(), StoreError> { + self.evict(&loc.hash)?; + + // Remove the writable from the cache and stop it + let deployment = loc.id.into(); + let writable = self.writables.lock().unwrap().remove(&deployment); + match writable { + Some(writable) => writable.stop().await, + None => Ok(()), + } + } + fn is_deployed(&self, id: &DeploymentHash) -> Result { match self.site(id) { Ok(_) => Ok(true), diff --git a/store/postgres/src/writable.rs b/store/postgres/src/writable.rs index 85aa117053a..aa547af2382 100644 --- a/store/postgres/src/writable.rs +++ b/store/postgres/src/writable.rs @@ -400,6 +400,7 @@ impl BlockTracker { self.revert = self.revert.min(block_ptr.number); self.block = self.block.min(block_ptr.number); } + Request::Stop => { /* do nothing */ } } } @@ -438,10 +439,16 @@ enum Request { block_ptr: BlockPtr, firehose_cursor: FirehoseCursor, }, + Stop, +} + +enum ExecResult { + Continue, + Stop, } impl Request { - fn execute(&self) -> Result<(), StoreError> { + fn execute(&self) -> Result { match self { Request::Write { store, @@ -453,21 +460,26 @@ impl Request { deterministic_errors, manifest_idx_and_name, offchain_to_remove, - } => store.transact_block_operations( - block_ptr_to, - firehose_cursor, - mods, - stopwatch, - data_sources, - deterministic_errors, - manifest_idx_and_name, - offchain_to_remove, - ), + } => store + .transact_block_operations( + block_ptr_to, + firehose_cursor, + mods, + stopwatch, + data_sources, + deterministic_errors, + manifest_idx_and_name, + offchain_to_remove, + ) + .map(|()| ExecResult::Continue), Request::RevertTo { store, block_ptr, firehose_cursor, - } => store.revert_block_operations(block_ptr.clone(), firehose_cursor), + } => store + .revert_block_operations(block_ptr.clone(), firehose_cursor) + .map(|()| ExecResult::Continue), + Request::Stop => return Ok(ExecResult::Stop), } } } @@ -556,12 +568,19 @@ impl Queue { }; let _section = queue.stopwatch.start_section("queue_pop"); + use ExecResult::*; match res { - Ok(Ok(())) => { + Ok(Ok(Continue)) => { // The request has been handled. It's now safe to remove it // from the queue queue.queue.pop().await; } + Ok(Ok(Stop)) => { + // Graceful shutdown. We also handled the request + // successfully + queue.queue.pop().await; + return; + } Ok(Err(e)) => { error!(logger, "Subgraph writer failed"; "error" => e.to_string()); queue.record_err(e); @@ -616,6 +635,10 @@ impl Queue { self.check_err() } + async fn stop(&self) -> Result<(), StoreError> { + self.push(Request::Stop).await + } + fn check_err(&self) -> Result<(), StoreError> { if let Some(err) = self.write_err.lock().unwrap().take() { return Err(err); @@ -670,7 +693,7 @@ impl Queue { None } } - Request::RevertTo { .. } => None, + Request::RevertTo { .. } | Request::Stop => None, } }); @@ -725,7 +748,7 @@ impl Queue { } } } - Request::RevertTo { .. } => { /* nothing to do */ } + Request::RevertTo { .. } | Request::Stop => { /* nothing to do */ } } map }, @@ -779,7 +802,7 @@ impl Queue { .collect(); } } - Request::RevertTo { .. } => { /* nothing to do */ } + Request::RevertTo { .. } | Request::Stop => { /* nothing to do */ } } dds }); @@ -925,6 +948,13 @@ impl Writer { Writer::Async(queue) => queue.poisoned(), } } + + async fn stop(&self) -> Result<(), StoreError> { + match self { + Writer::Sync(_) => Ok(()), + Writer::Async(queue) => queue.stop().await, + } + } } pub struct WritableStore { @@ -962,6 +992,10 @@ impl WritableStore { pub(crate) fn poisoned(&self) -> bool { self.writer.poisoned() } + + pub(crate) async fn stop(&self) -> Result<(), StoreError> { + self.writer.stop().await + } } impl ReadStore for WritableStore {