Code quality improvents to the network service (#2932)

Checking how to priorize the polling of the network I moved most of the service code to functions. This change I think it's worth on it's own for code quality since inside the `tokio::select` many tools don't work (cargo fmt, sometimes clippy, and sometimes even the compiler's errors get wack). This is functionally equivalent to the previous code, just better organized
This commit is contained in:
Divma 2022-01-26 23:14:23 +00:00
parent 9964f5afe5
commit f2b1e096b2

View File

@ -7,6 +7,7 @@ use crate::{
NetworkConfig, NetworkConfig,
}; };
use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes}; use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes};
use futures::channel::mpsc::Sender;
use futures::future::OptionFuture; use futures::future::OptionFuture;
use futures::prelude::*; use futures::prelude::*;
use lighthouse_network::{ use lighthouse_network::{
@ -279,7 +280,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
log: network_log, log: network_log,
}; };
spawn_service(executor, network_service); network_service.spawn_service(executor);
Ok((network_globals, network_send)) Ok((network_globals, network_send))
} }
@ -320,428 +321,531 @@ impl<T: BeaconChainTypes> NetworkService<T> {
result result
} }
}
fn spawn_service<T: BeaconChainTypes>( fn send_to_router(&mut self, msg: RouterMessage<T::EthSpec>) {
executor: task_executor::TaskExecutor, if let Err(mpsc::error::SendError(msg)) = self.router_send.send(msg) {
mut service: NetworkService<T>, debug!(self.log, "Failed to send msg to router"; "msg" => ?msg);
) { }
let mut shutdown_sender = executor.shutdown_sender(); }
// spawn on the current executor fn spawn_service(mut self, executor: task_executor::TaskExecutor) {
executor.spawn(async move { let mut shutdown_sender = executor.shutdown_sender();
loop { // spawn on the current executor
// build the futures to check simultaneously let service_fut = async move {
tokio::select! { loop {
_ = service.metrics_update.tick(), if service.metrics_enabled => { tokio::select! {
// update various network metrics _ = self.metrics_update.tick(), if self.metrics_enabled => {
metrics::update_gossip_metrics::<T::EthSpec>( // update various network metrics
service.libp2p.swarm.behaviour().gs(), metrics::update_gossip_metrics::<T::EthSpec>(
&service.network_globals, self.libp2p.swarm.behaviour().gs(),
); &self.network_globals,
// update sync metrics );
metrics::update_sync_metrics(&service.network_globals); // update sync metrics
metrics::update_sync_metrics(&self.network_globals);
}
} _ = self.gossipsub_parameter_update.tick() => self.update_gossipsub_parameters(),
_ = service.gossipsub_parameter_update.tick() => {
if let Ok(slot) = service.beacon_chain.slot() { // handle a message sent to the network
if let Some(active_validators) = service.beacon_chain.with_head(|head| { Some(msg) = self.network_recv.recv() => self.on_network_msg(msg, &mut shutdown_sender).await,
Ok::<_, BeaconChainError>(
head // process any attestation service events
.beacon_state Some(msg) = self.attestation_service.next() => self.on_attestation_service_msg(msg),
.get_cached_active_validator_indices(RelativeEpoch::Current)
.map(|indices| indices.len()) // process any sync committee service events
.ok() Some(msg) = self.sync_committee_service.next() => self.on_sync_commitee_service_message(msg),
.or_else(|| {
// if active validator cached was not build we count the event = self.libp2p.next_event() => self.on_libp2p_event(event, &mut shutdown_sender).await,
// active validators
service Some(_) = &mut self.next_fork_update => self.update_next_fork(),
.beacon_chain
.epoch() Some(_) = &mut self.next_unsubscribe => {
.ok() let new_enr_fork_id = self.beacon_chain.enr_fork_id();
.map(|current_epoch| { self.libp2p.swarm.behaviour_mut().unsubscribe_from_fork_topics_except(new_enr_fork_id.fork_digest);
head info!(self.log, "Unsubscribed from old fork topics");
.beacon_state self.next_unsubscribe = Box::pin(None.into());
.validators() }
.iter()
.filter(|validator| Some(_) = &mut self.next_fork_subscriptions => {
validator.is_active_at(current_epoch) if let Some((fork_name, _)) = self.beacon_chain.duration_to_next_fork() {
) let fork_version = self.beacon_chain.spec.fork_version_for_name(fork_name);
.count() let fork_digest = ChainSpec::compute_fork_digest(fork_version, self.beacon_chain.genesis_validators_root);
}) info!(self.log, "Subscribing to new fork topics");
}) self.libp2p.swarm.behaviour_mut().subscribe_new_fork_topics(fork_digest);
) self.next_fork_subscriptions = Box::pin(None.into());
}).unwrap_or(None) { }
if service.libp2p.swarm.behaviour_mut().update_gossipsub_parameters(active_validators, slot).is_err() { else {
error!( error!(self.log, "Fork subscription scheduled but no fork scheduled");
service.log,
"Failed to update gossipsub parameters";
"active_validators" => active_validators
);
}
} }
} }
} }
// handle a message sent to the network metrics::update_bandwidth_metrics(self.libp2p.bandwidth.clone());
Some(message) = service.network_recv.recv() => { }
};
executor.spawn(service_fut, "network");
}
/// Handle an event received from the network.
async fn on_libp2p_event(
&mut self,
ev: Libp2pEvent<T::EthSpec>,
shutdown_sender: &mut Sender<ShutdownReason>,
) {
match ev {
Libp2pEvent::Behaviour(event) => match event {
BehaviourEvent::PeerConnectedOutgoing(peer_id) => {
self.send_to_router(RouterMessage::PeerDialed(peer_id));
}
BehaviourEvent::PeerConnectedIncoming(_)
| BehaviourEvent::PeerBanned(_)
| BehaviourEvent::PeerUnbanned(_) => {
// No action required for these events.
}
BehaviourEvent::PeerDisconnected(peer_id) => {
self.send_to_router(RouterMessage::PeerDisconnected(peer_id));
}
BehaviourEvent::RequestReceived {
peer_id,
id,
request,
} => {
self.send_to_router(RouterMessage::RPCRequestReceived {
peer_id,
id,
request,
});
}
BehaviourEvent::ResponseReceived {
peer_id,
id,
response,
} => {
self.send_to_router(RouterMessage::RPCResponseReceived {
peer_id,
request_id: id,
response,
});
}
BehaviourEvent::RPCFailed { id, peer_id } => {
self.send_to_router(RouterMessage::RPCFailed {
peer_id,
request_id: id,
});
}
BehaviourEvent::StatusPeer(peer_id) => {
self.send_to_router(RouterMessage::StatusPeer(peer_id));
}
BehaviourEvent::PubsubMessage {
id,
source,
message,
..
} => {
match message { match message {
NetworkMessage::SendRequest{ peer_id, request, request_id } => { // attestation information gets processed in the attestation service
service.libp2p.send_request(peer_id, request_id, request); PubsubMessage::Attestation(ref subnet_and_attestation) => {
} let subnet = subnet_and_attestation.0;
NetworkMessage::SendResponse{ peer_id, response, id } => { let attestation = &subnet_and_attestation.1;
service.libp2p.send_response(peer_id, id, response); // checks if we have an aggregator for the slot. If so, we should process
} // the attestation, else we just just propagate the Attestation.
NetworkMessage::SendErrorResponse{ peer_id, error, id, reason } => { let should_process = self
service.libp2p.respond_with_error(peer_id, id, error, reason);
}
NetworkMessage::UPnPMappingEstablished { tcp_socket, udp_socket} => {
service.upnp_mappings = (tcp_socket.map(|s| s.port()), udp_socket.map(|s| s.port()));
// If there is an external TCP port update, modify our local ENR.
if let Some(tcp_socket) = tcp_socket {
if let Err(e) = service.libp2p.swarm.behaviour_mut().discovery_mut().update_enr_tcp_port(tcp_socket.port()) {
warn!(service.log, "Failed to update ENR"; "error" => e);
}
}
// if the discovery service is not auto-updating, update it with the
// UPnP mappings
if !service.discovery_auto_update {
if let Some(udp_socket) = udp_socket {
if let Err(e) = service.libp2p.swarm.behaviour_mut().discovery_mut().update_enr_udp_socket(udp_socket) {
warn!(service.log, "Failed to update ENR"; "error" => e);
}
}
}
},
NetworkMessage::ValidationResult {
propagation_source,
message_id,
validation_result,
} => {
trace!(service.log, "Propagating gossipsub message";
"propagation_peer" => ?propagation_source,
"message_id" => %message_id,
"validation_result" => ?validation_result
);
service
.libp2p
.swarm
.behaviour_mut()
.report_message_validation_result(
&propagation_source, message_id, validation_result
);
}
NetworkMessage::Publish { messages } => {
let mut topic_kinds = Vec::new();
for message in &messages {
if !topic_kinds.contains(&message.kind()) {
topic_kinds.push(message.kind());
}
}
debug!(
service.log,
"Sending pubsub messages";
"count" => messages.len(),
"topics" => ?topic_kinds
);
service.libp2p.swarm.behaviour_mut().publish(messages);
}
NetworkMessage::ReportPeer { peer_id, action, source, msg } => service.libp2p.report_peer(&peer_id, action, source, msg),
NetworkMessage::GoodbyePeer { peer_id, reason, source } => service.libp2p.goodbye_peer(&peer_id, reason, source),
NetworkMessage::AttestationSubscribe { subscriptions } => {
if let Err(e) = service
.attestation_service .attestation_service
.validator_subscriptions(subscriptions) { .should_process_attestation(subnet, attestation);
warn!(service.log, "Attestation validator subscription failed"; "error" => e); self.send_to_router(RouterMessage::PubsubMessage(
}
}
NetworkMessage::SyncCommitteeSubscribe { subscriptions } => {
if let Err(e) = service
.sync_committee_service
.validator_subscriptions(subscriptions) {
warn!(service.log, "Sync committee calidator subscription failed"; "error" => e);
}
}
NetworkMessage::SubscribeCoreTopics => {
if service.shutdown_after_sync {
let _ = shutdown_sender
.send(ShutdownReason::Success(
"Beacon node completed sync. Shutting down as --shutdown-after-sync flag is enabled"))
.await
.map_err(|e| warn!(
service.log,
"failed to send a shutdown signal";
"error" => %e
));
return;
}
let mut subscribed_topics: Vec<GossipTopic> = vec![];
for topic_kind in lighthouse_network::types::CORE_TOPICS.iter() {
for fork_digest in service.required_gossip_fork_digests() {
let topic = GossipTopic::new(topic_kind.clone(), GossipEncoding::default(), fork_digest);
if service.libp2p.swarm.behaviour_mut().subscribe(topic.clone()) {
subscribed_topics.push(topic);
} else {
warn!(service.log, "Could not subscribe to topic"; "topic" => %topic);
}
}
}
// If we are to subscribe to all subnets we do it here
if service.subscribe_all_subnets {
for subnet_id in 0..<<T as BeaconChainTypes>::EthSpec as EthSpec>::SubnetBitfieldLength::to_u64() {
let subnet = Subnet::Attestation(SubnetId::new(subnet_id));
// Update the ENR bitfield
service.libp2p.swarm.behaviour_mut().update_enr_subnet(subnet, true);
for fork_digest in service.required_gossip_fork_digests() {
let topic = GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest);
if service.libp2p.swarm.behaviour_mut().subscribe(topic.clone()) {
subscribed_topics.push(topic);
} else {
warn!(service.log, "Could not subscribe to topic"; "topic" => %topic);
}
}
}
for subnet_id in 0..<<T as BeaconChainTypes>::EthSpec as EthSpec>::SyncCommitteeSubnetCount::to_u64() {
let subnet = Subnet::SyncCommittee(SyncSubnetId::new(subnet_id));
// Update the ENR bitfield
service.libp2p.swarm.behaviour_mut().update_enr_subnet(subnet, true);
for fork_digest in service.required_gossip_fork_digests() {
let topic = GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest);
if service.libp2p.swarm.behaviour_mut().subscribe(topic.clone()) {
subscribed_topics.push(topic);
} else {
warn!(service.log, "Could not subscribe to topic"; "topic" => %topic);
}
}
}
}
if !subscribed_topics.is_empty() {
info!(
service.log,
"Subscribed to topics";
"topics" => ?subscribed_topics.into_iter().map(|topic| format!("{}", topic)).collect::<Vec<_>>()
);
}
}
}
}
// process any attestation service events
Some(attestation_service_message) = service.attestation_service.next() => {
match attestation_service_message {
SubnetServiceMessage::Subscribe(subnet) => {
for fork_digest in service.required_gossip_fork_digests() {
let topic = GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest);
service.libp2p.swarm.behaviour_mut().subscribe(topic);
}
}
SubnetServiceMessage::Unsubscribe(subnet) => {
for fork_digest in service.required_gossip_fork_digests() {
let topic = GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest);
service.libp2p.swarm.behaviour_mut().unsubscribe(topic);
}
}
SubnetServiceMessage::EnrAdd(subnet) => {
service.libp2p.swarm.behaviour_mut().update_enr_subnet(subnet, true);
}
SubnetServiceMessage::EnrRemove(subnet) => {
service.libp2p.swarm.behaviour_mut().update_enr_subnet(subnet, false);
}
SubnetServiceMessage::DiscoverPeers(subnets_to_discover) => {
service.libp2p.swarm.behaviour_mut().discover_subnet_peers(subnets_to_discover);
}
}
}
// process any sync committee service events
Some(sync_committee_service_message) = service.sync_committee_service.next() => {
match sync_committee_service_message {
SubnetServiceMessage::Subscribe(subnet) => {
for fork_digest in service.required_gossip_fork_digests() {
let topic = GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest);
service.libp2p.swarm.behaviour_mut().subscribe(topic);
}
}
SubnetServiceMessage::Unsubscribe(subnet) => {
for fork_digest in service.required_gossip_fork_digests() {
let topic = GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest);
service.libp2p.swarm.behaviour_mut().unsubscribe(topic);
}
}
SubnetServiceMessage::EnrAdd(subnet) => {
service.libp2p.swarm.behaviour_mut().update_enr_subnet(subnet, true);
}
SubnetServiceMessage::EnrRemove(subnet) => {
service.libp2p.swarm.behaviour_mut().update_enr_subnet(subnet, false);
}
SubnetServiceMessage::DiscoverPeers(subnets_to_discover) => {
service.libp2p.swarm.behaviour_mut().discover_subnet_peers(subnets_to_discover);
}
}
}
libp2p_event = service.libp2p.next_event() => {
// poll the swarm
match libp2p_event {
Libp2pEvent::Behaviour(event) => match event {
BehaviourEvent::PeerConnectedOutgoing(peer_id) => {
let _ = service
.router_send
.send(RouterMessage::PeerDialed(peer_id))
.map_err(|_| {
debug!(service.log, "Failed to send peer dialed to router"); });
},
BehaviourEvent::PeerConnectedIncoming(_) | BehaviourEvent::PeerBanned(_) | BehaviourEvent::PeerUnbanned(_) => {
// No action required for these events.
},
BehaviourEvent::PeerDisconnected(peer_id) => {
let _ = service
.router_send
.send(RouterMessage::PeerDisconnected(peer_id))
.map_err(|_| {
debug!(service.log, "Failed to send peer disconnect to router");
});
},
BehaviourEvent::RequestReceived{peer_id, id, request} => {
let _ = service
.router_send
.send(RouterMessage::RPCRequestReceived{peer_id, id, request})
.map_err(|_| {
debug!(service.log, "Failed to send RPC to router");
});
}
BehaviourEvent::ResponseReceived{peer_id, id, response} => {
let _ = service
.router_send
.send(RouterMessage::RPCResponseReceived{ peer_id, request_id: id, response })
.map_err(|_| {
debug!(service.log, "Failed to send RPC to router");
});
}
BehaviourEvent::RPCFailed{id, peer_id} => {
let _ = service
.router_send
.send(RouterMessage::RPCFailed{ peer_id, request_id: id})
.map_err(|_| {
debug!(service.log, "Failed to send RPC to router");
});
}
BehaviourEvent::StatusPeer(peer_id) => {
let _ = service
.router_send
.send(RouterMessage::StatusPeer(peer_id))
.map_err(|_| {
debug!(service.log, "Failed to send re-status peer to router");
});
}
BehaviourEvent::PubsubMessage {
id, id,
source, source,
message, message,
.. should_process,
} => { ));
match message {
// attestation information gets processed in the attestation service
PubsubMessage::Attestation(ref subnet_and_attestation) => {
let subnet = subnet_and_attestation.0;
let attestation = &subnet_and_attestation.1;
// checks if we have an aggregator for the slot. If so, we should process
// the attestation, else we just just propagate the Attestation.
let should_process = service.attestation_service.should_process_attestation(
subnet,
attestation,
);
let _ = service
.router_send
.send(RouterMessage::PubsubMessage(id, source, message, should_process))
.map_err(|_| {
debug!(service.log, "Failed to send pubsub message to router");
});
}
_ => {
// all else is sent to the router
let _ = service
.router_send
.send(RouterMessage::PubsubMessage(id, source, message, true))
.map_err(|_| {
debug!(service.log, "Failed to send pubsub message to router");
});
}
}
}
} }
Libp2pEvent::NewListenAddr(multiaddr) => { _ => {
service.network_globals.listen_multiaddrs.write().push(multiaddr); // all else is sent to the router
} self.send_to_router(RouterMessage::PubsubMessage(
Libp2pEvent::ZeroListeners => { id, source, message, true,
let _ = shutdown_sender ));
.send(ShutdownReason::Failure("All listeners are closed. Unable to listen"))
.await
.map_err(|e| warn!(
service.log,
"failed to send a shutdown signal";
"error" => %e
));
} }
} }
} }
Some(_) = &mut service.next_fork_update => { },
let new_enr_fork_id = service.beacon_chain.enr_fork_id(); Libp2pEvent::NewListenAddr(multiaddr) => {
self.network_globals
.listen_multiaddrs
.write()
.push(multiaddr);
}
Libp2pEvent::ZeroListeners => {
let _ = shutdown_sender
.send(ShutdownReason::Failure(
"All listeners are closed. Unable to listen",
))
.await
.map_err(|e| {
warn!(
self.log,
"failed to send a shutdown signal";
"error" => %e
)
});
}
}
}
let fork_context = &service.fork_context; /// Handle a message sent to the network service.
if let Some(new_fork_name) = fork_context.from_context_bytes(new_enr_fork_id.fork_digest) { async fn on_network_msg(
info!( &mut self,
service.log, msg: NetworkMessage<T::EthSpec>,
"Transitioned to new fork"; shutdown_sender: &mut Sender<ShutdownReason>,
"old_fork" => ?fork_context.current_fork(), ) {
"new_fork" => ?new_fork_name, match msg {
); NetworkMessage::SendRequest {
fork_context.update_current_fork(*new_fork_name); peer_id,
request,
service request_id,
} => {
self.libp2p.send_request(peer_id, request_id, request);
}
NetworkMessage::SendResponse {
peer_id,
response,
id,
} => {
self.libp2p.send_response(peer_id, id, response);
}
NetworkMessage::SendErrorResponse {
peer_id,
error,
id,
reason,
} => {
self.libp2p.respond_with_error(peer_id, id, error, reason);
}
NetworkMessage::UPnPMappingEstablished {
tcp_socket,
udp_socket,
} => {
self.upnp_mappings = (tcp_socket.map(|s| s.port()), udp_socket.map(|s| s.port()));
// If there is an external TCP port update, modify our local ENR.
if let Some(tcp_socket) = tcp_socket {
if let Err(e) = self
.libp2p
.swarm
.behaviour_mut()
.discovery_mut()
.update_enr_tcp_port(tcp_socket.port())
{
warn!(self.log, "Failed to update ENR"; "error" => e);
}
}
// if the discovery service is not auto-updating, update it with the
// UPnP mappings
if !self.discovery_auto_update {
if let Some(udp_socket) = udp_socket {
if let Err(e) = self
.libp2p .libp2p
.swarm .swarm
.behaviour_mut() .behaviour_mut()
.update_fork_version(new_enr_fork_id.clone()); .discovery_mut()
// Reinitialize the next_fork_update .update_enr_udp_socket(udp_socket)
service.next_fork_update = Box::pin(next_fork_delay(&service.beacon_chain).into()); {
warn!(self.log, "Failed to update ENR"; "error" => e);
// Set the next_unsubscribe delay. }
let epoch_duration = service.beacon_chain.spec.seconds_per_slot * T::EthSpec::slots_per_epoch();
let unsubscribe_delay = Duration::from_secs(UNSUBSCRIBE_DELAY_EPOCHS * epoch_duration);
// Update the `next_fork_subscriptions` timer if the next fork is known.
service.next_fork_subscriptions = Box::pin(next_fork_subscriptions_delay(&service.beacon_chain).into());
service.next_unsubscribe = Box::pin(Some(tokio::time::sleep(unsubscribe_delay)).into());
info!(service.log, "Network will unsubscribe from old fork gossip topics in a few epochs"; "remaining_epochs" => UNSUBSCRIBE_DELAY_EPOCHS);
} else {
crit!(service.log, "Unknown new enr fork id"; "new_fork_id" => ?new_enr_fork_id);
}
}
Some(_) = &mut service.next_unsubscribe => {
let new_enr_fork_id = service.beacon_chain.enr_fork_id();
service.libp2p.swarm.behaviour_mut().unsubscribe_from_fork_topics_except(new_enr_fork_id.fork_digest);
info!(service.log, "Unsubscribed from old fork topics");
service.next_unsubscribe = Box::pin(None.into());
}
Some(_) = &mut service.next_fork_subscriptions => {
if let Some((fork_name, _)) = service.beacon_chain.duration_to_next_fork() {
let fork_version = service.beacon_chain.spec.fork_version_for_name(fork_name);
let fork_digest = ChainSpec::compute_fork_digest(fork_version, service.beacon_chain.genesis_validators_root);
info!(service.log, "Subscribing to new fork topics");
service.libp2p.swarm.behaviour_mut().subscribe_new_fork_topics(fork_digest);
service.next_fork_subscriptions = Box::pin(None.into());
}
else {
error!(service.log, "Fork subscription scheduled but no fork scheduled");
} }
} }
} }
metrics::update_bandwidth_metrics(service.libp2p.bandwidth.clone()); NetworkMessage::ValidationResult {
propagation_source,
message_id,
validation_result,
} => {
trace!(self.log, "Propagating gossipsub message";
"propagation_peer" => ?propagation_source,
"message_id" => %message_id,
"validation_result" => ?validation_result
);
self.libp2p
.swarm
.behaviour_mut()
.report_message_validation_result(
&propagation_source,
message_id,
validation_result,
);
}
NetworkMessage::Publish { messages } => {
let mut topic_kinds = Vec::new();
for message in &messages {
if !topic_kinds.contains(&message.kind()) {
topic_kinds.push(message.kind());
}
}
debug!(
self.log,
"Sending pubsub messages";
"count" => messages.len(),
"topics" => ?topic_kinds
);
self.libp2p.swarm.behaviour_mut().publish(messages);
}
NetworkMessage::ReportPeer {
peer_id,
action,
source,
msg,
} => self.libp2p.report_peer(&peer_id, action, source, msg),
NetworkMessage::GoodbyePeer {
peer_id,
reason,
source,
} => self.libp2p.goodbye_peer(&peer_id, reason, source),
NetworkMessage::AttestationSubscribe { subscriptions } => {
if let Err(e) = self
.attestation_service
.validator_subscriptions(subscriptions)
{
warn!(self.log, "Attestation validator subscription failed"; "error" => e);
}
}
NetworkMessage::SyncCommitteeSubscribe { subscriptions } => {
if let Err(e) = self
.sync_committee_service
.validator_subscriptions(subscriptions)
{
warn!(self.log, "Sync committee calidator subscription failed"; "error" => e);
}
}
NetworkMessage::SubscribeCoreTopics => {
if self.shutdown_after_sync {
if let Err(e) = shutdown_sender
.send(ShutdownReason::Success(
"Beacon node completed sync. \
Shutting down as --shutdown-after-sync flag is enabled",
))
.await
{
warn!(
self.log,
"failed to send a shutdown signal";
"error" => %e
)
}
return;
}
let mut subscribed_topics: Vec<GossipTopic> = vec![];
for topic_kind in lighthouse_network::types::CORE_TOPICS.iter() {
for fork_digest in self.required_gossip_fork_digests() {
let topic = GossipTopic::new(
topic_kind.clone(),
GossipEncoding::default(),
fork_digest,
);
if self.libp2p.swarm.behaviour_mut().subscribe(topic.clone()) {
subscribed_topics.push(topic);
} else {
warn!(self.log, "Could not subscribe to topic"; "topic" => %topic);
}
}
}
// If we are to subscribe to all subnets we do it here
if self.subscribe_all_subnets {
for subnet_id in 0..<<T as BeaconChainTypes>::EthSpec as EthSpec>::SubnetBitfieldLength::to_u64() {
let subnet = Subnet::Attestation(SubnetId::new(subnet_id));
// Update the ENR bitfield
self.libp2p.swarm.behaviour_mut().update_enr_subnet(subnet, true);
for fork_digest in self.required_gossip_fork_digests() {
let topic = GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest);
if self.libp2p.swarm.behaviour_mut().subscribe(topic.clone()) {
subscribed_topics.push(topic);
} else {
warn!(self.log, "Could not subscribe to topic"; "topic" => %topic);
}
}
}
let subnet_max = <<T as BeaconChainTypes>::EthSpec as EthSpec>::SyncCommitteeSubnetCount::to_u64();
for subnet_id in 0..subnet_max {
let subnet = Subnet::SyncCommittee(SyncSubnetId::new(subnet_id));
// Update the ENR bitfield
self.libp2p
.swarm
.behaviour_mut()
.update_enr_subnet(subnet, true);
for fork_digest in self.required_gossip_fork_digests() {
let topic = GossipTopic::new(
subnet.into(),
GossipEncoding::default(),
fork_digest,
);
if self.libp2p.swarm.behaviour_mut().subscribe(topic.clone()) {
subscribed_topics.push(topic);
} else {
warn!(self.log, "Could not subscribe to topic"; "topic" => %topic);
}
}
}
}
if !subscribed_topics.is_empty() {
info!(
self.log,
"Subscribed to topics";
"topics" => ?subscribed_topics.into_iter().map(|topic| format!("{}", topic)).collect::<Vec<_>>()
);
}
}
} }
}, "network"); }
fn update_gossipsub_parameters(&mut self) {
if let Ok(slot) = self.beacon_chain.slot() {
if let Some(active_validators) = self
.beacon_chain
.with_head(|head| {
Ok::<_, BeaconChainError>(
head.beacon_state
.get_cached_active_validator_indices(RelativeEpoch::Current)
.map(|indices| indices.len())
.ok()
.or_else(|| {
// if active validator cached was not build we count the
// active validators
self.beacon_chain.epoch().ok().map(|current_epoch| {
head.beacon_state
.validators()
.iter()
.filter(|validator| validator.is_active_at(current_epoch))
.count()
})
}),
)
})
.unwrap_or(None)
{
if self
.libp2p
.swarm
.behaviour_mut()
.update_gossipsub_parameters(active_validators, slot)
.is_err()
{
error!(
self.log,
"Failed to update gossipsub parameters";
"active_validators" => active_validators
);
}
}
}
}
fn on_attestation_service_msg(&mut self, msg: SubnetServiceMessage) {
match msg {
SubnetServiceMessage::Subscribe(subnet) => {
for fork_digest in self.required_gossip_fork_digests() {
let topic =
GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest);
self.libp2p.swarm.behaviour_mut().subscribe(topic);
}
}
SubnetServiceMessage::Unsubscribe(subnet) => {
for fork_digest in self.required_gossip_fork_digests() {
let topic =
GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest);
self.libp2p.swarm.behaviour_mut().unsubscribe(topic);
}
}
SubnetServiceMessage::EnrAdd(subnet) => {
self.libp2p
.swarm
.behaviour_mut()
.update_enr_subnet(subnet, true);
}
SubnetServiceMessage::EnrRemove(subnet) => {
self.libp2p
.swarm
.behaviour_mut()
.update_enr_subnet(subnet, false);
}
SubnetServiceMessage::DiscoverPeers(subnets_to_discover) => {
self.libp2p
.swarm
.behaviour_mut()
.discover_subnet_peers(subnets_to_discover);
}
}
}
fn on_sync_commitee_service_message(&mut self, msg: SubnetServiceMessage) {
match msg {
SubnetServiceMessage::Subscribe(subnet) => {
for fork_digest in self.required_gossip_fork_digests() {
let topic =
GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest);
self.libp2p.swarm.behaviour_mut().subscribe(topic);
}
}
SubnetServiceMessage::Unsubscribe(subnet) => {
for fork_digest in self.required_gossip_fork_digests() {
let topic =
GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest);
self.libp2p.swarm.behaviour_mut().unsubscribe(topic);
}
}
SubnetServiceMessage::EnrAdd(subnet) => {
self.libp2p
.swarm
.behaviour_mut()
.update_enr_subnet(subnet, true);
}
SubnetServiceMessage::EnrRemove(subnet) => {
self.libp2p
.swarm
.behaviour_mut()
.update_enr_subnet(subnet, false);
}
SubnetServiceMessage::DiscoverPeers(subnets_to_discover) => {
self.libp2p
.swarm
.behaviour_mut()
.discover_subnet_peers(subnets_to_discover);
}
}
}
fn update_next_fork(&mut self) {
let new_enr_fork_id = self.beacon_chain.enr_fork_id();
let fork_context = &self.fork_context;
if let Some(new_fork_name) = fork_context.from_context_bytes(new_enr_fork_id.fork_digest) {
info!(
self.log,
"Transitioned to new fork";
"old_fork" => ?fork_context.current_fork(),
"new_fork" => ?new_fork_name,
);
fork_context.update_current_fork(*new_fork_name);
self.libp2p
.swarm
.behaviour_mut()
.update_fork_version(new_enr_fork_id);
// Reinitialize the next_fork_update
self.next_fork_update = Box::pin(next_fork_delay(&self.beacon_chain).into());
// Set the next_unsubscribe delay.
let epoch_duration =
self.beacon_chain.spec.seconds_per_slot * T::EthSpec::slots_per_epoch();
let unsubscribe_delay = Duration::from_secs(UNSUBSCRIBE_DELAY_EPOCHS * epoch_duration);
// Update the `next_fork_subscriptions` timer if the next fork is known.
self.next_fork_subscriptions =
Box::pin(next_fork_subscriptions_delay(&self.beacon_chain).into());
self.next_unsubscribe = Box::pin(Some(tokio::time::sleep(unsubscribe_delay)).into());
info!(self.log, "Network will unsubscribe from old fork gossip topics in a few epochs"; "remaining_epochs" => UNSUBSCRIBE_DELAY_EPOCHS);
} else {
crit!(self.log, "Unknown new enr fork id"; "new_fork_id" => ?new_enr_fork_id);
}
}
} }
/// Returns a `Sleep` that triggers after the next change in the beacon chain fork version. /// Returns a `Sleep` that triggers after the next change in the beacon chain fork version.