Implement `el_offline` and use it in the VC (#4295)
## Issue Addressed
Closes https://github.com/sigp/lighthouse/issues/4291, part of #3613.
## Proposed Changes
- Implement the `el_offline` field on `/eth/v1/node/syncing` (a sketch of this check follows the list). We set `el_offline=true` if:
- The EL's internal status is `Offline` or `AuthFailed`, _or_
- The most recent call to `newPayload` resulted in an error (more on this in a moment).
- Use the `el_offline` field in the VC to mark nodes with offline ELs as _unsynced_. These nodes will still be used, but only after synced nodes.
- Overhaul the usage of `RequireSynced` so that `::No` is used almost everywhere. The `--allow-unsynced` flag was broken and had the opposite of its intended effect, so it has been deprecated.
- Add tests for the EL being offline on the upcheck call, and being offline due to the newPayload check.
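As a rough orientation, here is a minimal, self-contained sketch of the `el_offline` condition described above. The `EngineState` enum and `ExecutionStatus` struct are simplified stand-ins for illustration only, not the actual Lighthouse types:

```rust
/// Simplified model of how `el_offline` is derived for `/eth/v1/node/syncing`.
#[allow(dead_code)]
enum EngineState {
    Synced,
    Syncing,
    Offline,
    AuthFailed,
}

struct ExecutionStatus {
    engine_state: EngineState,
    last_new_payload_errored: bool,
}

impl ExecutionStatus {
    /// `el_offline = true` if the upcheck state is Offline/AuthFailed,
    /// or if the most recent `newPayload` call returned an error.
    fn el_offline(&self) -> bool {
        matches!(
            self.engine_state,
            EngineState::Offline | EngineState::AuthFailed
        ) || self.last_new_payload_errored
    }
}

fn main() {
    let status = ExecutionStatus {
        engine_state: EngineState::Synced,
        last_new_payload_errored: true,
    };
    // A failing `newPayload` alone is enough to report `el_offline: true`.
    assert!(status.el_offline());
}
```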
## Why track `newPayload` errors?
Tracking the EL's online/offline status is too coarse-grained to be useful in practice, because:
- If the EL is timing out on some calls, it's unlikely to time out on the `upcheck` call, which is _just_ `eth_syncing`. Every failed call is followed by an upcheck [here](693886b941/beacon_node/execution_layer/src/engines.rs (L372-L380)), which would have the effect of masking the failure and keeping the status _online_.
- The `newPayload` call is the most likely to time out. It's the call in which ELs tend to do most of their work (often 1-2 seconds), with `forkchoiceUpdated` usually returning much faster (<50ms).
- If `newPayload` is failing consistently (e.g. timing out) then this is a good indication that either the node's EL is in trouble, or the network as a whole is. In the first case, validator clients _should_ prefer other BNs if they have one available. In the second case, all of their BNs will likely report `el_offline` and they'll just have to proceed with trying to use them. A sketch of this error tracking follows the list.
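A minimal sketch of the `newPayload` error bookkeeping, using a plain synchronous `RwLock` for brevity (the real field lives on `ExecutionLayer` behind an async lock, as shown in the diff below). The `ElHealth` type and method names are illustrative only:

```rust
use std::sync::RwLock;

/// Illustrative only: remembers whether the most recent `newPayload` call errored.
struct ElHealth {
    last_new_payload_errored: RwLock<bool>,
}

impl ElHealth {
    fn record_new_payload_result<T, E>(&self, result: &Result<T, E>) {
        // Overwritten on every call, so a single later success clears the flag.
        *self.last_new_payload_errored.write().unwrap() = result.is_err();
    }

    fn el_offline_hint(&self) -> bool {
        *self.last_new_payload_errored.read().unwrap()
    }
}

fn main() {
    let health = ElHealth {
        last_new_payload_errored: RwLock::new(false),
    };
    health.record_new_payload_result::<(), &str>(&Err("timeout"));
    assert!(health.el_offline_hint());
    health.record_new_payload_result::<(), &str>(&Ok(()));
    assert!(!health.el_offline_hint());
}
```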
## Additional Changes
- Add a utility method `ForkName::latest`, which is convenient for test writing and probably useful elsewhere too (a small stand-in sketch follows this list).
- Delete some stale comments from when we used to support multiple execution nodes.
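For illustration, a small stand-in sketch of the `ForkName::latest` idea: the newest fork is simply the last entry of the ordered fork list. The enum below is a simplified mock, not the real `ForkName` type:

```rust
/// Simplified stand-in for the real fork list.
#[derive(Clone, Copy, Debug, PartialEq)]
enum ForkName {
    Base,
    Altair,
    Merge,
    Capella,
}

impl ForkName {
    fn list_all() -> Vec<ForkName> {
        vec![
            ForkName::Base,
            ForkName::Altair,
            ForkName::Merge,
            ForkName::Capella,
        ]
    }

    fn latest() -> ForkName {
        // Safe as long as at least one fork exists.
        *ForkName::list_all().last().unwrap()
    }
}

fn main() {
    // Tests can always target the newest fork without naming it explicitly.
    assert_eq!(ForkName::latest(), ForkName::Capella);
}
```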
Parent: aaa118ff0e, commit: 3052db29fe
@@ -910,6 +910,9 @@ async fn invalid_after_optimistic_sync() {
         .await,
     );

+    // EL status should still be online, no errors.
+    assert!(!rig.execution_layer().is_offline_or_erroring().await);
+
     // Running fork choice is necessary since a block has been invalidated.
     rig.recompute_head().await;

@@ -238,6 +238,11 @@ impl Engine {
         **self.state.read().await == EngineStateInternal::Synced
     }

+    /// Returns `true` if the engine has a status other than synced or syncing.
+    pub async fn is_offline(&self) -> bool {
+        EngineState::from(**self.state.read().await) == EngineState::Offline
+    }
+
     /// Run the `EngineApi::upcheck` function if the node's last known state is not synced. This
     /// might be used to recover the node if offline.
     pub async fn upcheck(&self) {
@@ -222,6 +222,11 @@ struct Inner<E: EthSpec> {
     builder_profit_threshold: Uint256,
     log: Logger,
     always_prefer_builder_payload: bool,
+    /// Track whether the last `newPayload` call errored.
+    ///
+    /// This is used *only* in the informational sync status endpoint, so that a VC using this
+    /// node can prefer another node with a healthier EL.
+    last_new_payload_errored: RwLock<bool>,
 }

 #[derive(Debug, Default, Clone, Serialize, Deserialize)]
@@ -350,6 +355,7 @@ impl<T: EthSpec> ExecutionLayer<T> {
             builder_profit_threshold: Uint256::from(builder_profit_threshold),
             log,
             always_prefer_builder_payload,
+            last_new_payload_errored: RwLock::new(false),
         };

         Ok(Self {
@@ -542,6 +548,15 @@ impl<T: EthSpec> ExecutionLayer<T> {
         synced
     }

+    /// Return `true` if the execution layer is offline or returning errors on `newPayload`.
+    ///
+    /// This function should never be used to prevent any operation in the beacon node, but can
+    /// be used to give an indication on the HTTP API that the node's execution layer is struggling,
+    /// which can in turn be used by the VC.
+    pub async fn is_offline_or_erroring(&self) -> bool {
+        self.engine().is_offline().await || *self.inner.last_new_payload_errored.read().await
+    }
+
     /// Updates the proposer preparation data provided by validators
     pub async fn update_proposer_preparation(
         &self,
@@ -1116,18 +1131,6 @@ impl<T: EthSpec> ExecutionLayer<T> {
     }

     /// Maps to the `engine_newPayload` JSON-RPC call.
-    ///
-    /// ## Fallback Behaviour
-    ///
-    /// The request will be broadcast to all nodes, simultaneously. It will await a response (or
-    /// failure) from all nodes and then return based on the first of these conditions which
-    /// returns true:
-    ///
-    /// - Error::ConsensusFailure if some nodes return valid and some return invalid
-    /// - Valid, if any nodes return valid.
-    /// - Invalid, if any nodes return invalid.
-    /// - Syncing, if any nodes return syncing.
-    /// - An error, if all nodes return an error.
     pub async fn notify_new_payload(
         &self,
         execution_payload: &ExecutionPayload<T>,
@@ -1156,12 +1159,18 @@ impl<T: EthSpec> ExecutionLayer<T> {
                 &["new_payload", status.status.into()],
             );
         }
+        *self.inner.last_new_payload_errored.write().await = result.is_err();

         process_payload_status(execution_payload.block_hash(), result, self.log())
             .map_err(Box::new)
             .map_err(Error::EngineError)
     }

+    /// Update engine sync status.
+    pub async fn upcheck(&self) {
+        self.engine().upcheck().await;
+    }
+
     /// Register that the given `validator_index` is going to produce a block at `slot`.
     ///
     /// The block will be built atop `head_block_root` and the EL will need to prepare an
@@ -1221,18 +1230,6 @@ impl<T: EthSpec> ExecutionLayer<T> {
     }

     /// Maps to the `engine_consensusValidated` JSON-RPC call.
-    ///
-    /// ## Fallback Behaviour
-    ///
-    /// The request will be broadcast to all nodes, simultaneously. It will await a response (or
-    /// failure) from all nodes and then return based on the first of these conditions which
-    /// returns true:
-    ///
-    /// - Error::ConsensusFailure if some nodes return valid and some return invalid
-    /// - Valid, if any nodes return valid.
-    /// - Invalid, if any nodes return invalid.
-    /// - Syncing, if any nodes return syncing.
-    /// - An error, if all nodes return an error.
     pub async fn notify_forkchoice_updated(
         &self,
         head_block_hash: ExecutionBlockHash,
@@ -30,7 +30,12 @@ pub async fn handle_rpc<T: EthSpec>(
         .map_err(|s| (s, GENERIC_ERROR_CODE))?;

     match method {
-        ETH_SYNCING => Ok(JsonValue::Bool(false)),
+        ETH_SYNCING => ctx
+            .syncing_response
+            .lock()
+            .clone()
+            .map(JsonValue::Bool)
+            .map_err(|message| (message, GENERIC_ERROR_CODE)),
         ETH_GET_BLOCK_BY_NUMBER => {
             let tag = params
                 .get(0)
@@ -145,7 +150,9 @@ pub async fn handle_rpc<T: EthSpec>(

             // Canned responses set by block hash take priority.
             if let Some(status) = ctx.get_new_payload_status(request.block_hash()) {
-                return Ok(serde_json::to_value(JsonPayloadStatusV1::from(status)).unwrap());
+                return status
+                    .map(|status| serde_json::to_value(JsonPayloadStatusV1::from(status)).unwrap())
+                    .map_err(|message| (message, GENERIC_ERROR_CODE));
             }

             let (static_response, should_import) =
@@ -320,11 +327,15 @@ pub async fn handle_rpc<T: EthSpec>(

             // Canned responses set by block hash take priority.
             if let Some(status) = ctx.get_fcu_payload_status(&head_block_hash) {
-                let response = JsonForkchoiceUpdatedV1Response {
-                    payload_status: JsonPayloadStatusV1::from(status),
-                    payload_id: None,
-                };
-                return Ok(serde_json::to_value(response).unwrap());
+                return status
+                    .map(|status| {
+                        let response = JsonForkchoiceUpdatedV1Response {
+                            payload_status: JsonPayloadStatusV1::from(status),
+                            payload_id: None,
+                        };
+                        serde_json::to_value(response).unwrap()
+                    })
+                    .map_err(|message| (message, GENERIC_ERROR_CODE));
             }

             let mut response = ctx
@@ -126,6 +126,7 @@ impl<T: EthSpec> MockServer<T> {
             hook: <_>::default(),
             new_payload_statuses: <_>::default(),
             fcu_payload_statuses: <_>::default(),
+            syncing_response: Arc::new(Mutex::new(Ok(false))),
             engine_capabilities: Arc::new(RwLock::new(DEFAULT_ENGINE_CAPABILITIES)),
             _phantom: PhantomData,
         });
@@ -414,14 +415,25 @@ impl<T: EthSpec> MockServer<T> {
         self.ctx
             .new_payload_statuses
             .lock()
-            .insert(block_hash, status);
+            .insert(block_hash, Ok(status));
     }

     pub fn set_fcu_payload_status(&self, block_hash: ExecutionBlockHash, status: PayloadStatusV1) {
         self.ctx
             .fcu_payload_statuses
             .lock()
-            .insert(block_hash, status);
+            .insert(block_hash, Ok(status));
+    }
+
+    pub fn set_new_payload_error(&self, block_hash: ExecutionBlockHash, error: String) {
+        self.ctx
+            .new_payload_statuses
+            .lock()
+            .insert(block_hash, Err(error));
+    }
+
+    pub fn set_syncing_response(&self, res: Result<bool, String>) {
+        *self.ctx.syncing_response.lock() = res;
     }
 }

@@ -478,8 +490,11 @@ pub struct Context<T: EthSpec> {
     //
     // This is a more flexible and less stateful alternative to `static_new_payload_response`
     // and `preloaded_responses`.
-    pub new_payload_statuses: Arc<Mutex<HashMap<ExecutionBlockHash, PayloadStatusV1>>>,
-    pub fcu_payload_statuses: Arc<Mutex<HashMap<ExecutionBlockHash, PayloadStatusV1>>>,
+    pub new_payload_statuses:
+        Arc<Mutex<HashMap<ExecutionBlockHash, Result<PayloadStatusV1, String>>>>,
+    pub fcu_payload_statuses:
+        Arc<Mutex<HashMap<ExecutionBlockHash, Result<PayloadStatusV1, String>>>>,
+    pub syncing_response: Arc<Mutex<Result<bool, String>>>,

     pub engine_capabilities: Arc<RwLock<EngineCapabilities>>,
     pub _phantom: PhantomData<T>,
@@ -489,14 +504,14 @@ impl<T: EthSpec> Context<T> {
     pub fn get_new_payload_status(
         &self,
         block_hash: &ExecutionBlockHash,
-    ) -> Option<PayloadStatusV1> {
+    ) -> Option<Result<PayloadStatusV1, String>> {
         self.new_payload_statuses.lock().get(block_hash).cloned()
     }

     pub fn get_fcu_payload_status(
         &self,
         block_hash: &ExecutionBlockHash,
-    ) -> Option<PayloadStatusV1> {
+    ) -> Option<Result<PayloadStatusV1, String>> {
         self.fcu_payload_statuses.lock().get(block_hash).cloned()
     }
 }
@@ -2285,28 +2285,40 @@ pub fn serve<T: BeaconChainTypes>(
         .and(chain_filter.clone())
         .and_then(
             |network_globals: Arc<NetworkGlobals<T::EthSpec>>, chain: Arc<BeaconChain<T>>| {
-                blocking_json_task(move || {
-                    let head_slot = chain.canonical_head.cached_head().head_slot();
-                    let current_slot = chain.slot_clock.now_or_genesis().ok_or_else(|| {
-                        warp_utils::reject::custom_server_error("Unable to read slot clock".into())
-                    })?;
-
-                    // Taking advantage of saturating subtraction on slot.
-                    let sync_distance = current_slot - head_slot;
-
-                    let is_optimistic = chain
-                        .is_optimistic_or_invalid_head()
-                        .map_err(warp_utils::reject::beacon_chain_error)?;
-
-                    let syncing_data = api_types::SyncingData {
-                        is_syncing: network_globals.sync_state.read().is_syncing(),
-                        is_optimistic: Some(is_optimistic),
-                        head_slot,
-                        sync_distance,
-                    };
-
-                    Ok(api_types::GenericResponse::from(syncing_data))
-                })
+                async move {
+                    let el_offline = if let Some(el) = &chain.execution_layer {
+                        el.is_offline_or_erroring().await
+                    } else {
+                        true
+                    };
+
+                    blocking_json_task(move || {
+                        let head_slot = chain.canonical_head.cached_head().head_slot();
+                        let current_slot = chain.slot_clock.now_or_genesis().ok_or_else(|| {
+                            warp_utils::reject::custom_server_error(
+                                "Unable to read slot clock".into(),
+                            )
+                        })?;
+
+                        // Taking advantage of saturating subtraction on slot.
+                        let sync_distance = current_slot - head_slot;
+
+                        let is_optimistic = chain
+                            .is_optimistic_or_invalid_head()
+                            .map_err(warp_utils::reject::beacon_chain_error)?;
+
+                        let syncing_data = api_types::SyncingData {
+                            is_syncing: network_globals.sync_state.read().is_syncing(),
+                            is_optimistic: Some(is_optimistic),
+                            el_offline: Some(el_offline),
+                            head_slot,
+                            sync_distance,
+                        };
+
+                        Ok(api_types::GenericResponse::from(syncing_data))
+                    })
+                    .await
+                }
             },
         );

@@ -2,4 +2,5 @@

 pub mod fork_tests;
 pub mod interactive_tests;
+pub mod status_tests;
 pub mod tests;
beacon_node/http_api/tests/status_tests.rs (new file, 145 lines)
@@ -0,0 +1,145 @@
+//! Tests related to the beacon node's sync status
+use beacon_chain::{
+    test_utils::{AttestationStrategy, BlockStrategy, SyncCommitteeStrategy},
+    BlockError,
+};
+use execution_layer::{PayloadStatusV1, PayloadStatusV1Status};
+use http_api::test_utils::InteractiveTester;
+use types::{EthSpec, ExecPayload, ForkName, MinimalEthSpec, Slot};
+
+type E = MinimalEthSpec;
+
+/// Create a new test environment that is post-merge with `chain_depth` blocks.
+async fn post_merge_tester(chain_depth: u64, validator_count: u64) -> InteractiveTester<E> {
+    // Test using latest fork so that we simulate conditions as similar to mainnet as possible.
+    let mut spec = ForkName::latest().make_genesis_spec(E::default_spec());
+    spec.terminal_total_difficulty = 1.into();
+
+    let tester = InteractiveTester::<E>::new(Some(spec), validator_count as usize).await;
+    let harness = &tester.harness;
+    let mock_el = harness.mock_execution_layer.as_ref().unwrap();
+    let execution_ctx = mock_el.server.ctx.clone();
+
+    // Move to terminal block.
+    mock_el.server.all_payloads_valid();
+    execution_ctx
+        .execution_block_generator
+        .write()
+        .move_to_terminal_block()
+        .unwrap();
+
+    // Create some chain depth.
+    harness.advance_slot();
+    harness
+        .extend_chain_with_sync(
+            chain_depth as usize,
+            BlockStrategy::OnCanonicalHead,
+            AttestationStrategy::AllValidators,
+            SyncCommitteeStrategy::AllValidators,
+        )
+        .await;
+    tester
+}
+
+/// Check `syncing` endpoint when the EL is syncing.
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn el_syncing_then_synced() {
+    let num_blocks = E::slots_per_epoch() / 2;
+    let num_validators = E::slots_per_epoch();
+    let tester = post_merge_tester(num_blocks, num_validators).await;
+    let harness = &tester.harness;
+    let mock_el = harness.mock_execution_layer.as_ref().unwrap();
+
+    // EL syncing
+    mock_el.server.set_syncing_response(Ok(true));
+    mock_el.el.upcheck().await;
+
+    let api_response = tester.client.get_node_syncing().await.unwrap().data;
+    assert_eq!(api_response.el_offline, Some(false));
+    assert_eq!(api_response.is_optimistic, Some(false));
+    assert_eq!(api_response.is_syncing, false);
+
+    // EL synced
+    mock_el.server.set_syncing_response(Ok(false));
+    mock_el.el.upcheck().await;
+
+    let api_response = tester.client.get_node_syncing().await.unwrap().data;
+    assert_eq!(api_response.el_offline, Some(false));
+    assert_eq!(api_response.is_optimistic, Some(false));
+    assert_eq!(api_response.is_syncing, false);
+}
+
+/// Check `syncing` endpoint when the EL is offline (errors on upcheck).
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn el_offline() {
+    let num_blocks = E::slots_per_epoch() / 2;
+    let num_validators = E::slots_per_epoch();
+    let tester = post_merge_tester(num_blocks, num_validators).await;
+    let harness = &tester.harness;
+    let mock_el = harness.mock_execution_layer.as_ref().unwrap();
+
+    // EL offline
+    mock_el.server.set_syncing_response(Err("offline".into()));
+    mock_el.el.upcheck().await;
+
+    let api_response = tester.client.get_node_syncing().await.unwrap().data;
+    assert_eq!(api_response.el_offline, Some(true));
+    assert_eq!(api_response.is_optimistic, Some(false));
+    assert_eq!(api_response.is_syncing, false);
+}
+
+/// Check `syncing` endpoint when the EL errors on newPaylod but is not fully offline.
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn el_error_on_new_payload() {
+    let num_blocks = E::slots_per_epoch() / 2;
+    let num_validators = E::slots_per_epoch();
+    let tester = post_merge_tester(num_blocks, num_validators).await;
+    let harness = &tester.harness;
+    let mock_el = harness.mock_execution_layer.as_ref().unwrap();
+
+    // Make a block.
+    let pre_state = harness.get_current_state();
+    let (block, _) = harness
+        .make_block(pre_state, Slot::new(num_blocks + 1))
+        .await;
+    let block_hash = block
+        .message()
+        .body()
+        .execution_payload()
+        .unwrap()
+        .block_hash();
+
+    // Make sure `newPayload` errors for the new block.
+    mock_el
+        .server
+        .set_new_payload_error(block_hash, "error".into());
+
+    // Attempt to process the block, which should error.
+    harness.advance_slot();
+    assert!(matches!(
+        harness.process_block_result(block.clone()).await,
+        Err(BlockError::ExecutionPayloadError(_))
+    ));
+
+    // The EL should now be *offline* according to the API.
+    let api_response = tester.client.get_node_syncing().await.unwrap().data;
+    assert_eq!(api_response.el_offline, Some(true));
+    assert_eq!(api_response.is_optimistic, Some(false));
+    assert_eq!(api_response.is_syncing, false);
+
+    // Processing a block successfully should remove the status.
+    mock_el.server.set_new_payload_status(
+        block_hash,
+        PayloadStatusV1 {
+            status: PayloadStatusV1Status::Valid,
+            latest_valid_hash: Some(block_hash),
+            validation_error: None,
+        },
+    );
+    harness.process_block_result(block).await.unwrap();
+
+    let api_response = tester.client.get_node_syncing().await.unwrap().data;
+    assert_eq!(api_response.el_offline, Some(false));
+    assert_eq!(api_response.is_optimistic, Some(false));
+    assert_eq!(api_response.is_syncing, false);
+}
@@ -1721,6 +1721,8 @@ impl ApiTester {
         let expected = SyncingData {
             is_syncing: false,
             is_optimistic: Some(false),
+            // these tests run without the Bellatrix fork enabled
+            el_offline: Some(true),
             head_slot,
             sync_distance,
         };
@@ -577,6 +577,7 @@ pub struct VersionData {
 pub struct SyncingData {
     pub is_syncing: bool,
     pub is_optimistic: Option<bool>,
+    pub el_offline: Option<bool>,
     pub head_slot: Slot,
     pub sync_distance: Slot,
 }
@@ -24,6 +24,11 @@ impl ForkName {
         ]
     }

+    pub fn latest() -> ForkName {
+        // This unwrap is safe as long as we have 1+ forks. It is tested below.
+        *ForkName::list_all().last().unwrap()
+    }
+
     /// Set the activation slots in the given `ChainSpec` so that the fork named by `self`
     /// is the only fork in effect from genesis.
     pub fn make_genesis_spec(&self, mut spec: ChainSpec) -> ChainSpec {
@@ -178,7 +183,7 @@ mod test {

     #[test]
     fn previous_and_next_fork_consistent() {
-        assert_eq!(ForkName::Capella.next_fork(), None);
+        assert_eq!(ForkName::latest().next_fork(), None);
         assert_eq!(ForkName::Base.previous_fork(), None);

         for (prev_fork, fork) in ForkName::list_all().into_iter().tuple_windows() {
@@ -211,4 +216,15 @@ mod test {
         assert_eq!(ForkName::from_str("merge"), Ok(ForkName::Merge));
         assert_eq!(ForkName::Merge.to_string(), "bellatrix");
     }
+
+    #[test]
+    fn fork_name_latest() {
+        assert_eq!(ForkName::latest(), *ForkName::list_all().last().unwrap());
+
+        let mut fork = ForkName::Base;
+        while let Some(next_fork) = fork.next_fork() {
+            fork = next_fork;
+        }
+        assert_eq!(ForkName::latest(), fork);
+    }
 }
@@ -103,10 +103,8 @@ fn beacon_nodes_flag() {

 #[test]
 fn allow_unsynced_flag() {
-    CommandLineTest::new()
-        .flag("allow-unsynced", None)
-        .run()
-        .with_config(|config| assert!(config.allow_unsynced_beacon_node));
+    // No-op, but doesn't crash.
+    CommandLineTest::new().flag("allow-unsynced", None).run();
 }

 #[test]
@@ -28,7 +28,7 @@ const UPDATE_REQUIRED_LOG_HINT: &str = "this VC or the remote BN may need updati
 /// too early, we risk switching nodes between the time of publishing an attestation and publishing
 /// an aggregate; this may result in a missed aggregation. If we set this time too late, we risk not
 /// having the correct nodes up and running prior to the start of the slot.
-const SLOT_LOOKAHEAD: Duration = Duration::from_secs(1);
+const SLOT_LOOKAHEAD: Duration = Duration::from_secs(2);

 /// Indicates a measurement of latency between the VC and a BN.
 pub struct LatencyMeasurement {
@@ -52,7 +52,7 @@ pub fn start_fallback_updater_service<T: SlotClock + 'static, E: EthSpec>(

     let future = async move {
         loop {
-            beacon_nodes.update_unready_candidates().await;
+            beacon_nodes.update_all_candidates().await;

             let sleep_time = beacon_nodes
                 .slot_clock
@@ -385,33 +385,21 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
         n
     }

-    /// Loop through any `self.candidates` that we don't think are online, compatible or synced and
-    /// poll them to see if their status has changed.
+    /// Loop through ALL candidates in `self.candidates` and update their sync status.
     ///
-    /// We do not poll nodes that are synced to avoid sending additional requests when everything is
-    /// going smoothly.
-    pub async fn update_unready_candidates(&self) {
-        let mut futures = Vec::new();
-        for candidate in &self.candidates {
-            // There is a potential race condition between having the read lock and the write
-            // lock. The worst case of this race is running `try_become_ready` twice, which is
-            // acceptable.
-            //
-            // Note: `RequireSynced` is always set to false here. This forces us to recheck the sync
-            // status of nodes that were previously not-synced.
-            if candidate.status(RequireSynced::Yes).await.is_err() {
-                // There exists a race-condition that could result in `refresh_status` being called
-                // when the status does not require refreshing anymore. This is deemed an
-                // acceptable inefficiency.
-                futures.push(candidate.refresh_status(
-                    self.slot_clock.as_ref(),
-                    &self.spec,
-                    &self.log,
-                ));
-            }
-        }
-
-        //run all updates concurrently and ignore results
+    /// It is possible for a node to return an unsynced status while continuing to serve
+    /// low quality responses. To route around this it's best to poll all connected beacon nodes.
+    /// A previous implementation of this function polled only the unavailable BNs.
+    pub async fn update_all_candidates(&self) {
+        let futures = self
+            .candidates
+            .iter()
+            .map(|candidate| {
+                candidate.refresh_status(self.slot_clock.as_ref(), &self.spec, &self.log)
+            })
+            .collect::<Vec<_>>();
+
+        // run all updates concurrently and ignore errors
         let _ = future::join_all(futures).await;
     }

@@ -36,7 +36,10 @@ pub async fn check_synced<T: SlotClock>(
         }
     };

-    let is_synced = !resp.data.is_syncing || (resp.data.sync_distance.as_u64() < SYNC_TOLERANCE);
+    // Default EL status to "online" for backwards-compatibility with BNs that don't include it.
+    let el_offline = resp.data.el_offline.unwrap_or(false);
+    let bn_is_synced = !resp.data.is_syncing || (resp.data.sync_distance.as_u64() < SYNC_TOLERANCE);
+    let is_synced = bn_is_synced && !el_offline;

     if let Some(log) = log_opt {
         if !is_synced {
@@ -52,6 +55,7 @@ pub async fn check_synced<T: SlotClock>(
                 "sync_distance" => resp.data.sync_distance.as_u64(),
                 "head_slot" => resp.data.head_slot.as_u64(),
                 "endpoint" => %beacon_node,
+                "el_offline" => el_offline,
             );
         }

@@ -109,10 +109,7 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
         .arg(
             Arg::with_name("allow-unsynced")
                 .long("allow-unsynced")
-                .help(
-                    "If present, the validator client will still poll for duties if the beacon
-                node is not synced.",
-                ),
+                .help("DEPRECATED: this flag does nothing"),
         )
         .arg(
             Arg::with_name("use-long-timeouts")
@@ -205,7 +205,13 @@ impl Config {
             );
         }

-        config.allow_unsynced_beacon_node = cli_args.is_present("allow-unsynced");
+        if cli_args.is_present("allow-unsynced") {
+            warn!(
+                log,
+                "The --allow-unsynced flag is deprecated";
+                "msg" => "it no longer has any effect",
+            );
+        }
         config.disable_run_on_all = cli_args.is_present("disable-run-on-all");
         config.disable_auto_discover = cli_args.is_present("disable-auto-discover");
         config.init_slashing_protection = cli_args.is_present("init-slashing-protection");
@@ -147,11 +147,6 @@ pub struct DutiesService<T, E: EthSpec> {
     pub slot_clock: T,
     /// Provides HTTP access to remote beacon nodes.
     pub beacon_nodes: Arc<BeaconNodeFallback<T, E>>,
-    /// Controls whether or not this function will refuse to interact with non-synced beacon nodes.
-    ///
-    /// This functionality is a little redundant since most BNs will likely reject duties when they
-    /// aren't synced, but we keep it around for an emergency.
-    pub require_synced: RequireSynced,
     pub enable_high_validator_count_metrics: bool,
     pub context: RuntimeContext<E>,
     pub spec: ChainSpec,
@@ -421,7 +416,7 @@ async fn poll_validator_indices<T: SlotClock + 'static, E: EthSpec>(
     let download_result = duties_service
         .beacon_nodes
         .first_success(
-            duties_service.require_synced,
+            RequireSynced::No,
             OfflineOnFailure::Yes,
             |beacon_node| async move {
                 let _timer = metrics::start_timer_vec(
@@ -618,7 +613,7 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
         if let Err(e) = duties_service
             .beacon_nodes
             .run(
-                duties_service.require_synced,
+                RequireSynced::No,
                 OfflineOnFailure::Yes,
                 |beacon_node| async move {
                     let _timer = metrics::start_timer_vec(
@@ -856,7 +851,7 @@ async fn post_validator_duties_attester<T: SlotClock + 'static, E: EthSpec>(
     duties_service
         .beacon_nodes
         .first_success(
-            duties_service.require_synced,
+            RequireSynced::No,
             OfflineOnFailure::Yes,
             |beacon_node| async move {
                 let _timer = metrics::start_timer_vec(
@@ -1063,7 +1058,7 @@ async fn poll_beacon_proposers<T: SlotClock + 'static, E: EthSpec>(
     let download_result = duties_service
         .beacon_nodes
         .first_success(
-            duties_service.require_synced,
+            RequireSynced::No,
             OfflineOnFailure::Yes,
             |beacon_node| async move {
                 let _timer = metrics::start_timer_vec(
@@ -1,4 +1,4 @@
-use crate::beacon_node_fallback::OfflineOnFailure;
+use crate::beacon_node_fallback::{OfflineOnFailure, RequireSynced};
 use crate::{
     doppelganger_service::DoppelgangerStatus,
     duties_service::{DutiesService, Error},
@@ -422,7 +422,7 @@ pub async fn poll_sync_committee_duties_for_period<T: SlotClock + 'static, E: Et
     let duties_response = duties_service
         .beacon_nodes
         .first_success(
-            duties_service.require_synced,
+            RequireSynced::No,
             OfflineOnFailure::Yes,
             |beacon_node| async move {
                 beacon_node
@@ -446,11 +446,6 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
         slot_clock: slot_clock.clone(),
         beacon_nodes: beacon_nodes.clone(),
         validator_store: validator_store.clone(),
-        require_synced: if config.allow_unsynced_beacon_node {
-            RequireSynced::Yes
-        } else {
-            RequireSynced::No
-        },
         spec: context.eth2_config.spec.clone(),
         context: duties_context,
         enable_high_validator_count_metrics: config.enable_high_validator_count_metrics,
@@ -620,8 +615,8 @@ async fn init_from_beacon_node<E: EthSpec>(
     context: &RuntimeContext<E>,
 ) -> Result<(u64, Hash256), String> {
     loop {
-        beacon_nodes.update_unready_candidates().await;
-        proposer_nodes.update_unready_candidates().await;
+        beacon_nodes.update_all_candidates().await;
+        proposer_nodes.update_all_candidates().await;

         let num_available = beacon_nodes.num_available().await;
         let num_total = beacon_nodes.num_total();
@@ -332,7 +332,7 @@ impl<T: SlotClock + 'static, E: EthSpec> PreparationService<T, E> {
         match self
             .beacon_nodes
             .run(
-                RequireSynced::Yes,
+                RequireSynced::No,
                 OfflineOnFailure::Yes,
                 |beacon_node| async move {
                     beacon_node
@@ -451,7 +451,7 @@ impl<T: SlotClock + 'static, E: EthSpec> PreparationService<T, E> {
         match self
             .beacon_nodes
             .first_success(
-                RequireSynced::Yes,
+                RequireSynced::No,
                 OfflineOnFailure::No,
                 |beacon_node| async move {
                     beacon_node.post_validator_register_validator(batch).await
@@ -178,7 +178,7 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
         let response = self
             .beacon_nodes
             .first_success(
-                RequireSynced::Yes,
+                RequireSynced::No,
                 OfflineOnFailure::Yes,
                 |beacon_node| async move {
                     match beacon_node.get_beacon_blocks_root(BlockId::Head).await {