Age Manning 5bc8fea2e0
Activate peer scoring (#1284)
* Initial score structure

* Peer manager update

* Updates to dialing

* Correct tests

* Correct typos and remove unused function

* Integrate scoring into the network crate

* Clean warnings

* Formatting

* Shift core functionality into the behaviour

* Temp commit

* Shift disconnections into the behaviour

* Temp commit

* Update libp2p and gossipsub

* Remove gossipsub lru cache

* Correct merge conflicts

* Modify handler and correct tests

* Update enr network globals on socket update

* Apply clippy lints

* Add new prysm fingerprint

* More clippy fixes
2020-07-07 10:13:16 +10:00

439 lines
20 KiB

use crate::persisted_dht::{load_dht, persist_dht};
use crate::router::{Router, RouterMessage};
use crate::{
attestation_service::{AttServiceMessage, AttestationService},
use crate::{error, metrics};
use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2_libp2p::Service as LibP2PService;
use eth2_libp2p::{
rpc::{GoodbyeReason, RPCResponseErrorCode, RequestId},
Libp2pEvent, PeerAction, PeerRequestId, PubsubMessage, Request, Response,
use eth2_libp2p::{BehaviourEvent, MessageId, NetworkGlobals, PeerId};
use futures::prelude::*;
use rest_types::ValidatorSubscription;
use slog::{debug, error, info, o, trace, warn};
use std::sync::Arc;
use std::time::Duration;
use store::HotColdDB;
use tokio::sync::mpsc;
use tokio::time::Delay;
use types::EthSpec;
mod tests;
/// Types of messages that the network service can receive.
pub enum NetworkMessage<T: EthSpec> {
/// Subscribes a list of validators to specific slots for attestation duties.
Subscribe {
subscriptions: Vec<ValidatorSubscription>,
/// Send an RPC request to the libp2p service.
SendRequest {
peer_id: PeerId,
request: Request,
request_id: RequestId,
/// Send a successful Response to the libp2p service.
SendResponse {
peer_id: PeerId,
response: Response<T>,
id: PeerRequestId,
/// Respond to a peer's request with an error.
SendError {
// TODO: note that this is never used, we just say goodbye without nicely closing the
// stream assigned to the request
peer_id: PeerId,
error: RPCResponseErrorCode,
reason: String,
id: PeerRequestId,
/// Publish a list of messages to the gossipsub protocol.
Publish { messages: Vec<PubsubMessage<T>> },
/// Propagate a received gossipsub message.
Propagate {
propagation_source: PeerId,
message_id: MessageId,
/// Reports a peer to the peer manager for performing an action.
ReportPeer { peer_id: PeerId, action: PeerAction },
/// Disconnect an ban a peer, providing a reason.
GoodbyePeer {
peer_id: PeerId,
reason: GoodbyeReason,
/// Service that handles communication between internal services and the `eth2_libp2p` network service.
pub struct NetworkService<T: BeaconChainTypes> {
/// A reference to the underlying beacon chain.
beacon_chain: Arc<BeaconChain<T>>,
/// The underlying libp2p service that drives all the network interactions.
libp2p: LibP2PService<T::EthSpec>,
/// An attestation and subnet manager service.
attestation_service: AttestationService<T>,
/// The receiver channel for lighthouse to communicate with the network service.
network_recv: mpsc::UnboundedReceiver<NetworkMessage<T::EthSpec>>,
/// The sending channel for the network service to send messages to be routed throughout
/// lighthouse.
router_send: mpsc::UnboundedSender<RouterMessage<T::EthSpec>>,
/// A reference to lighthouse's database to persist the DHT.
store: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
/// A collection of global variables, accessible outside of the network service.
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
/// A delay that expires when a new fork takes place.
next_fork_update: Option<Delay>,
/// The logger for the network service.
log: slog::Logger,
impl<T: BeaconChainTypes> NetworkService<T> {
pub fn start(
beacon_chain: Arc<BeaconChain<T>>,
config: &NetworkConfig,
executor: environment::TaskExecutor,
) -> error::Result<(
)> {
let network_log = executor.log().clone();
// build the network channel
let (network_send, network_recv) = mpsc::unbounded_channel::<NetworkMessage<T::EthSpec>>();
// get a reference to the beacon chain store
let store =;
// build the current enr_fork_id for adding to our local ENR
let enr_fork_id = beacon_chain.enr_fork_id();
// keep track of when our fork_id needs to be updated
let next_fork_update = next_fork_delay(&beacon_chain);
// launch libp2p service
let (network_globals, mut libp2p) =
LibP2PService::new(executor.clone(), config, enr_fork_id, &network_log)?;
// Repopulate the DHT with stored ENR's.
let enrs_to_load = load_dht::<T::EthSpec, T::HotStore, T::ColdStore>(store.clone());
"Loading peers into the routing table"; "peers" => enrs_to_load.len()
for enr in enrs_to_load {
// launch derived network services
// router task
let router_send = Router::spawn(
// attestation service
let attestation_service =
AttestationService::new(beacon_chain.clone(), network_globals.clone(), &network_log);
// create the network service and spawn the task
let network_log =!("service"=> "network"));
let network_service = NetworkService {
network_globals: network_globals.clone(),
log: network_log,
spawn_service(executor, network_service)?;
Ok((network_globals, network_send))
fn spawn_service<T: BeaconChainTypes>(
executor: environment::TaskExecutor,
mut service: NetworkService<T>,
) -> error::Result<()> {
let mut exit_rx = executor.exit();
// spawn on the current executor
executor.spawn_without_exit(async move {
// TODO: there is something with this code that prevents cargo fmt from doing anything at
// all. Ok, it is worse, the compiler doesn't show errors over this code beyond ast
// checking
loop {
// build the futures to check simultaneously
tokio::select! {
// handle network shutdown
_ = (&mut exit_rx) => {
// network thread is terminating
let enrs = service.libp2p.swarm.enr_entries();
"Persisting DHT to store";
"Number of peers" => format!("{}", enrs.len()),
match persist_dht::<T::EthSpec, T::HotStore, T::ColdStore>(, enrs) {
Err(e) => error!(
"Failed to persist DHT on drop";
"error" => format!("{:?}", e)
Ok(_) => info!(
"Saved DHT state";
info!(service.log, "Network service shutdown");
// handle a message sent to the network
Some(message) = service.network_recv.recv() => {
match message {
NetworkMessage::SendRequest{ peer_id, request, request_id } => {
service.libp2p.send_request(peer_id, request_id, request);
NetworkMessage::SendResponse{ peer_id, response, id } => {
service.libp2p.send_response(peer_id, id, response);
NetworkMessage::SendError{ peer_id, error, id, reason } => {
service.libp2p.respond_with_error(peer_id, id, error, reason);
NetworkMessage::Propagate {
} => {
trace!(service.log, "Propagating gossipsub message";
"propagation_peer" => format!("{:?}", propagation_source),
"message_id" => message_id.to_string(),
.propagate_message(&propagation_source, message_id);
NetworkMessage::Publish { messages } => {
let mut topic_kinds = Vec::new();
for message in &messages {
if !topic_kinds.contains(&message.kind()) {
"Sending pubsub messages";
"count" => messages.len(),
"topics" => format!("{:?}", topic_kinds)
NetworkMessage::ReportPeer { peer_id, action } => service.libp2p.report_peer(&peer_id, action),
NetworkMessage::GoodbyePeer { peer_id, reason } => service.libp2p.goodbye_peer(&peer_id, reason),
NetworkMessage::Subscribe { subscriptions } => {
if let Err(e) = service
.validator_subscriptions(subscriptions) {
warn!(service.log, "Validator subscription failed"; "error" => e);
// process any attestation service events
Some(attestation_service_message) = => {
match attestation_service_message {
// TODO: Implement
AttServiceMessage::Subscribe(subnet_id) => {
AttServiceMessage::Unsubscribe(subnet_id) => {
AttServiceMessage::EnrAdd(subnet_id) => {
service.libp2p.swarm.update_enr_subnet(subnet_id, true);
AttServiceMessage::EnrRemove(subnet_id) => {
service.libp2p.swarm.update_enr_subnet(subnet_id, false);
AttServiceMessage::DiscoverPeers{subnet_id, min_ttl} => {
service.libp2p.swarm.discover_subnet_peers(subnet_id, min_ttl);
libp2p_event = service.libp2p.next_event() => {
// poll the swarm
match libp2p_event {
Libp2pEvent::Behaviour(event) => match event {
BehaviourEvent::PeerDialed(peer_id) => {
let _ = service
.map_err(|_| {
debug!(service.log, "Failed to send peer dialed to router"); });
BehaviourEvent::PeerConnected(_peer_id) => {
// A peer has connected to us
// We currently do not perform any action here.
BehaviourEvent::PeerDisconnected(peer_id) => {
let _ = service
.map_err(|_| {
debug!(service.log, "Failed to send peer disconnect to router");
BehaviourEvent::RequestReceived{peer_id, id, request} => {
let _ = service
.send(RouterMessage::RPCRequestReceived{peer_id, id, request})
.map_err(|_| {
debug!(service.log, "Failed to send RPC to router");
BehaviourEvent::ResponseReceived{peer_id, id, response} => {
let _ = service
.send(RouterMessage::RPCResponseReceived{ peer_id, request_id: id, response })
.map_err(|_| {
debug!(service.log, "Failed to send RPC to router");
BehaviourEvent::RPCFailed{id, peer_id, error} => {
let _ = service
.send(RouterMessage::RPCFailed{ peer_id, request_id: id, error })
.map_err(|_| {
debug!(service.log, "Failed to send RPC to router");
BehaviourEvent::StatusPeer(peer_id) => {
let _ = service
.map_err(|_| {
debug!(service.log, "Failed to send re-status peer to router");
BehaviourEvent::PubsubMessage {
} => {
// Update prometheus metrics.
match message {
// attestation information gets processed in the attestation service
PubsubMessage::Attestation(ref subnet_and_attestation) => {
let subnet = subnet_and_attestation.0;
let attestation = &subnet_and_attestation.1;
// checks if we have an aggregator for the slot. If so, we process
// the attestation
if service.attestation_service.should_process_attestation(
) {
let _ = service
.send(RouterMessage::PubsubMessage(id, source, message))
.map_err(|_| {
debug!(service.log, "Failed to send pubsub message to router");
} else {
_ => {
// all else is sent to the router
let _ = service
.send(RouterMessage::PubsubMessage(id, source, message))
.map_err(|_| {
debug!(service.log, "Failed to send pubsub message to router");
BehaviourEvent::PeerSubscribed(_, _) => {},
Libp2pEvent::NewListenAddr(multiaddr) => {
if let Some(delay) = &service.next_fork_update {
if delay.is_elapsed() {
service.next_fork_update = next_fork_delay(&service.beacon_chain);
}, "network");
/// Returns a `Delay` that triggers shortly after the next change in the beacon chain fork version.
/// If there is no scheduled fork, `None` is returned.
fn next_fork_delay<T: BeaconChainTypes>(
beacon_chain: &BeaconChain<T>,
) -> Option<tokio::time::Delay> {
beacon_chain.duration_to_next_fork().map(|until_fork| {
// Add a short time-out to start within the new fork period.
let delay = Duration::from_millis(200);
tokio::time::delay_until(tokio::time::Instant::now() + until_fork + delay)
/// Inspects the `messages` that were being sent to the network and updates Prometheus metrics.
fn expose_publish_metrics<T: EthSpec>(messages: &[PubsubMessage<T>]) {
for message in messages {
match message {
PubsubMessage::BeaconBlock(_) => metrics::inc_counter(&metrics::GOSSIP_BLOCKS_TX),
PubsubMessage::Attestation(_) => {
PubsubMessage::AggregateAndProofAttestation(_) => {
_ => {}
/// Inspects a `message` received from the network and updates Prometheus metrics.
fn expose_receive_metrics<T: EthSpec>(message: &PubsubMessage<T>) {
match message {
PubsubMessage::BeaconBlock(_) => metrics::inc_counter(&metrics::GOSSIP_BLOCKS_RX),
PubsubMessage::Attestation(_) => {
PubsubMessage::AggregateAndProofAttestation(_) => {
_ => {}