
Handle rapid deletion and recreation of subgraphs more gracefully #4044

Merged 4 commits on Oct 14, 2022
12 changes: 10 additions & 2 deletions core/src/subgraph/instance_manager.rs
@@ -119,15 +119,23 @@ impl<S: SubgraphStore> SubgraphInstanceManagerTrait for SubgraphInstanceManager<
});
}

fn stop_subgraph(&self, loc: DeploymentLocator) {
async fn stop_subgraph(&self, loc: DeploymentLocator) {
let logger = self.logger_factory.subgraph_logger(&loc);
info!(logger, "Stop subgraph");

match self.subgraph_store.stop_subgraph(&loc).await {
Ok(()) => debug!(logger, "Stopped subgraph writer"),
Err(err) => {
error!(logger, "Error stopping subgraph writer"; "error" => format!("{:#}", err))
}
}

// Drop the cancel guard to shut down the subgraph now
let mut instances = self.instances.write().unwrap();
instances.remove(&loc.id);

self.manager_metrics.subgraph_count.dec();

info!(logger, "Stopped subgraph");
}
}

2 changes: 1 addition & 1 deletion core/src/subgraph/provider.rs
@@ -81,7 +81,7 @@ impl<I: SubgraphInstanceManager> SubgraphAssignmentProviderTrait for SubgraphAss
.remove(&deployment.id)
{
// Shut down subgraph processing
self.instance_manager.stop_subgraph(deployment);
self.instance_manager.stop_subgraph(deployment).await;
Ok(())
} else {
Err(SubgraphAssignmentProviderError::NotRunning(deployment))
4 changes: 4 additions & 0 deletions graph/src/components/store/traits.rs
@@ -142,6 +142,10 @@ pub trait SubgraphStore: Send + Sync + 'static {
deployment: DeploymentId,
) -> Result<Arc<dyn WritableStore>, StoreError>;

/// Initiate a graceful shutdown of the writable that a previous call to
/// `writable` might have started
async fn stop_subgraph(&self, deployment: &DeploymentLocator) -> Result<(), StoreError>;

/// Return the minimum block pointer of all deployments with this `id`
/// that we would use to query or copy from; in particular, this will
/// ignore any instances of this deployment that are in the process of
2 changes: 1 addition & 1 deletion graph/src/components/subgraph/instance_manager.rs
@@ -16,5 +16,5 @@ pub trait SubgraphInstanceManager: Send + Sync + 'static {
manifest: serde_yaml::Mapping,
stop_block: Option<BlockNumber>,
);
fn stop_subgraph(&self, deployment: DeploymentLocator);
async fn stop_subgraph(&self, deployment: DeploymentLocator);
}
15 changes: 15 additions & 0 deletions graph/src/util/timed_cache.rs
@@ -90,6 +90,21 @@ impl<K, V> TimedCache<K, V> {
.find(move |entry| pred(entry.value.as_ref()))
.map(|entry| entry.value.clone())
}

/// Remove an entry from the cache. If there was an entry for `key`,
/// return the value associated with it and whether the entry is still
/// live
pub fn remove<Q: ?Sized>(&self, key: &Q) -> Option<(Arc<V>, bool)>
where
K: Borrow<Q> + Eq + Hash,
Q: Hash + Eq,
{
self.entries
.write()
.unwrap()
.remove(key)
.map(|CacheEntry { value, expires }| (value, expires >= Instant::now()))
}
}

#[test]
14 changes: 9 additions & 5 deletions store/postgres/src/deployment.rs
@@ -470,11 +470,15 @@ pub fn initialize_block_ptr(conn: &PgConnection, site: &Site) -> Result<(), Stor

let needs_init = d::table
.filter(d::id.eq(site.id))
.filter(d::latest_ethereum_block_hash.is_null())
.select(d::id)
.first::<i32>(conn)
.optional()?
.is_some();
.select(d::latest_ethereum_block_hash)
.first::<Option<Vec<u8>>>(conn)
.map_err(|e| {
constraint_violation!(
"deployment sgd{} must have been created before calling initialize_block_ptr but we got {}",
site.id, e
)
})?
.is_none();

if needs_init {
if let (Some(hash), Some(number)) = m::table
8 changes: 8 additions & 0 deletions store/postgres/src/relational.rs
@@ -1444,6 +1444,14 @@ impl LayoutCache {
}
}

pub(crate) fn remove(&self, site: &Site) -> Option<Arc<Layout>> {
self.entries
.lock()
.unwrap()
.remove(&site.deployment)
.map(|CacheEntry { value, expires: _ }| value.clone())
}

// Only needed for tests
#[cfg(debug_assertions)]
pub(crate) fn clear(&self) {
28 changes: 28 additions & 0 deletions store/postgres/src/subgraph_store.rs
@@ -373,6 +373,20 @@ impl SubgraphStoreInner {
Ok(site)
}

fn evict(&self, id: &DeploymentHash) -> Result<(), StoreError> {
if let Some((site, _)) = self.sites.remove(id) {
let store = self.stores.get(&site.shard).ok_or_else(|| {
constraint_violation!(
"shard {} for deployment sgd{} not found when evicting",
site.shard,
site.id
)
})?;
store.layout_cache.remove(&site);
}
Ok(())
}

fn find_site(&self, id: DeploymentId) -> Result<Arc<Site>, StoreError> {
if let Some(site) = self.sites.find(|site| site.id == id) {
return Ok(site);
@@ -499,6 +513,8 @@ impl SubgraphStoreInner {
#[cfg(not(debug_assertions))]
assert!(!replace);

self.evict(&schema.id)?;
Contributor:

Is the reason we call evict on deployment creation so we have more confidence that the cache will be cleared?

Collaborator (Author):

Yes, to make sure that we don't have a stale entry for the same hash in the cache (which could happen if a previous deployment of the same hash was stopped and then redeployed). This is really redundant with the eviction in stop_subgraph, but since it's a cheap operation it seemed safer to do it in both places.
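
A simplified, standalone illustration of that scenario — a plain `HashMap` stands in for the real `sites`/`layout_cache` caches, and the hash and site ids are made up:

```rust
use std::collections::HashMap;

fn main() {
    // Cache keyed by deployment hash, as a stand-in for TimedCache<DeploymentHash, Site>.
    let mut sites: HashMap<&str, u32> = HashMap::new();

    // First deployment of hash "QmA" is created and cached with site id 1.
    sites.insert("QmA", 1);

    // The deployment is stopped and deleted, then a deployment with the *same*
    // hash is created again; the database assigns it a fresh site id 2.
    let new_site_id = 2;

    // Without eviction, a lookup by hash still serves the stale site id 1.
    assert_eq!(sites.get("QmA"), Some(&1));

    // Evicting on create (and on stop) forces the next lookup to go back to
    // the database and pick up the new site.
    sites.remove("QmA");
    sites.insert("QmA", new_site_id);
    assert_eq!(sites.get("QmA"), Some(&2));
}
```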


let graft_base = deployment
.graft_base
.as_ref()
@@ -1264,6 +1280,18 @@ impl SubgraphStoreTrait for SubgraphStore {
Ok(writable)
}

async fn stop_subgraph(&self, loc: &DeploymentLocator) -> Result<(), StoreError> {
self.evict(&loc.hash)?;

// Remove the writable from the cache and stop it
let deployment = loc.id.into();
let writable = self.writables.lock().unwrap().remove(&deployment);
Contributor:

If I got the docs right, this would only panic due to a programming error, so it is relatively safe to unwrap here.
Is that correct?

Collaborator (Author):

It would panic if the lock is poisoned, i.e., if some other thread panicked while holding the lock. In that case, it's anybody's guess what's happening and panicking here is really the only thing we can do.
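
For context, a minimal standalone demonstration of that poisoning behavior — `Mutex::lock()` only returns `Err` after another thread has panicked while holding the lock, so unwrapping it just propagates an already-fatal bug:

```rust
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let lock = Arc::new(Mutex::new(0u32));

    // A thread that panics while holding the guard poisons the mutex.
    let l = Arc::clone(&lock);
    let _ = thread::spawn(move || {
        let _guard = l.lock().unwrap();
        panic!("thread died while holding the lock");
    })
    .join();

    // Every subsequent lock() now returns Err(PoisonError<..>); calling
    // unwrap() here panics too, instead of continuing with state that may
    // have been left half-updated by the dead thread.
    assert!(lock.lock().is_err());
}
```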

match writable {
Some(writable) => writable.stop().await,
None => Ok(()),
}
}

fn is_deployed(&self, id: &DeploymentHash) -> Result<bool, StoreError> {
match self.site(id) {
Ok(_) => Ok(true),
66 changes: 50 additions & 16 deletions store/postgres/src/writable.rs
@@ -400,6 +400,7 @@ impl BlockTracker {
self.revert = self.revert.min(block_ptr.number);
self.block = self.block.min(block_ptr.number);
}
Request::Stop => { /* do nothing */ }
}
}

@@ -438,10 +439,16 @@ enum Request {
block_ptr: BlockPtr,
firehose_cursor: FirehoseCursor,
},
Stop,
}

enum ExecResult {
Continue,
Stop,
Comment on lines +445 to +447
Contributor:

Just a suggestion, so feel free to ignore this comment.

This enum is somewhat similar to ControlFlow from std.

While we don't need to return any data out of this, maybe we could redefine it as

type ExecResult = ControlFlow<()>

I know this won't change anything, but it would make room if we want to return data from those contexts in the future.
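
A rough sketch of how that suggestion might look, with simplified stand-ins for `Request` and `StoreError` (the real definitions live in store/postgres/src/writable.rs):

```rust
use std::ops::ControlFlow;

// Simplified stand-ins for the real types in writable.rs.
enum Request {
    Write,
    RevertTo,
    Stop,
}
struct StoreError;

// ControlFlow<()> plays the role of ExecResult: Continue(()) keeps the queue
// loop going, Break(()) tells it to shut down. A later change could put
// shutdown data into the Break payload without reshaping the callers.
fn execute(req: &Request) -> Result<ControlFlow<()>, StoreError> {
    match req {
        Request::Write | Request::RevertTo => {
            // ... perform the write or revert here ...
            Ok(ControlFlow::Continue(()))
        }
        Request::Stop => Ok(ControlFlow::Break(())),
    }
}

fn main() {
    // The queue loop would match on the flow value:
    match execute(&Request::Stop) {
        Ok(ControlFlow::Break(())) => println!("graceful shutdown"),
        Ok(ControlFlow::Continue(())) => println!("keep processing"),
        Err(_) => eprintln!("store error"),
    }
}
```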

}

impl Request {
fn execute(&self) -> Result<(), StoreError> {
fn execute(&self) -> Result<ExecResult, StoreError> {
match self {
Request::Write {
store,
@@ -453,21 +460,26 @@ impl Request {
deterministic_errors,
manifest_idx_and_name,
offchain_to_remove,
} => store.transact_block_operations(
block_ptr_to,
firehose_cursor,
mods,
stopwatch,
data_sources,
deterministic_errors,
manifest_idx_and_name,
offchain_to_remove,
),
} => store
.transact_block_operations(
block_ptr_to,
firehose_cursor,
mods,
stopwatch,
data_sources,
deterministic_errors,
manifest_idx_and_name,
offchain_to_remove,
)
.map(|()| ExecResult::Continue),
Request::RevertTo {
store,
block_ptr,
firehose_cursor,
} => store.revert_block_operations(block_ptr.clone(), firehose_cursor),
} => store
.revert_block_operations(block_ptr.clone(), firehose_cursor)
.map(|()| ExecResult::Continue),
Request::Stop => return Ok(ExecResult::Stop),
}
}
}
@@ -556,12 +568,19 @@ impl Queue {
};

let _section = queue.stopwatch.start_section("queue_pop");
use ExecResult::*;
match res {
Ok(Ok(())) => {
Ok(Ok(Continue)) => {
// The request has been handled. It's now safe to remove it
// from the queue
queue.queue.pop().await;
}
Ok(Ok(Stop)) => {
// Graceful shutdown. We also handled the request
// successfully
queue.queue.pop().await;
return;
}
Ok(Err(e)) => {
error!(logger, "Subgraph writer failed"; "error" => e.to_string());
queue.record_err(e);
@@ -616,6 +635,10 @@ impl Queue {
self.check_err()
}

async fn stop(&self) -> Result<(), StoreError> {
self.push(Request::Stop).await
}

fn check_err(&self) -> Result<(), StoreError> {
if let Some(err) = self.write_err.lock().unwrap().take() {
return Err(err);
@@ -670,7 +693,7 @@ impl Queue {
None
}
}
Request::RevertTo { .. } => None,
Request::RevertTo { .. } | Request::Stop => None,
}
});

@@ -725,7 +748,7 @@ impl Queue {
}
}
}
Request::RevertTo { .. } => { /* nothing to do */ }
Request::RevertTo { .. } | Request::Stop => { /* nothing to do */ }
}
map
},
@@ -779,7 +802,7 @@ impl Queue {
.collect();
}
}
Request::RevertTo { .. } => { /* nothing to do */ }
Request::RevertTo { .. } | Request::Stop => { /* nothing to do */ }
}
dds
});
@@ -925,6 +948,13 @@ impl Writer {
Writer::Async(queue) => queue.poisoned(),
}
}

async fn stop(&self) -> Result<(), StoreError> {
match self {
Writer::Sync(_) => Ok(()),
Writer::Async(queue) => queue.stop().await,
}
}
}

pub struct WritableStore {
@@ -962,6 +992,10 @@ impl WritableStore {
pub(crate) fn poisoned(&self) -> bool {
self.writer.poisoned()
}

pub(crate) async fn stop(&self) -> Result<(), StoreError> {
self.writer.stop().await
}
}

impl ReadStore for WritableStore {