-
Notifications
You must be signed in to change notification settings - Fork 20
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: add subgraph health endpoint (#449)
- Loading branch information
1 parent
775f2f6
commit bab22af
Showing
6 changed files
with
329 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
use axum::{ | ||
extract::Path, | ||
response::{IntoResponse, Response as AxumResponse}, | ||
Extension, Json, | ||
}; | ||
use graphql_client::GraphQLQuery; | ||
use indexer_config::GraphNodeConfig; | ||
use reqwest::StatusCode; | ||
use serde_json::json; | ||
use thiserror::Error; | ||
|
||
#[derive(GraphQLQuery)] | ||
#[graphql( | ||
schema_path = "../graphql/indexing_status.schema.graphql", | ||
query_path = "../graphql/subgraph_health.query.graphql", | ||
response_derives = "Debug", | ||
variables_derives = "Clone" | ||
)] | ||
pub struct HealthQuery; | ||
|
||
#[derive(Debug, Error)] | ||
pub enum CheckHealthError { | ||
#[error("Failed to send request")] | ||
RequestFailed, | ||
#[error("Failed to decode response")] | ||
BadResponse, | ||
#[error("Deployment not found")] | ||
DeploymentNotFound, | ||
#[error("Invalid health status found")] | ||
InvalidHealthStatus, | ||
} | ||
|
||
impl IntoResponse for CheckHealthError { | ||
fn into_response(self) -> AxumResponse { | ||
let status = match &self { | ||
CheckHealthError::DeploymentNotFound => StatusCode::NOT_FOUND, | ||
CheckHealthError::InvalidHealthStatus | CheckHealthError::BadResponse => { | ||
StatusCode::INTERNAL_SERVER_ERROR | ||
} | ||
CheckHealthError::RequestFailed => StatusCode::BAD_GATEWAY, | ||
}; | ||
let body = serde_json::json!({ | ||
"error": self.to_string(), | ||
}); | ||
(status, Json(body)).into_response() | ||
} | ||
} | ||
|
||
pub async fn health( | ||
Path(deployment_id): Path<String>, | ||
Extension(graph_node): Extension<GraphNodeConfig>, | ||
) -> Result<impl IntoResponse, CheckHealthError> { | ||
let req_body = HealthQuery::build_query(health_query::Variables { | ||
ids: vec![deployment_id], | ||
}); | ||
|
||
let client = reqwest::Client::new(); | ||
let response = client | ||
.post(graph_node.status_url) | ||
.json(&req_body) | ||
.send() | ||
.await | ||
.map_err(|_| CheckHealthError::RequestFailed)?; | ||
|
||
let graphql_response: graphql_client::Response<health_query::ResponseData> = response | ||
.json() | ||
.await | ||
.map_err(|_| CheckHealthError::BadResponse)?; | ||
|
||
let data = match (graphql_response.data, graphql_response.errors) { | ||
(Some(data), None) => data, | ||
_ => return Err(CheckHealthError::BadResponse), | ||
}; | ||
|
||
let Some(status) = data.indexing_statuses.first() else { | ||
return Err(CheckHealthError::DeploymentNotFound); | ||
}; | ||
let health_response = match status.health { | ||
health_query::Health::healthy => json!({ "health": status.health }), | ||
health_query::Health::unhealthy => { | ||
let errors: Vec<&String> = status | ||
.non_fatal_errors | ||
.iter() | ||
.map(|msg| &msg.message) | ||
.collect(); | ||
json!({ "health": status.health, "nonFatalErrors": errors }) | ||
} | ||
health_query::Health::failed => { | ||
json!({ "health": status.health, "fatalError": status.fatal_error.as_ref().map_or("null", |msg| &msg.message) }) | ||
} | ||
health_query::Health::Other(_) => return Err(CheckHealthError::InvalidHealthStatus), | ||
}; | ||
Ok(Json(health_response)) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,182 @@ | ||
schema { | ||
query: Query | ||
} | ||
|
||
type ApiVersion { | ||
""" | ||
Version number in SemVer format | ||
""" | ||
version: String! | ||
} | ||
|
||
scalar BigInt | ||
|
||
type Block { | ||
hash: Bytes! | ||
number: BigInt! | ||
} | ||
|
||
input BlockInput { | ||
hash: Bytes! | ||
number: BigInt! | ||
} | ||
|
||
scalar Bytes | ||
|
||
type CachedEthereumCall { | ||
idHash: Bytes! | ||
block: Block! | ||
contractAddress: Bytes! | ||
returnValue: Bytes! | ||
} | ||
|
||
interface ChainIndexingStatus { | ||
network: String! | ||
chainHeadBlock: Block | ||
earliestBlock: EarliestBlock | ||
latestBlock: Block | ||
lastHealthyBlock: Block | ||
} | ||
|
||
scalar Date | ||
|
||
type EarliestBlock { | ||
hash: Bytes! | ||
number: BigInt! | ||
} | ||
|
||
type EntityChanges { | ||
updates: [EntityTypeUpdates!]! | ||
deletions: [EntityTypeDeletions!]! | ||
} | ||
|
||
type EntityTypeDeletions { | ||
type: String! | ||
entities: [ID!]! | ||
} | ||
|
||
type EntityTypeUpdates { | ||
type: String! | ||
entities: [JSONObject!]! | ||
} | ||
|
||
type EthereumIndexingStatus implements ChainIndexingStatus { | ||
network: String! | ||
chainHeadBlock: Block | ||
earliestBlock: EarliestBlock | ||
latestBlock: Block | ||
lastHealthyBlock: Block | ||
} | ||
|
||
enum Feature { | ||
nonFatalErrors | ||
grafting | ||
fullTextSearch | ||
ipfsOnEthereumContracts | ||
aggregations | ||
declaredEthCalls | ||
immutableEntities | ||
bytesAsIds | ||
} | ||
|
||
enum Health { | ||
"""Subgraph syncing normally""" | ||
healthy | ||
"""Subgraph syncing but with errors""" | ||
unhealthy | ||
"""Subgraph halted due to errors""" | ||
failed | ||
} | ||
|
||
scalar JSONObject | ||
|
||
type PartialBlock { | ||
hash: Bytes | ||
number: BigInt! | ||
} | ||
|
||
input ProofOfIndexingRequest { | ||
deployment: String! | ||
block: BlockInput! | ||
} | ||
|
||
type ProofOfIndexingResult { | ||
deployment: String! | ||
block: Block! | ||
""" | ||
There may not be a proof of indexing available for the deployment and block | ||
""" | ||
proofOfIndexing: Bytes | ||
} | ||
|
||
input PublicProofOfIndexingRequest { | ||
deployment: String! | ||
blockNumber: BigInt! | ||
} | ||
|
||
type PublicProofOfIndexingResult { | ||
deployment: String! | ||
block: PartialBlock! | ||
proofOfIndexing: Bytes! | ||
} | ||
|
||
type Query { | ||
indexingStatusForCurrentVersion(subgraphName: String!): SubgraphIndexingStatus | ||
indexingStatusForPendingVersion(subgraphName: String!): SubgraphIndexingStatus | ||
indexingStatusesForSubgraphName(subgraphName: String!): [SubgraphIndexingStatus!]! | ||
indexingStatuses(subgraphs: [String!]): [SubgraphIndexingStatus!]! | ||
proofOfIndexing(subgraph: String!, blockNumber: Int!, blockHash: Bytes!, indexer: Bytes): Bytes | ||
""" | ||
Proofs of indexing for several deployments and blocks that can be shared and | ||
compared in public without revealing the _actual_ proof of indexing that every | ||
indexer has in their database | ||
""" | ||
publicProofsOfIndexing(requests: [PublicProofOfIndexingRequest!]!): [PublicProofOfIndexingResult!]! | ||
subgraphFeatures(subgraphId: String!): SubgraphFeatures! | ||
entityChangesInBlock(subgraphId: String!, blockNumber: Int!): EntityChanges! | ||
blockData(network: String!, blockHash: Bytes!): JSONObject | ||
blockHashFromNumber(network: String!, blockNumber: Int!): Bytes | ||
version: Version! | ||
cachedEthereumCalls(network: String!, blockHash: Bytes!): [CachedEthereumCall!] | ||
apiVersions(subgraphId: String!): [ApiVersion!]! | ||
} | ||
|
||
type SubgraphError { | ||
message: String! | ||
block: Block | ||
handler: String | ||
deterministic: Boolean! | ||
} | ||
|
||
type SubgraphFeatures { | ||
apiVersion: String | ||
specVersion: String! | ||
features: [Feature!]! | ||
dataSources: [String!]! | ||
handlers: [String!]! | ||
network: String | ||
} | ||
|
||
type SubgraphIndexingStatus { | ||
subgraph: String! | ||
synced: Boolean! | ||
health: Health! | ||
"""If the subgraph has failed, this is the error caused it""" | ||
fatalError: SubgraphError | ||
"""Sorted from first to last, limited to first 1000""" | ||
nonFatalErrors: [SubgraphError!]! | ||
chains: [ChainIndexingStatus!]! | ||
entityCount: BigInt! | ||
"""null if deployment is not assigned to an indexing node""" | ||
node: String | ||
"""null if deployment is not assigned to an indexing node""" | ||
paused: Boolean | ||
historyBlocks: Int! | ||
} | ||
|
||
type Version { | ||
version: String! | ||
commit: String! | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
query HealthQuery($ids: [String!]!) { | ||
indexingStatuses(subgraphs: $ids) { | ||
health | ||
fatalError { | ||
message | ||
} | ||
nonFatalErrors { | ||
message | ||
} | ||
} | ||
} |