Validator registration request failures do not cause us to mark BNs offline (#3488)

## Issue Addressed

Relates to https://github.com/sigp/lighthouse/issues/3416

## Proposed Changes

- Add an `OfflineOnFailure` enum to the `first_success` method for querying beacon nodes so that a val registration request failure from the BN -> builder does not result in the BN being marked offline. This seems important because these failures could be coming directly from a connected relay and actually have no bearing on BN health.  Other messages that are sent to a relay have a local fallback so shouldn't result in errors 

- Downgrade the following log to a `WARN`

```
ERRO Unable to publish validator registrations to the builder network, error: All endpoints failed https://BN_B => RequestFailed(ServerMessage(ErrorMessage { code: 500, message: "UNHANDLED_ERROR: BuilderMissing", stacktraces: [] })), https://XXXX/ => Unavailable(Offline), [omitted]
```

## Additional Info

I think this change at least improves the UX of having a VC connected to some builder and some non-builder beacon nodes. I think we need to balance potentially alerting users that there is a BN <> VC misconfiguration and also allowing this type of fallback to work. 

If we want to fully support this type of configuration we may want to consider adding a flag `--builder-beacon-nodes` and track whether a VC should be making builder queries on a per-beacon node basis.  But I think the changes in this PR are independent of that type of extension.

PS: Sorry for the big diff here, it's mostly formatting changes after I added a new arg to a bunch of methods calls.




Co-authored-by: realbigsean <sean@sigmaprime.io>
This commit is contained in:
realbigsean 2022-08-29 11:35:59 +00:00
parent 66eca1a882
commit 2ce86a0830
9 changed files with 339 additions and 241 deletions

View File

@ -3,6 +3,7 @@ use crate::{
duties_service::{DutiesService, DutyAndProof},
http_metrics::metrics,
validator_store::ValidatorStore,
OfflineOnFailure,
};
use environment::RuntimeContext;
use futures::future::join_all;
@ -337,17 +338,21 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
let attestation_data = self
.beacon_nodes
.first_success(RequireSynced::No, |beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::ATTESTATION_SERVICE_TIMES,
&[metrics::ATTESTATIONS_HTTP_GET],
);
beacon_node
.get_validator_attestation_data(slot, committee_index)
.await
.map_err(|e| format!("Failed to produce attestation data: {:?}", e))
.map(|result| result.data)
})
.first_success(
RequireSynced::No,
OfflineOnFailure::Yes,
|beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::ATTESTATION_SERVICE_TIMES,
&[metrics::ATTESTATIONS_HTTP_GET],
);
beacon_node
.get_validator_attestation_data(slot, committee_index)
.await
.map_err(|e| format!("Failed to produce attestation data: {:?}", e))
.map(|result| result.data)
},
)
.await
.map_err(|e| e.to_string())?;
@ -414,15 +419,19 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
// Post the attestations to the BN.
match self
.beacon_nodes
.first_success(RequireSynced::No, |beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::ATTESTATION_SERVICE_TIMES,
&[metrics::ATTESTATIONS_HTTP_POST],
);
beacon_node
.post_beacon_pool_attestations(attestations)
.await
})
.first_success(
RequireSynced::No,
OfflineOnFailure::Yes,
|beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::ATTESTATION_SERVICE_TIMES,
&[metrics::ATTESTATIONS_HTTP_POST],
);
beacon_node
.post_beacon_pool_attestations(attestations)
.await
},
)
.await
{
Ok(()) => info!(
@ -470,21 +479,27 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
let aggregated_attestation = &self
.beacon_nodes
.first_success(RequireSynced::No, |beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::ATTESTATION_SERVICE_TIMES,
&[metrics::AGGREGATES_HTTP_GET],
);
beacon_node
.get_validator_aggregate_attestation(
attestation_data.slot,
attestation_data.tree_hash_root(),
)
.await
.map_err(|e| format!("Failed to produce an aggregate attestation: {:?}", e))?
.ok_or_else(|| format!("No aggregate available for {:?}", attestation_data))
.map(|result| result.data)
})
.first_success(
RequireSynced::No,
OfflineOnFailure::Yes,
|beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::ATTESTATION_SERVICE_TIMES,
&[metrics::AGGREGATES_HTTP_GET],
);
beacon_node
.get_validator_aggregate_attestation(
attestation_data.slot,
attestation_data.tree_hash_root(),
)
.await
.map_err(|e| {
format!("Failed to produce an aggregate attestation: {:?}", e)
})?
.ok_or_else(|| format!("No aggregate available for {:?}", attestation_data))
.map(|result| result.data)
},
)
.await
.map_err(|e| e.to_string())?;
@ -535,15 +550,19 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
let signed_aggregate_and_proofs_slice = signed_aggregate_and_proofs.as_slice();
match self
.beacon_nodes
.first_success(RequireSynced::No, |beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::ATTESTATION_SERVICE_TIMES,
&[metrics::AGGREGATES_HTTP_POST],
);
beacon_node
.post_validator_aggregate_and_proof(signed_aggregate_and_proofs_slice)
.await
})
.first_success(
RequireSynced::No,
OfflineOnFailure::Yes,
|beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::ATTESTATION_SERVICE_TIMES,
&[metrics::AGGREGATES_HTTP_POST],
);
beacon_node
.post_validator_aggregate_and_proof(signed_aggregate_and_proofs_slice)
.await
},
)
.await
{
Ok(()) => {

View File

@ -70,6 +70,13 @@ pub enum RequireSynced {
No,
}
/// Indicates if a beacon node should be set to `Offline` if a request fails.
#[derive(PartialEq, Clone, Copy)]
pub enum OfflineOnFailure {
Yes,
No,
}
impl PartialEq<bool> for RequireSynced {
fn eq(&self, other: &bool) -> bool {
if *other {
@ -387,6 +394,7 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
pub async fn first_success<'a, F, O, Err, R>(
&'a self,
require_synced: RequireSynced,
offline_on_failure: OfflineOnFailure,
func: F,
) -> Result<O, AllErrored<Err>>
where
@ -415,7 +423,9 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
// There exists a race condition where the candidate may have been marked
// as ready between the `func` call and now. We deem this an acceptable
// inefficiency.
$candidate.set_offline().await;
if matches!(offline_on_failure, OfflineOnFailure::Yes) {
$candidate.set_offline().await;
}
errors.push(($candidate.beacon_node.to_string(), Error::RequestFailed(e)));
inc_counter_vec(&ENDPOINT_ERRORS, &[$candidate.beacon_node.as_ref()]);
}

View File

@ -2,6 +2,7 @@ use crate::beacon_node_fallback::{AllErrored, Error as FallbackError};
use crate::{
beacon_node_fallback::{BeaconNodeFallback, RequireSynced},
graffiti_file::GraffitiFile,
OfflineOnFailure,
};
use crate::{http_metrics::metrics, validator_store::ValidatorStore};
use environment::RuntimeContext;
@ -329,70 +330,74 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
// Request block from first responsive beacon node.
let block = self
.beacon_nodes
.first_success(RequireSynced::No, |beacon_node| async move {
let block = match Payload::block_type() {
BlockType::Full => {
let _get_timer = metrics::start_timer_vec(
&metrics::BLOCK_SERVICE_TIMES,
&[metrics::BEACON_BLOCK_HTTP_GET],
);
beacon_node
.get_validator_blocks::<E, Payload>(
slot,
randao_reveal_ref,
graffiti.as_ref(),
)
.await
.map_err(|e| {
BlockError::Recoverable(format!(
"Error from beacon node when producing block: {:?}",
e
))
})?
.data
}
BlockType::Blinded => {
let _get_timer = metrics::start_timer_vec(
&metrics::BLOCK_SERVICE_TIMES,
&[metrics::BLINDED_BEACON_BLOCK_HTTP_GET],
);
beacon_node
.get_validator_blinded_blocks::<E, Payload>(
slot,
randao_reveal_ref,
graffiti.as_ref(),
)
.await
.map_err(|e| {
BlockError::Recoverable(format!(
"Error from beacon node when producing block: {:?}",
e
))
})?
.data
}
};
.first_success(
RequireSynced::No,
OfflineOnFailure::Yes,
|beacon_node| async move {
let block = match Payload::block_type() {
BlockType::Full => {
let _get_timer = metrics::start_timer_vec(
&metrics::BLOCK_SERVICE_TIMES,
&[metrics::BEACON_BLOCK_HTTP_GET],
);
beacon_node
.get_validator_blocks::<E, Payload>(
slot,
randao_reveal_ref,
graffiti.as_ref(),
)
.await
.map_err(|e| {
BlockError::Recoverable(format!(
"Error from beacon node when producing block: {:?}",
e
))
})?
.data
}
BlockType::Blinded => {
let _get_timer = metrics::start_timer_vec(
&metrics::BLOCK_SERVICE_TIMES,
&[metrics::BLINDED_BEACON_BLOCK_HTTP_GET],
);
beacon_node
.get_validator_blinded_blocks::<E, Payload>(
slot,
randao_reveal_ref,
graffiti.as_ref(),
)
.await
.map_err(|e| {
BlockError::Recoverable(format!(
"Error from beacon node when producing block: {:?}",
e
))
})?
.data
}
};
// Ensure the correctness of the execution payload's fee recipient.
if strict_fee_recipient {
if let Ok(execution_payload) = block.body().execution_payload() {
if Some(execution_payload.fee_recipient()) != fee_recipient {
return Err(BlockError::Recoverable(
"Incorrect fee recipient used by builder".to_string(),
));
// Ensure the correctness of the execution payload's fee recipient.
if strict_fee_recipient {
if let Ok(execution_payload) = block.body().execution_payload() {
if Some(execution_payload.fee_recipient()) != fee_recipient {
return Err(BlockError::Recoverable(
"Incorrect fee recipient used by builder".to_string(),
));
}
}
}
}
if proposer_index != Some(block.proposer_index()) {
return Err(BlockError::Recoverable(
"Proposer index does not match block proposer. Beacon chain re-orged"
.to_string(),
));
}
if proposer_index != Some(block.proposer_index()) {
return Err(BlockError::Recoverable(
"Proposer index does not match block proposer. Beacon chain re-orged"
.to_string(),
));
}
Ok::<_, BlockError>(block)
})
Ok::<_, BlockError>(block)
},
)
.await?;
let signed_block = self_ref
@ -403,41 +408,45 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
// Publish block with first available beacon node.
self.beacon_nodes
.first_success(RequireSynced::No, |beacon_node| async {
match Payload::block_type() {
BlockType::Full => {
let _post_timer = metrics::start_timer_vec(
&metrics::BLOCK_SERVICE_TIMES,
&[metrics::BEACON_BLOCK_HTTP_POST],
);
beacon_node
.post_beacon_blocks(&signed_block)
.await
.map_err(|e| {
BlockError::Irrecoverable(format!(
"Error from beacon node when publishing block: {:?}",
e
))
})?
.first_success(
RequireSynced::No,
OfflineOnFailure::Yes,
|beacon_node| async {
match Payload::block_type() {
BlockType::Full => {
let _post_timer = metrics::start_timer_vec(
&metrics::BLOCK_SERVICE_TIMES,
&[metrics::BEACON_BLOCK_HTTP_POST],
);
beacon_node
.post_beacon_blocks(&signed_block)
.await
.map_err(|e| {
BlockError::Irrecoverable(format!(
"Error from beacon node when publishing block: {:?}",
e
))
})?
}
BlockType::Blinded => {
let _post_timer = metrics::start_timer_vec(
&metrics::BLOCK_SERVICE_TIMES,
&[metrics::BLINDED_BEACON_BLOCK_HTTP_POST],
);
beacon_node
.post_beacon_blinded_blocks(&signed_block)
.await
.map_err(|e| {
BlockError::Irrecoverable(format!(
"Error from beacon node when publishing block: {:?}",
e
))
})?
}
}
BlockType::Blinded => {
let _post_timer = metrics::start_timer_vec(
&metrics::BLOCK_SERVICE_TIMES,
&[metrics::BLINDED_BEACON_BLOCK_HTTP_POST],
);
beacon_node
.post_beacon_blinded_blocks(&signed_block)
.await
.map_err(|e| {
BlockError::Irrecoverable(format!(
"Error from beacon node when publishing block: {:?}",
e
))
})?
}
}
Ok::<_, BlockError>(())
})
Ok::<_, BlockError>(())
},
)
.await?;
info!(

View File

@ -31,6 +31,7 @@
use crate::beacon_node_fallback::{BeaconNodeFallback, RequireSynced};
use crate::validator_store::ValidatorStore;
use crate::OfflineOnFailure;
use environment::RuntimeContext;
use eth2::types::LivenessResponseData;
use parking_lot::RwLock;
@ -176,13 +177,17 @@ async fn beacon_node_liveness<'a, T: 'static + SlotClock, E: EthSpec>(
} else {
// Request the previous epoch liveness state from the beacon node.
beacon_nodes
.first_success(RequireSynced::Yes, |beacon_node| async move {
beacon_node
.post_lighthouse_liveness(validator_indices, previous_epoch)
.await
.map_err(|e| format!("Failed query for validator liveness: {:?}", e))
.map(|result| result.data)
})
.first_success(
RequireSynced::Yes,
OfflineOnFailure::Yes,
|beacon_node| async move {
beacon_node
.post_lighthouse_liveness(validator_indices, previous_epoch)
.await
.map_err(|e| format!("Failed query for validator liveness: {:?}", e))
.map(|result| result.data)
},
)
.await
.unwrap_or_else(|e| {
crit!(
@ -199,13 +204,17 @@ async fn beacon_node_liveness<'a, T: 'static + SlotClock, E: EthSpec>(
// Request the current epoch liveness state from the beacon node.
let current_epoch_responses = beacon_nodes
.first_success(RequireSynced::Yes, |beacon_node| async move {
beacon_node
.post_lighthouse_liveness(validator_indices, current_epoch)
.await
.map_err(|e| format!("Failed query for validator liveness: {:?}", e))
.map(|result| result.data)
})
.first_success(
RequireSynced::Yes,
OfflineOnFailure::Yes,
|beacon_node| async move {
beacon_node
.post_lighthouse_liveness(validator_indices, current_epoch)
.await
.map_err(|e| format!("Failed query for validator liveness: {:?}", e))
.map(|result| result.data)
},
)
.await
.unwrap_or_else(|e| {
crit!(

View File

@ -8,7 +8,7 @@
mod sync;
use crate::beacon_node_fallback::{BeaconNodeFallback, RequireSynced};
use crate::beacon_node_fallback::{BeaconNodeFallback, OfflineOnFailure, RequireSynced};
use crate::{
block_service::BlockServiceNotification,
http_metrics::metrics,
@ -382,18 +382,22 @@ async fn poll_validator_indices<T: SlotClock + 'static, E: EthSpec>(
// Query the remote BN to resolve a pubkey to a validator index.
let download_result = duties_service
.beacon_nodes
.first_success(duties_service.require_synced, |beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::DUTIES_SERVICE_TIMES,
&[metrics::VALIDATOR_ID_HTTP_GET],
);
beacon_node
.get_beacon_states_validator_id(
StateId::Head,
&ValidatorId::PublicKey(pubkey),
)
.await
})
.first_success(
duties_service.require_synced,
OfflineOnFailure::Yes,
|beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::DUTIES_SERVICE_TIMES,
&[metrics::VALIDATOR_ID_HTTP_GET],
);
beacon_node
.get_beacon_states_validator_id(
StateId::Head,
&ValidatorId::PublicKey(pubkey),
)
.await
},
)
.await;
match download_result {
@ -559,15 +563,19 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
let subscriptions_ref = &subscriptions;
if let Err(e) = duties_service
.beacon_nodes
.first_success(duties_service.require_synced, |beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::DUTIES_SERVICE_TIMES,
&[metrics::SUBSCRIPTIONS_HTTP_POST],
);
beacon_node
.post_validator_beacon_committee_subscriptions(subscriptions_ref)
.await
})
.first_success(
duties_service.require_synced,
OfflineOnFailure::Yes,
|beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::DUTIES_SERVICE_TIMES,
&[metrics::SUBSCRIPTIONS_HTTP_POST],
);
beacon_node
.post_validator_beacon_committee_subscriptions(subscriptions_ref)
.await
},
)
.await
{
error!(
@ -619,15 +627,19 @@ async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>(
let response = duties_service
.beacon_nodes
.first_success(duties_service.require_synced, |beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::DUTIES_SERVICE_TIMES,
&[metrics::ATTESTER_DUTIES_HTTP_POST],
);
beacon_node
.post_validator_duties_attester(epoch, local_indices)
.await
})
.first_success(
duties_service.require_synced,
OfflineOnFailure::Yes,
|beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::DUTIES_SERVICE_TIMES,
&[metrics::ATTESTER_DUTIES_HTTP_POST],
);
beacon_node
.post_validator_duties_attester(epoch, local_indices)
.await
},
)
.await
.map_err(|e| Error::FailedToDownloadAttesters(e.to_string()))?;
@ -779,15 +791,19 @@ async fn poll_beacon_proposers<T: SlotClock + 'static, E: EthSpec>(
if !local_pubkeys.is_empty() {
let download_result = duties_service
.beacon_nodes
.first_success(duties_service.require_synced, |beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::DUTIES_SERVICE_TIMES,
&[metrics::PROPOSER_DUTIES_HTTP_GET],
);
beacon_node
.get_validator_duties_proposer(current_epoch)
.await
})
.first_success(
duties_service.require_synced,
OfflineOnFailure::Yes,
|beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::DUTIES_SERVICE_TIMES,
&[metrics::PROPOSER_DUTIES_HTTP_GET],
);
beacon_node
.get_validator_duties_proposer(current_epoch)
.await
},
)
.await;
match download_result {

View File

@ -1,3 +1,4 @@
use crate::beacon_node_fallback::OfflineOnFailure;
use crate::{
doppelganger_service::DoppelgangerStatus,
duties_service::{DutiesService, Error},
@ -420,11 +421,15 @@ pub async fn poll_sync_committee_duties_for_period<T: SlotClock + 'static, E: Et
let duties_response = duties_service
.beacon_nodes
.first_success(duties_service.require_synced, |beacon_node| async move {
beacon_node
.post_validator_duties_sync(period_start_epoch, local_indices)
.await
})
.first_success(
duties_service.require_synced,
OfflineOnFailure::Yes,
|beacon_node| async move {
beacon_node
.post_validator_duties_sync(period_start_epoch, local_indices)
.await
},
)
.await;
let duties = match duties_response {

View File

@ -26,7 +26,8 @@ use monitoring_api::{MonitoringHttpClient, ProcessType};
pub use slashing_protection::{SlashingDatabase, SLASHING_PROTECTION_FILENAME};
use crate::beacon_node_fallback::{
start_fallback_updater_service, BeaconNodeFallback, CandidateBeaconNode, RequireSynced,
start_fallback_updater_service, BeaconNodeFallback, CandidateBeaconNode, OfflineOnFailure,
RequireSynced,
};
use crate::doppelganger_service::DoppelgangerService;
use account_utils::validator_definitions::ValidatorDefinitions;
@ -570,9 +571,11 @@ async fn init_from_beacon_node<E: EthSpec>(
let genesis = loop {
match beacon_nodes
.first_success(RequireSynced::No, |node| async move {
node.get_beacon_genesis().await
})
.first_success(
RequireSynced::No,
OfflineOnFailure::Yes,
|node| async move { node.get_beacon_genesis().await },
)
.await
{
Ok(genesis) => break genesis.data,
@ -659,9 +662,11 @@ async fn poll_whilst_waiting_for_genesis<E: EthSpec>(
) -> Result<(), String> {
loop {
match beacon_nodes
.first_success(RequireSynced::No, |beacon_node| async move {
beacon_node.get_lighthouse_staking().await
})
.first_success(
RequireSynced::No,
OfflineOnFailure::Yes,
|beacon_node| async move { beacon_node.get_lighthouse_staking().await },
)
.await
{
Ok(is_staking) => {

View File

@ -1,9 +1,10 @@
use crate::beacon_node_fallback::{BeaconNodeFallback, RequireSynced};
use crate::validator_store::{DoppelgangerStatus, ValidatorStore};
use crate::OfflineOnFailure;
use bls::PublicKeyBytes;
use environment::RuntimeContext;
use parking_lot::RwLock;
use slog::{debug, error, info};
use slog::{debug, error, info, warn};
use slot_clock::SlotClock;
use std::collections::HashMap;
use std::hash::Hash;
@ -330,11 +331,15 @@ impl<T: SlotClock + 'static, E: EthSpec> PreparationService<T, E> {
let preparation_entries = preparation_data.as_slice();
match self
.beacon_nodes
.first_success(RequireSynced::Yes, |beacon_node| async move {
beacon_node
.post_validator_prepare_beacon_proposer(preparation_entries)
.await
})
.first_success(
RequireSynced::Yes,
OfflineOnFailure::Yes,
|beacon_node| async move {
beacon_node
.post_validator_prepare_beacon_proposer(preparation_entries)
.await
},
)
.await
{
Ok(()) => debug!(
@ -445,9 +450,13 @@ impl<T: SlotClock + 'static, E: EthSpec> PreparationService<T, E> {
for batch in signed.chunks(VALIDATOR_REGISTRATION_BATCH_SIZE) {
match self
.beacon_nodes
.first_success(RequireSynced::Yes, |beacon_node| async move {
beacon_node.post_validator_register_validator(batch).await
})
.first_success(
RequireSynced::Yes,
OfflineOnFailure::No,
|beacon_node| async move {
beacon_node.post_validator_register_validator(batch).await
},
)
.await
{
Ok(()) => info!(
@ -455,7 +464,7 @@ impl<T: SlotClock + 'static, E: EthSpec> PreparationService<T, E> {
"Published validator registrations to the builder network";
"count" => registration_data_len,
),
Err(e) => error!(
Err(e) => warn!(
log,
"Unable to publish validator registrations to the builder network";
"error" => %e,

View File

@ -1,5 +1,5 @@
use crate::beacon_node_fallback::{BeaconNodeFallback, RequireSynced};
use crate::{duties_service::DutiesService, validator_store::ValidatorStore};
use crate::{duties_service::DutiesService, validator_store::ValidatorStore, OfflineOnFailure};
use environment::RuntimeContext;
use eth2::types::BlockId;
use futures::future::join_all;
@ -177,7 +177,7 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
// Fetch `block_root` and `execution_optimistic` for `SyncCommitteeContribution`.
let response = self
.beacon_nodes
.first_success(RequireSynced::Yes, |beacon_node| async move {
.first_success(RequireSynced::Yes, OfflineOnFailure::Yes,|beacon_node| async move {
beacon_node.get_beacon_blocks_root(BlockId::Head).await
})
.await
@ -284,11 +284,15 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
.collect::<Vec<_>>();
self.beacon_nodes
.first_success(RequireSynced::No, |beacon_node| async move {
beacon_node
.post_beacon_pool_sync_committee_signatures(committee_signatures)
.await
})
.first_success(
RequireSynced::No,
OfflineOnFailure::Yes,
|beacon_node| async move {
beacon_node
.post_beacon_pool_sync_committee_signatures(committee_signatures)
.await
},
)
.await
.map_err(|e| {
error!(
@ -351,17 +355,21 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
let contribution = &self
.beacon_nodes
.first_success(RequireSynced::No, |beacon_node| async move {
let sync_contribution_data = SyncContributionData {
slot,
beacon_block_root,
subcommittee_index: subnet_id.into(),
};
.first_success(
RequireSynced::No,
OfflineOnFailure::Yes,
|beacon_node| async move {
let sync_contribution_data = SyncContributionData {
slot,
beacon_block_root,
subcommittee_index: subnet_id.into(),
};
beacon_node
.get_validator_sync_committee_contribution::<E>(&sync_contribution_data)
.await
})
beacon_node
.get_validator_sync_committee_contribution::<E>(&sync_contribution_data)
.await
},
)
.await
.map_err(|e| {
crit!(
@ -418,11 +426,15 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
// Publish to the beacon node.
self.beacon_nodes
.first_success(RequireSynced::No, |beacon_node| async move {
beacon_node
.post_validator_contribution_and_proofs(signed_contributions)
.await
})
.first_success(
RequireSynced::No,
OfflineOnFailure::Yes,
|beacon_node| async move {
beacon_node
.post_validator_contribution_and_proofs(signed_contributions)
.await
},
)
.await
.map_err(|e| {
error!(
@ -556,11 +568,15 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
if let Err(e) = self
.beacon_nodes
.first_success(RequireSynced::No, |beacon_node| async move {
beacon_node
.post_validator_sync_committee_subscriptions(subscriptions_slice)
.await
})
.first_success(
RequireSynced::No,
OfflineOnFailure::Yes,
|beacon_node| async move {
beacon_node
.post_validator_sync_committee_subscriptions(subscriptions_slice)
.await
},
)
.await
{
error!(