Fix attestation propagation (#1360)

* Add `should_process` for conditional processing of Attestations

* Remove ATTESTATIONS_IGNORED metric
This commit is contained in:
Pawan Dhananjay 2020-07-20 08:25:32 +05:30 committed by GitHub
parent fc5e6cbbb0
commit b885d79ac3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 26 additions and 26 deletions

View File

@ -271,7 +271,7 @@ impl<T: BeaconChainTypes> AttestationService<T> {
/// Checks if we have subscribed aggregate validators for the subnet. If not, checks the gossip
/// verification, re-propagates and returns false.
pub fn should_process_attestation(
&mut self,
&self,
subnet: SubnetId,
attestation: &Attestation<T::EthSpec>,
) -> bool {

View File

@ -12,10 +12,6 @@ lazy_static! {
"network_gossip_unaggregated_attestations_rx_total",
"Count of gossip unaggregated attestations received"
);
pub static ref GOSSIP_UNAGGREGATED_ATTESTATIONS_IGNORED: Result<IntCounter> = try_create_int_counter(
"network_gossip_unaggregated_attestations_ignored_total",
"Count of gossip unaggregated attestations ignored by attestation service"
);
pub static ref GOSSIP_AGGREGATED_ATTESTATIONS_RX: Result<IntCounter> = try_create_int_counter(
"network_gossip_aggregated_attestations_rx_total",
"Count of gossip aggregated attestations received"

View File

@ -63,8 +63,9 @@ pub enum RouterMessage<T: EthSpec> {
error: RPCError,
},
/// A gossip message has been received. The fields are: message id, the peer that sent us this
/// message and the message itself.
PubsubMessage(MessageId, PeerId, PubsubMessage<T>),
/// message, the message itself and a bool which indicates if the message should be processed
/// by the beacon chain after successful verification.
PubsubMessage(MessageId, PeerId, PubsubMessage<T>, bool),
/// The peer manager has requested we re-status a peer.
StatusPeer(PeerId),
}
@ -152,8 +153,8 @@ impl<T: BeaconChainTypes> Router<T> {
"client" => self.network_globals.client(&peer_id).to_string());
self.processor.on_rpc_error(peer_id, request_id);
}
RouterMessage::PubsubMessage(id, peer_id, gossip) => {
self.handle_gossip(id, peer_id, gossip);
RouterMessage::PubsubMessage(id, peer_id, gossip, should_process) => {
self.handle_gossip(id, peer_id, gossip, should_process);
}
}
}
@ -200,12 +201,16 @@ impl<T: BeaconChainTypes> Router<T> {
}
}
/// Handle RPC messages
/// Handle RPC messages.
/// Note: `should_process` is currently only useful for the `Attestation` variant.
/// if `should_process` is `false`, we only propagate the message on successful verification,
/// else, we propagate **and** import into the beacon chain.
fn handle_gossip(
&mut self,
id: MessageId,
peer_id: PeerId,
gossip_message: PubsubMessage<T::EthSpec>,
should_process: bool,
) {
match gossip_message {
// Attestations should never reach the router.
@ -228,8 +233,10 @@ impl<T: BeaconChainTypes> Router<T> {
)
{
self.propagate_message(id, peer_id.clone());
self.processor
.import_unaggregated_attestation(peer_id, gossip_verified);
if should_process {
self.processor
.import_unaggregated_attestation(peer_id, gossip_verified);
}
}
}
PubsubMessage::BeaconBlock(block) => {

View File

@ -344,27 +344,24 @@ fn spawn_service<T: BeaconChainTypes>(
PubsubMessage::Attestation(ref subnet_and_attestation) => {
let subnet = subnet_and_attestation.0;
let attestation = &subnet_and_attestation.1;
// checks if we have an aggregator for the slot. If so, we process
// the attestation
if service.attestation_service.should_process_attestation(
// checks if we have an aggregator for the slot. If so, we should process
// the attestation, else we just just propagate the Attestation.
let should_process = service.attestation_service.should_process_attestation(
subnet,
attestation,
) {
let _ = service
.router_send
.send(RouterMessage::PubsubMessage(id, source, message))
.map_err(|_| {
debug!(service.log, "Failed to send pubsub message to router");
});
} else {
metrics::inc_counter(&metrics::GOSSIP_UNAGGREGATED_ATTESTATIONS_IGNORED)
}
);
let _ = service
.router_send
.send(RouterMessage::PubsubMessage(id, source, message, should_process))
.map_err(|_| {
debug!(service.log, "Failed to send pubsub message to router");
});
}
_ => {
// all else is sent to the router
let _ = service
.router_send
.send(RouterMessage::PubsubMessage(id, source, message))
.send(RouterMessage::PubsubMessage(id, source, message, true))
.map_err(|_| {
debug!(service.log, "Failed to send pubsub message to router");
});