diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index 158e75aa87d..1749b8bea09 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -130,7 +130,12 @@ pub trait SubgraphStore: Send + Sync + 'static { /// Return a `WritableStore` that is used for indexing subgraphs. Only /// code that is part of indexing a subgraph should ever use this. The /// `logger` will be used to log important messages related to the - /// subgraph + /// subgraph. + /// + /// This function should only be called in situations where no + /// assumptions about the in-memory state of writing has been made; in + /// particular, no assumptions about whether previous writes have + /// actually been committed or not. async fn writable( self: Arc, logger: Logger, diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs index 19bc747f298..1c883579879 100644 --- a/store/postgres/src/subgraph_store.rs +++ b/store/postgres/src/subgraph_store.rs @@ -1232,7 +1232,12 @@ impl SubgraphStoreTrait for SubgraphStore { // idempotent and there is ever only one `WritableStore` for any // deployment if let Some(writable) = self.writables.lock().unwrap().get(&deployment) { - return Ok(writable.cheap_clone()); + // A poisoned writable will not write anything anymore; we + // discard it and create a new one that is properly initialized + // according to the state in the database. + if !writable.poisoned() { + return Ok(writable.cheap_clone()); + } } // Ideally the lower level functions would be asyncified. diff --git a/store/postgres/src/writable.rs b/store/postgres/src/writable.rs index 800a0ced080..417fe368428 100644 --- a/store/postgres/src/writable.rs +++ b/store/postgres/src/writable.rs @@ -791,6 +791,10 @@ impl Queue { Ok(dds) } + + fn poisoned(&self) -> bool { + self.poisoned.load(Ordering::SeqCst) + } } /// A shim to allow bypassing any pipelined store handling if need be @@ -908,6 +912,13 @@ impl Writer { Writer::Async(queue) => queue.load_dynamic_data_sources(manifest_idx_and_name).await, } } + + fn poisoned(&self) -> bool { + match self { + Writer::Sync(_) => false, + Writer::Async(queue) => queue.poisoned(), + } + } } pub struct WritableStore { @@ -941,6 +952,10 @@ impl WritableStore { writer, }) } + + pub(crate) fn poisoned(&self) -> bool { + self.writer.poisoned() + } } impl ReadStore for WritableStore {