Skip to content

Commit

Permalink
Implement el_offline and use it in the VC (sigp#4295)
Browse files Browse the repository at this point in the history
## Issue Addressed

Closes sigp#4291, part of sigp#3613.

## Proposed Changes

- Implement the `el_offline` field on `/eth/v1/node/syncing`. We set `el_offline=true` if:
  - The EL's internal status is `Offline` or `AuthFailed`, _or_
  - The most recent call to `newPayload` resulted in an error (more on this in a moment).

- Use the `el_offline` field in the VC to mark nodes with offline ELs as _unsynced_. These nodes will still be used, but only after synced nodes.
- Overhaul the usage of `RequireSynced` so that `::No` is used almost everywhere. The `--allow-unsynced` flag was broken and had the opposite effect to intended, so it has been deprecated.
- Add tests for the EL being offline on the upcheck call, and being offline due to the newPayload check.


## Why track `newPayload` errors?

Tracking the EL's online/offline status is too coarse-grained to be useful in practice, because:

- If the EL is timing out to some calls, it's unlikely to timeout on the `upcheck` call, which is _just_ `eth_syncing`. Every failed call is followed by an upcheck [here](https://github.com/sigp/lighthouse/blob/693886b94176faa4cb450f024696cb69cda2fe58/beacon_node/execution_layer/src/engines.rs#L372-L380), which would have the effect of masking the failure and keeping the status _online_.
- The `newPayload` call is the most likely to time out. It's the call in which ELs tend to do most of their work (often 1-2 seconds), with `forkchoiceUpdated` usually returning much faster (<50ms).
- If `newPayload` is failing consistently (e.g. timing out) then this is a good indication that either the node's EL is in trouble, or the network as a whole is. In the first case validator clients _should_ prefer other BNs if they have one available. In the second case, all of their BNs will likely report `el_offline` and they'll just have to proceed with trying to use them.

## Additional Changes

- Add utility method `ForkName::latest` which is quite convenient for test writing, but probably other things too.
- Delete some stale comments from when we used to support multiple execution nodes.
  • Loading branch information
michaelsproul authored and isaac.asimov committed Jul 13, 2023
1 parent 113b153 commit 335faff
Show file tree
Hide file tree
Showing 21 changed files with 305 additions and 114 deletions.
3 changes: 3 additions & 0 deletions beacon_node/beacon_chain/tests/payload_invalidation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -910,6 +910,9 @@ async fn invalid_after_optimistic_sync() {
.await,
);

// EL status should still be online, no errors.
assert!(!rig.execution_layer().is_offline_or_erroring().await);

// Running fork choice is necessary since a block has been invalidated.
rig.recompute_head().await;

Expand Down
5 changes: 5 additions & 0 deletions beacon_node/execution_layer/src/engines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,11 @@ impl Engine {
**self.state.read().await == EngineStateInternal::Synced
}

/// Returns `true` if the engine has a status other than synced or syncing.
pub async fn is_offline(&self) -> bool {
EngineState::from(**self.state.read().await) == EngineState::Offline
}

/// Run the `EngineApi::upcheck` function if the node's last known state is not synced. This
/// might be used to recover the node if offline.
pub async fn upcheck(&self) {
Expand Down
45 changes: 21 additions & 24 deletions beacon_node/execution_layer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,11 @@ struct Inner<E: EthSpec> {
builder_profit_threshold: Uint256,
log: Logger,
always_prefer_builder_payload: bool,
/// Track whether the last `newPayload` call errored.
///
/// This is used *only* in the informational sync status endpoint, so that a VC using this
/// node can prefer another node with a healthier EL.
last_new_payload_errored: RwLock<bool>,
}

#[derive(Debug, Default, Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -350,6 +355,7 @@ impl<T: EthSpec> ExecutionLayer<T> {
builder_profit_threshold: Uint256::from(builder_profit_threshold),
log,
always_prefer_builder_payload,
last_new_payload_errored: RwLock::new(false),
};

Ok(Self {
Expand Down Expand Up @@ -542,6 +548,15 @@ impl<T: EthSpec> ExecutionLayer<T> {
synced
}

/// Return `true` if the execution layer is offline or returning errors on `newPayload`.
///
/// This function should never be used to prevent any operation in the beacon node, but can
/// be used to give an indication on the HTTP API that the node's execution layer is struggling,
/// which can in turn be used by the VC.
pub async fn is_offline_or_erroring(&self) -> bool {
self.engine().is_offline().await || *self.inner.last_new_payload_errored.read().await
}

/// Updates the proposer preparation data provided by validators
pub async fn update_proposer_preparation(
&self,
Expand Down Expand Up @@ -1116,18 +1131,6 @@ impl<T: EthSpec> ExecutionLayer<T> {
}

/// Maps to the `engine_newPayload` JSON-RPC call.
///
/// ## Fallback Behaviour
///
/// The request will be broadcast to all nodes, simultaneously. It will await a response (or
/// failure) from all nodes and then return based on the first of these conditions which
/// returns true:
///
/// - Error::ConsensusFailure if some nodes return valid and some return invalid
/// - Valid, if any nodes return valid.
/// - Invalid, if any nodes return invalid.
/// - Syncing, if any nodes return syncing.
/// - An error, if all nodes return an error.
pub async fn notify_new_payload(
&self,
execution_payload: &ExecutionPayload<T>,
Expand Down Expand Up @@ -1156,12 +1159,18 @@ impl<T: EthSpec> ExecutionLayer<T> {
&["new_payload", status.status.into()],
);
}
*self.inner.last_new_payload_errored.write().await = result.is_err();

process_payload_status(execution_payload.block_hash(), result, self.log())
.map_err(Box::new)
.map_err(Error::EngineError)
}

/// Update engine sync status.
pub async fn upcheck(&self) {
self.engine().upcheck().await;
}

/// Register that the given `validator_index` is going to produce a block at `slot`.
///
/// The block will be built atop `head_block_root` and the EL will need to prepare an
Expand Down Expand Up @@ -1221,18 +1230,6 @@ impl<T: EthSpec> ExecutionLayer<T> {
}

/// Maps to the `engine_consensusValidated` JSON-RPC call.
///
/// ## Fallback Behaviour
///
/// The request will be broadcast to all nodes, simultaneously. It will await a response (or
/// failure) from all nodes and then return based on the first of these conditions which
/// returns true:
///
/// - Error::ConsensusFailure if some nodes return valid and some return invalid
/// - Valid, if any nodes return valid.
/// - Invalid, if any nodes return invalid.
/// - Syncing, if any nodes return syncing.
/// - An error, if all nodes return an error.
pub async fn notify_forkchoice_updated(
&self,
head_block_hash: ExecutionBlockHash,
Expand Down
25 changes: 18 additions & 7 deletions beacon_node/execution_layer/src/test_utils/handle_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,12 @@ pub async fn handle_rpc<T: EthSpec>(
.map_err(|s| (s, GENERIC_ERROR_CODE))?;

match method {
ETH_SYNCING => Ok(JsonValue::Bool(false)),
ETH_SYNCING => ctx
.syncing_response
.lock()
.clone()
.map(JsonValue::Bool)
.map_err(|message| (message, GENERIC_ERROR_CODE)),
ETH_GET_BLOCK_BY_NUMBER => {
let tag = params
.get(0)
Expand Down Expand Up @@ -145,7 +150,9 @@ pub async fn handle_rpc<T: EthSpec>(

// Canned responses set by block hash take priority.
if let Some(status) = ctx.get_new_payload_status(request.block_hash()) {
return Ok(serde_json::to_value(JsonPayloadStatusV1::from(status)).unwrap());
return status
.map(|status| serde_json::to_value(JsonPayloadStatusV1::from(status)).unwrap())
.map_err(|message| (message, GENERIC_ERROR_CODE));
}

let (static_response, should_import) =
Expand Down Expand Up @@ -320,11 +327,15 @@ pub async fn handle_rpc<T: EthSpec>(

// Canned responses set by block hash take priority.
if let Some(status) = ctx.get_fcu_payload_status(&head_block_hash) {
let response = JsonForkchoiceUpdatedV1Response {
payload_status: JsonPayloadStatusV1::from(status),
payload_id: None,
};
return Ok(serde_json::to_value(response).unwrap());
return status
.map(|status| {
let response = JsonForkchoiceUpdatedV1Response {
payload_status: JsonPayloadStatusV1::from(status),
payload_id: None,
};
serde_json::to_value(response).unwrap()
})
.map_err(|message| (message, GENERIC_ERROR_CODE));
}

let mut response = ctx
Expand Down
27 changes: 21 additions & 6 deletions beacon_node/execution_layer/src/test_utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ impl<T: EthSpec> MockServer<T> {
hook: <_>::default(),
new_payload_statuses: <_>::default(),
fcu_payload_statuses: <_>::default(),
syncing_response: Arc::new(Mutex::new(Ok(false))),
engine_capabilities: Arc::new(RwLock::new(DEFAULT_ENGINE_CAPABILITIES)),
_phantom: PhantomData,
});
Expand Down Expand Up @@ -414,14 +415,25 @@ impl<T: EthSpec> MockServer<T> {
self.ctx
.new_payload_statuses
.lock()
.insert(block_hash, status);
.insert(block_hash, Ok(status));
}

pub fn set_fcu_payload_status(&self, block_hash: ExecutionBlockHash, status: PayloadStatusV1) {
self.ctx
.fcu_payload_statuses
.lock()
.insert(block_hash, status);
.insert(block_hash, Ok(status));
}

pub fn set_new_payload_error(&self, block_hash: ExecutionBlockHash, error: String) {
self.ctx
.new_payload_statuses
.lock()
.insert(block_hash, Err(error));
}

pub fn set_syncing_response(&self, res: Result<bool, String>) {
*self.ctx.syncing_response.lock() = res;
}
}

Expand Down Expand Up @@ -478,8 +490,11 @@ pub struct Context<T: EthSpec> {
//
// This is a more flexible and less stateful alternative to `static_new_payload_response`
// and `preloaded_responses`.
pub new_payload_statuses: Arc<Mutex<HashMap<ExecutionBlockHash, PayloadStatusV1>>>,
pub fcu_payload_statuses: Arc<Mutex<HashMap<ExecutionBlockHash, PayloadStatusV1>>>,
pub new_payload_statuses:
Arc<Mutex<HashMap<ExecutionBlockHash, Result<PayloadStatusV1, String>>>>,
pub fcu_payload_statuses:
Arc<Mutex<HashMap<ExecutionBlockHash, Result<PayloadStatusV1, String>>>>,
pub syncing_response: Arc<Mutex<Result<bool, String>>>,

pub engine_capabilities: Arc<RwLock<EngineCapabilities>>,
pub _phantom: PhantomData<T>,
Expand All @@ -489,14 +504,14 @@ impl<T: EthSpec> Context<T> {
pub fn get_new_payload_status(
&self,
block_hash: &ExecutionBlockHash,
) -> Option<PayloadStatusV1> {
) -> Option<Result<PayloadStatusV1, String>> {
self.new_payload_statuses.lock().get(block_hash).cloned()
}

pub fn get_fcu_payload_status(
&self,
block_hash: &ExecutionBlockHash,
) -> Option<PayloadStatusV1> {
) -> Option<Result<PayloadStatusV1, String>> {
self.fcu_payload_statuses.lock().get(block_hash).cloned()
}
}
Expand Down
48 changes: 30 additions & 18 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2285,28 +2285,40 @@ pub fn serve<T: BeaconChainTypes>(
.and(chain_filter.clone())
.and_then(
|network_globals: Arc<NetworkGlobals<T::EthSpec>>, chain: Arc<BeaconChain<T>>| {
blocking_json_task(move || {
let head_slot = chain.canonical_head.cached_head().head_slot();
let current_slot = chain.slot_clock.now_or_genesis().ok_or_else(|| {
warp_utils::reject::custom_server_error("Unable to read slot clock".into())
})?;
async move {
let el_offline = if let Some(el) = &chain.execution_layer {
el.is_offline_or_erroring().await
} else {
true
};

// Taking advantage of saturating subtraction on slot.
let sync_distance = current_slot - head_slot;
blocking_json_task(move || {
let head_slot = chain.canonical_head.cached_head().head_slot();
let current_slot = chain.slot_clock.now_or_genesis().ok_or_else(|| {
warp_utils::reject::custom_server_error(
"Unable to read slot clock".into(),
)
})?;

let is_optimistic = chain
.is_optimistic_or_invalid_head()
.map_err(warp_utils::reject::beacon_chain_error)?;
// Taking advantage of saturating subtraction on slot.
let sync_distance = current_slot - head_slot;

let syncing_data = api_types::SyncingData {
is_syncing: network_globals.sync_state.read().is_syncing(),
is_optimistic: Some(is_optimistic),
head_slot,
sync_distance,
};
let is_optimistic = chain
.is_optimistic_or_invalid_head()
.map_err(warp_utils::reject::beacon_chain_error)?;

Ok(api_types::GenericResponse::from(syncing_data))
})
let syncing_data = api_types::SyncingData {
is_syncing: network_globals.sync_state.read().is_syncing(),
is_optimistic: Some(is_optimistic),
el_offline: Some(el_offline),
head_slot,
sync_distance,
};

Ok(api_types::GenericResponse::from(syncing_data))
})
.await
}
},
);

Expand Down
1 change: 1 addition & 0 deletions beacon_node/http_api/tests/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@

pub mod fork_tests;
pub mod interactive_tests;
pub mod status_tests;
pub mod tests;
Loading

0 comments on commit 335faff

Please sign in to comment.