Skip to content

Commit

Permalink
store: Make it possible to restart a failed writer
Browse files Browse the repository at this point in the history
So far, if the writer for a subgraph encounterd an error, it would stick
around as 'poisoned' for as long as the process lived. That made it
impossible to restart a subgraph (possibly after clearing some external
error condition) With this change, `SubgraphStore.writable` will create a
new writer if the old one has been poisoned.
  • Loading branch information
lutter committed Sep 20, 2022
1 parent 7356e0b commit 47937cf
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 2 deletions.
7 changes: 6 additions & 1 deletion graph/src/components/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,12 @@ pub trait SubgraphStore: Send + Sync + 'static {
/// Return a `WritableStore` that is used for indexing subgraphs. Only
/// code that is part of indexing a subgraph should ever use this. The
/// `logger` will be used to log important messages related to the
/// subgraph
/// subgraph.
///
/// This function should only be called in situations where no
/// assumptions about the in-memory state of writing has been made; in
/// particular, no assumptions about whether previous writes have
/// actually been committed or not.
async fn writable(
self: Arc<Self>,
logger: Logger,
Expand Down
7 changes: 6 additions & 1 deletion store/postgres/src/subgraph_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1232,7 +1232,12 @@ impl SubgraphStoreTrait for SubgraphStore {
// idempotent and there is ever only one `WritableStore` for any
// deployment
if let Some(writable) = self.writables.lock().unwrap().get(&deployment) {
return Ok(writable.cheap_clone());
// A poisoned writable will not write anything anymore; we
// discard it and create a new one that is properly initialized
// according to the state in the database.
if !writable.poisoned() {
return Ok(writable.cheap_clone());
}
}

// Ideally the lower level functions would be asyncified.
Expand Down
15 changes: 15 additions & 0 deletions store/postgres/src/writable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -791,6 +791,10 @@ impl Queue {

Ok(dds)
}

fn poisoned(&self) -> bool {
self.poisoned.load(Ordering::SeqCst)
}
}

/// A shim to allow bypassing any pipelined store handling if need be
Expand Down Expand Up @@ -908,6 +912,13 @@ impl Writer {
Writer::Async(queue) => queue.load_dynamic_data_sources(manifest_idx_and_name).await,
}
}

fn poisoned(&self) -> bool {
match self {
Writer::Sync(_) => false,
Writer::Async(queue) => queue.poisoned(),
}
}
}

pub struct WritableStore {
Expand Down Expand Up @@ -941,6 +952,10 @@ impl WritableStore {
writer,
})
}

pub(crate) fn poisoned(&self) -> bool {
self.writer.poisoned()
}
}

impl ReadStore for WritableStore {
Expand Down

0 comments on commit 47937cf

Please sign in to comment.