diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 35cf3fa90..c6f68d5fa 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -7,6 +7,7 @@ use crate::{ NetworkConfig, }; use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes}; +use futures::channel::mpsc::Sender; use futures::future::OptionFuture; use futures::prelude::*; use lighthouse_network::{ @@ -279,7 +280,7 @@ impl NetworkService { log: network_log, }; - spawn_service(executor, network_service); + network_service.spawn_service(executor); Ok((network_globals, network_send)) } @@ -320,428 +321,531 @@ impl NetworkService { result } -} -fn spawn_service( - executor: task_executor::TaskExecutor, - mut service: NetworkService, -) { - let mut shutdown_sender = executor.shutdown_sender(); + fn send_to_router(&mut self, msg: RouterMessage) { + if let Err(mpsc::error::SendError(msg)) = self.router_send.send(msg) { + debug!(self.log, "Failed to send msg to router"; "msg" => ?msg); + } + } - // spawn on the current executor - executor.spawn(async move { + fn spawn_service(mut self, executor: task_executor::TaskExecutor) { + let mut shutdown_sender = executor.shutdown_sender(); - loop { - // build the futures to check simultaneously - tokio::select! { - _ = service.metrics_update.tick(), if service.metrics_enabled => { - // update various network metrics - metrics::update_gossip_metrics::( - service.libp2p.swarm.behaviour().gs(), - &service.network_globals, - ); - // update sync metrics - metrics::update_sync_metrics(&service.network_globals); + // spawn on the current executor + let service_fut = async move { + loop { + tokio::select! { + _ = self.metrics_update.tick(), if self.metrics_enabled => { + // update various network metrics + metrics::update_gossip_metrics::( + self.libp2p.swarm.behaviour().gs(), + &self.network_globals, + ); + // update sync metrics + metrics::update_sync_metrics(&self.network_globals); + } - } - _ = service.gossipsub_parameter_update.tick() => { - if let Ok(slot) = service.beacon_chain.slot() { - if let Some(active_validators) = service.beacon_chain.with_head(|head| { - Ok::<_, BeaconChainError>( - head - .beacon_state - .get_cached_active_validator_indices(RelativeEpoch::Current) - .map(|indices| indices.len()) - .ok() - .or_else(|| { - // if active validator cached was not build we count the - // active validators - service - .beacon_chain - .epoch() - .ok() - .map(|current_epoch| { - head - .beacon_state - .validators() - .iter() - .filter(|validator| - validator.is_active_at(current_epoch) - ) - .count() - }) - }) - ) - }).unwrap_or(None) { - if service.libp2p.swarm.behaviour_mut().update_gossipsub_parameters(active_validators, slot).is_err() { - error!( - service.log, - "Failed to update gossipsub parameters"; - "active_validators" => active_validators - ); - } + _ = self.gossipsub_parameter_update.tick() => self.update_gossipsub_parameters(), + + // handle a message sent to the network + Some(msg) = self.network_recv.recv() => self.on_network_msg(msg, &mut shutdown_sender).await, + + // process any attestation service events + Some(msg) = self.attestation_service.next() => self.on_attestation_service_msg(msg), + + // process any sync committee service events + Some(msg) = self.sync_committee_service.next() => self.on_sync_commitee_service_message(msg), + + event = self.libp2p.next_event() => self.on_libp2p_event(event, &mut shutdown_sender).await, + + Some(_) = &mut self.next_fork_update => self.update_next_fork(), + + Some(_) = &mut self.next_unsubscribe => { + let new_enr_fork_id = self.beacon_chain.enr_fork_id(); + self.libp2p.swarm.behaviour_mut().unsubscribe_from_fork_topics_except(new_enr_fork_id.fork_digest); + info!(self.log, "Unsubscribed from old fork topics"); + self.next_unsubscribe = Box::pin(None.into()); + } + + Some(_) = &mut self.next_fork_subscriptions => { + if let Some((fork_name, _)) = self.beacon_chain.duration_to_next_fork() { + let fork_version = self.beacon_chain.spec.fork_version_for_name(fork_name); + let fork_digest = ChainSpec::compute_fork_digest(fork_version, self.beacon_chain.genesis_validators_root); + info!(self.log, "Subscribing to new fork topics"); + self.libp2p.swarm.behaviour_mut().subscribe_new_fork_topics(fork_digest); + self.next_fork_subscriptions = Box::pin(None.into()); + } + else { + error!(self.log, "Fork subscription scheduled but no fork scheduled"); } } } - // handle a message sent to the network - Some(message) = service.network_recv.recv() => { + metrics::update_bandwidth_metrics(self.libp2p.bandwidth.clone()); + } + }; + executor.spawn(service_fut, "network"); + } + + /// Handle an event received from the network. + async fn on_libp2p_event( + &mut self, + ev: Libp2pEvent, + shutdown_sender: &mut Sender, + ) { + match ev { + Libp2pEvent::Behaviour(event) => match event { + BehaviourEvent::PeerConnectedOutgoing(peer_id) => { + self.send_to_router(RouterMessage::PeerDialed(peer_id)); + } + BehaviourEvent::PeerConnectedIncoming(_) + | BehaviourEvent::PeerBanned(_) + | BehaviourEvent::PeerUnbanned(_) => { + // No action required for these events. + } + BehaviourEvent::PeerDisconnected(peer_id) => { + self.send_to_router(RouterMessage::PeerDisconnected(peer_id)); + } + BehaviourEvent::RequestReceived { + peer_id, + id, + request, + } => { + self.send_to_router(RouterMessage::RPCRequestReceived { + peer_id, + id, + request, + }); + } + BehaviourEvent::ResponseReceived { + peer_id, + id, + response, + } => { + self.send_to_router(RouterMessage::RPCResponseReceived { + peer_id, + request_id: id, + response, + }); + } + BehaviourEvent::RPCFailed { id, peer_id } => { + self.send_to_router(RouterMessage::RPCFailed { + peer_id, + request_id: id, + }); + } + BehaviourEvent::StatusPeer(peer_id) => { + self.send_to_router(RouterMessage::StatusPeer(peer_id)); + } + BehaviourEvent::PubsubMessage { + id, + source, + message, + .. + } => { match message { - NetworkMessage::SendRequest{ peer_id, request, request_id } => { - service.libp2p.send_request(peer_id, request_id, request); - } - NetworkMessage::SendResponse{ peer_id, response, id } => { - service.libp2p.send_response(peer_id, id, response); - } - NetworkMessage::SendErrorResponse{ peer_id, error, id, reason } => { - service.libp2p.respond_with_error(peer_id, id, error, reason); - } - NetworkMessage::UPnPMappingEstablished { tcp_socket, udp_socket} => { - service.upnp_mappings = (tcp_socket.map(|s| s.port()), udp_socket.map(|s| s.port())); - // If there is an external TCP port update, modify our local ENR. - if let Some(tcp_socket) = tcp_socket { - if let Err(e) = service.libp2p.swarm.behaviour_mut().discovery_mut().update_enr_tcp_port(tcp_socket.port()) { - warn!(service.log, "Failed to update ENR"; "error" => e); - } - } - // if the discovery service is not auto-updating, update it with the - // UPnP mappings - if !service.discovery_auto_update { - if let Some(udp_socket) = udp_socket { - if let Err(e) = service.libp2p.swarm.behaviour_mut().discovery_mut().update_enr_udp_socket(udp_socket) { - warn!(service.log, "Failed to update ENR"; "error" => e); - } - } - } - }, - NetworkMessage::ValidationResult { - propagation_source, - message_id, - validation_result, - } => { - trace!(service.log, "Propagating gossipsub message"; - "propagation_peer" => ?propagation_source, - "message_id" => %message_id, - "validation_result" => ?validation_result - ); - service - .libp2p - .swarm - .behaviour_mut() - .report_message_validation_result( - &propagation_source, message_id, validation_result - ); - } - NetworkMessage::Publish { messages } => { - let mut topic_kinds = Vec::new(); - for message in &messages { - if !topic_kinds.contains(&message.kind()) { - topic_kinds.push(message.kind()); - } - } - debug!( - service.log, - "Sending pubsub messages"; - "count" => messages.len(), - "topics" => ?topic_kinds - ); - service.libp2p.swarm.behaviour_mut().publish(messages); - } - NetworkMessage::ReportPeer { peer_id, action, source, msg } => service.libp2p.report_peer(&peer_id, action, source, msg), - NetworkMessage::GoodbyePeer { peer_id, reason, source } => service.libp2p.goodbye_peer(&peer_id, reason, source), - NetworkMessage::AttestationSubscribe { subscriptions } => { - if let Err(e) = service + // attestation information gets processed in the attestation service + PubsubMessage::Attestation(ref subnet_and_attestation) => { + let subnet = subnet_and_attestation.0; + let attestation = &subnet_and_attestation.1; + // checks if we have an aggregator for the slot. If so, we should process + // the attestation, else we just just propagate the Attestation. + let should_process = self .attestation_service - .validator_subscriptions(subscriptions) { - warn!(service.log, "Attestation validator subscription failed"; "error" => e); - } - } - NetworkMessage::SyncCommitteeSubscribe { subscriptions } => { - if let Err(e) = service - .sync_committee_service - .validator_subscriptions(subscriptions) { - warn!(service.log, "Sync committee calidator subscription failed"; "error" => e); - } - } - NetworkMessage::SubscribeCoreTopics => { - if service.shutdown_after_sync { - let _ = shutdown_sender - .send(ShutdownReason::Success( - "Beacon node completed sync. Shutting down as --shutdown-after-sync flag is enabled")) - .await - .map_err(|e| warn!( - service.log, - "failed to send a shutdown signal"; - "error" => %e - )); - return; - } - let mut subscribed_topics: Vec = vec![]; - for topic_kind in lighthouse_network::types::CORE_TOPICS.iter() { - for fork_digest in service.required_gossip_fork_digests() { - let topic = GossipTopic::new(topic_kind.clone(), GossipEncoding::default(), fork_digest); - if service.libp2p.swarm.behaviour_mut().subscribe(topic.clone()) { - subscribed_topics.push(topic); - } else { - warn!(service.log, "Could not subscribe to topic"; "topic" => %topic); - } - } - } - - // If we are to subscribe to all subnets we do it here - if service.subscribe_all_subnets { - for subnet_id in 0..<::EthSpec as EthSpec>::SubnetBitfieldLength::to_u64() { - let subnet = Subnet::Attestation(SubnetId::new(subnet_id)); - // Update the ENR bitfield - service.libp2p.swarm.behaviour_mut().update_enr_subnet(subnet, true); - for fork_digest in service.required_gossip_fork_digests() { - let topic = GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest); - if service.libp2p.swarm.behaviour_mut().subscribe(topic.clone()) { - subscribed_topics.push(topic); - } else { - warn!(service.log, "Could not subscribe to topic"; "topic" => %topic); - } - } - } - for subnet_id in 0..<::EthSpec as EthSpec>::SyncCommitteeSubnetCount::to_u64() { - let subnet = Subnet::SyncCommittee(SyncSubnetId::new(subnet_id)); - // Update the ENR bitfield - service.libp2p.swarm.behaviour_mut().update_enr_subnet(subnet, true); - for fork_digest in service.required_gossip_fork_digests() { - let topic = GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest); - if service.libp2p.swarm.behaviour_mut().subscribe(topic.clone()) { - subscribed_topics.push(topic); - } else { - warn!(service.log, "Could not subscribe to topic"; "topic" => %topic); - } - } - } - } - - if !subscribed_topics.is_empty() { - info!( - service.log, - "Subscribed to topics"; - "topics" => ?subscribed_topics.into_iter().map(|topic| format!("{}", topic)).collect::>() - ); - } - } - } - } - // process any attestation service events - Some(attestation_service_message) = service.attestation_service.next() => { - match attestation_service_message { - SubnetServiceMessage::Subscribe(subnet) => { - for fork_digest in service.required_gossip_fork_digests() { - let topic = GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest); - service.libp2p.swarm.behaviour_mut().subscribe(topic); - } - } - SubnetServiceMessage::Unsubscribe(subnet) => { - for fork_digest in service.required_gossip_fork_digests() { - let topic = GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest); - service.libp2p.swarm.behaviour_mut().unsubscribe(topic); - } - } - SubnetServiceMessage::EnrAdd(subnet) => { - service.libp2p.swarm.behaviour_mut().update_enr_subnet(subnet, true); - } - SubnetServiceMessage::EnrRemove(subnet) => { - service.libp2p.swarm.behaviour_mut().update_enr_subnet(subnet, false); - } - SubnetServiceMessage::DiscoverPeers(subnets_to_discover) => { - service.libp2p.swarm.behaviour_mut().discover_subnet_peers(subnets_to_discover); - } - } - } - // process any sync committee service events - Some(sync_committee_service_message) = service.sync_committee_service.next() => { - match sync_committee_service_message { - SubnetServiceMessage::Subscribe(subnet) => { - for fork_digest in service.required_gossip_fork_digests() { - let topic = GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest); - service.libp2p.swarm.behaviour_mut().subscribe(topic); - } - } - SubnetServiceMessage::Unsubscribe(subnet) => { - for fork_digest in service.required_gossip_fork_digests() { - let topic = GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest); - service.libp2p.swarm.behaviour_mut().unsubscribe(topic); - } - } - SubnetServiceMessage::EnrAdd(subnet) => { - service.libp2p.swarm.behaviour_mut().update_enr_subnet(subnet, true); - } - SubnetServiceMessage::EnrRemove(subnet) => { - service.libp2p.swarm.behaviour_mut().update_enr_subnet(subnet, false); - } - SubnetServiceMessage::DiscoverPeers(subnets_to_discover) => { - service.libp2p.swarm.behaviour_mut().discover_subnet_peers(subnets_to_discover); - } - } - } - libp2p_event = service.libp2p.next_event() => { - // poll the swarm - match libp2p_event { - Libp2pEvent::Behaviour(event) => match event { - BehaviourEvent::PeerConnectedOutgoing(peer_id) => { - let _ = service - .router_send - .send(RouterMessage::PeerDialed(peer_id)) - .map_err(|_| { - debug!(service.log, "Failed to send peer dialed to router"); }); - }, - BehaviourEvent::PeerConnectedIncoming(_) | BehaviourEvent::PeerBanned(_) | BehaviourEvent::PeerUnbanned(_) => { - // No action required for these events. - }, - BehaviourEvent::PeerDisconnected(peer_id) => { - let _ = service - .router_send - .send(RouterMessage::PeerDisconnected(peer_id)) - .map_err(|_| { - debug!(service.log, "Failed to send peer disconnect to router"); - }); - }, - BehaviourEvent::RequestReceived{peer_id, id, request} => { - let _ = service - .router_send - .send(RouterMessage::RPCRequestReceived{peer_id, id, request}) - .map_err(|_| { - debug!(service.log, "Failed to send RPC to router"); - }); - } - BehaviourEvent::ResponseReceived{peer_id, id, response} => { - let _ = service - .router_send - .send(RouterMessage::RPCResponseReceived{ peer_id, request_id: id, response }) - .map_err(|_| { - debug!(service.log, "Failed to send RPC to router"); - }); - - } - BehaviourEvent::RPCFailed{id, peer_id} => { - let _ = service - .router_send - .send(RouterMessage::RPCFailed{ peer_id, request_id: id}) - .map_err(|_| { - debug!(service.log, "Failed to send RPC to router"); - }); - - } - BehaviourEvent::StatusPeer(peer_id) => { - let _ = service - .router_send - .send(RouterMessage::StatusPeer(peer_id)) - .map_err(|_| { - debug!(service.log, "Failed to send re-status peer to router"); - }); - } - BehaviourEvent::PubsubMessage { + .should_process_attestation(subnet, attestation); + self.send_to_router(RouterMessage::PubsubMessage( id, source, message, - .. - } => { - match message { - // attestation information gets processed in the attestation service - PubsubMessage::Attestation(ref subnet_and_attestation) => { - let subnet = subnet_and_attestation.0; - let attestation = &subnet_and_attestation.1; - // checks if we have an aggregator for the slot. If so, we should process - // the attestation, else we just just propagate the Attestation. - let should_process = service.attestation_service.should_process_attestation( - subnet, - attestation, - ); - let _ = service - .router_send - .send(RouterMessage::PubsubMessage(id, source, message, should_process)) - .map_err(|_| { - debug!(service.log, "Failed to send pubsub message to router"); - }); - } - _ => { - // all else is sent to the router - let _ = service - .router_send - .send(RouterMessage::PubsubMessage(id, source, message, true)) - .map_err(|_| { - debug!(service.log, "Failed to send pubsub message to router"); - }); - } - } - } + should_process, + )); } - Libp2pEvent::NewListenAddr(multiaddr) => { - service.network_globals.listen_multiaddrs.write().push(multiaddr); - } - Libp2pEvent::ZeroListeners => { - let _ = shutdown_sender - .send(ShutdownReason::Failure("All listeners are closed. Unable to listen")) - .await - .map_err(|e| warn!( - service.log, - "failed to send a shutdown signal"; - "error" => %e - )); + _ => { + // all else is sent to the router + self.send_to_router(RouterMessage::PubsubMessage( + id, source, message, true, + )); } } } - Some(_) = &mut service.next_fork_update => { - let new_enr_fork_id = service.beacon_chain.enr_fork_id(); + }, + Libp2pEvent::NewListenAddr(multiaddr) => { + self.network_globals + .listen_multiaddrs + .write() + .push(multiaddr); + } + Libp2pEvent::ZeroListeners => { + let _ = shutdown_sender + .send(ShutdownReason::Failure( + "All listeners are closed. Unable to listen", + )) + .await + .map_err(|e| { + warn!( + self.log, + "failed to send a shutdown signal"; + "error" => %e + ) + }); + } + } + } - let fork_context = &service.fork_context; - if let Some(new_fork_name) = fork_context.from_context_bytes(new_enr_fork_id.fork_digest) { - info!( - service.log, - "Transitioned to new fork"; - "old_fork" => ?fork_context.current_fork(), - "new_fork" => ?new_fork_name, - ); - fork_context.update_current_fork(*new_fork_name); - - service + /// Handle a message sent to the network service. + async fn on_network_msg( + &mut self, + msg: NetworkMessage, + shutdown_sender: &mut Sender, + ) { + match msg { + NetworkMessage::SendRequest { + peer_id, + request, + request_id, + } => { + self.libp2p.send_request(peer_id, request_id, request); + } + NetworkMessage::SendResponse { + peer_id, + response, + id, + } => { + self.libp2p.send_response(peer_id, id, response); + } + NetworkMessage::SendErrorResponse { + peer_id, + error, + id, + reason, + } => { + self.libp2p.respond_with_error(peer_id, id, error, reason); + } + NetworkMessage::UPnPMappingEstablished { + tcp_socket, + udp_socket, + } => { + self.upnp_mappings = (tcp_socket.map(|s| s.port()), udp_socket.map(|s| s.port())); + // If there is an external TCP port update, modify our local ENR. + if let Some(tcp_socket) = tcp_socket { + if let Err(e) = self + .libp2p + .swarm + .behaviour_mut() + .discovery_mut() + .update_enr_tcp_port(tcp_socket.port()) + { + warn!(self.log, "Failed to update ENR"; "error" => e); + } + } + // if the discovery service is not auto-updating, update it with the + // UPnP mappings + if !self.discovery_auto_update { + if let Some(udp_socket) = udp_socket { + if let Err(e) = self .libp2p .swarm .behaviour_mut() - .update_fork_version(new_enr_fork_id.clone()); - // Reinitialize the next_fork_update - service.next_fork_update = Box::pin(next_fork_delay(&service.beacon_chain).into()); - - // Set the next_unsubscribe delay. - let epoch_duration = service.beacon_chain.spec.seconds_per_slot * T::EthSpec::slots_per_epoch(); - let unsubscribe_delay = Duration::from_secs(UNSUBSCRIBE_DELAY_EPOCHS * epoch_duration); - - // Update the `next_fork_subscriptions` timer if the next fork is known. - service.next_fork_subscriptions = Box::pin(next_fork_subscriptions_delay(&service.beacon_chain).into()); - service.next_unsubscribe = Box::pin(Some(tokio::time::sleep(unsubscribe_delay)).into()); - info!(service.log, "Network will unsubscribe from old fork gossip topics in a few epochs"; "remaining_epochs" => UNSUBSCRIBE_DELAY_EPOCHS); - } else { - crit!(service.log, "Unknown new enr fork id"; "new_fork_id" => ?new_enr_fork_id); - } - - } - Some(_) = &mut service.next_unsubscribe => { - let new_enr_fork_id = service.beacon_chain.enr_fork_id(); - service.libp2p.swarm.behaviour_mut().unsubscribe_from_fork_topics_except(new_enr_fork_id.fork_digest); - info!(service.log, "Unsubscribed from old fork topics"); - service.next_unsubscribe = Box::pin(None.into()); - } - Some(_) = &mut service.next_fork_subscriptions => { - if let Some((fork_name, _)) = service.beacon_chain.duration_to_next_fork() { - let fork_version = service.beacon_chain.spec.fork_version_for_name(fork_name); - let fork_digest = ChainSpec::compute_fork_digest(fork_version, service.beacon_chain.genesis_validators_root); - info!(service.log, "Subscribing to new fork topics"); - service.libp2p.swarm.behaviour_mut().subscribe_new_fork_topics(fork_digest); - service.next_fork_subscriptions = Box::pin(None.into()); - } - else { - error!(service.log, "Fork subscription scheduled but no fork scheduled"); + .discovery_mut() + .update_enr_udp_socket(udp_socket) + { + warn!(self.log, "Failed to update ENR"; "error" => e); + } } } } - metrics::update_bandwidth_metrics(service.libp2p.bandwidth.clone()); + NetworkMessage::ValidationResult { + propagation_source, + message_id, + validation_result, + } => { + trace!(self.log, "Propagating gossipsub message"; + "propagation_peer" => ?propagation_source, + "message_id" => %message_id, + "validation_result" => ?validation_result + ); + self.libp2p + .swarm + .behaviour_mut() + .report_message_validation_result( + &propagation_source, + message_id, + validation_result, + ); + } + NetworkMessage::Publish { messages } => { + let mut topic_kinds = Vec::new(); + for message in &messages { + if !topic_kinds.contains(&message.kind()) { + topic_kinds.push(message.kind()); + } + } + debug!( + self.log, + "Sending pubsub messages"; + "count" => messages.len(), + "topics" => ?topic_kinds + ); + self.libp2p.swarm.behaviour_mut().publish(messages); + } + NetworkMessage::ReportPeer { + peer_id, + action, + source, + msg, + } => self.libp2p.report_peer(&peer_id, action, source, msg), + NetworkMessage::GoodbyePeer { + peer_id, + reason, + source, + } => self.libp2p.goodbye_peer(&peer_id, reason, source), + NetworkMessage::AttestationSubscribe { subscriptions } => { + if let Err(e) = self + .attestation_service + .validator_subscriptions(subscriptions) + { + warn!(self.log, "Attestation validator subscription failed"; "error" => e); + } + } + NetworkMessage::SyncCommitteeSubscribe { subscriptions } => { + if let Err(e) = self + .sync_committee_service + .validator_subscriptions(subscriptions) + { + warn!(self.log, "Sync committee calidator subscription failed"; "error" => e); + } + } + NetworkMessage::SubscribeCoreTopics => { + if self.shutdown_after_sync { + if let Err(e) = shutdown_sender + .send(ShutdownReason::Success( + "Beacon node completed sync. \ + Shutting down as --shutdown-after-sync flag is enabled", + )) + .await + { + warn!( + self.log, + "failed to send a shutdown signal"; + "error" => %e + ) + } + return; + } + let mut subscribed_topics: Vec = vec![]; + for topic_kind in lighthouse_network::types::CORE_TOPICS.iter() { + for fork_digest in self.required_gossip_fork_digests() { + let topic = GossipTopic::new( + topic_kind.clone(), + GossipEncoding::default(), + fork_digest, + ); + if self.libp2p.swarm.behaviour_mut().subscribe(topic.clone()) { + subscribed_topics.push(topic); + } else { + warn!(self.log, "Could not subscribe to topic"; "topic" => %topic); + } + } + } + + // If we are to subscribe to all subnets we do it here + if self.subscribe_all_subnets { + for subnet_id in 0..<::EthSpec as EthSpec>::SubnetBitfieldLength::to_u64() { + let subnet = Subnet::Attestation(SubnetId::new(subnet_id)); + // Update the ENR bitfield + self.libp2p.swarm.behaviour_mut().update_enr_subnet(subnet, true); + for fork_digest in self.required_gossip_fork_digests() { + let topic = GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest); + if self.libp2p.swarm.behaviour_mut().subscribe(topic.clone()) { + subscribed_topics.push(topic); + } else { + warn!(self.log, "Could not subscribe to topic"; "topic" => %topic); + } + } + } + let subnet_max = <::EthSpec as EthSpec>::SyncCommitteeSubnetCount::to_u64(); + for subnet_id in 0..subnet_max { + let subnet = Subnet::SyncCommittee(SyncSubnetId::new(subnet_id)); + // Update the ENR bitfield + self.libp2p + .swarm + .behaviour_mut() + .update_enr_subnet(subnet, true); + for fork_digest in self.required_gossip_fork_digests() { + let topic = GossipTopic::new( + subnet.into(), + GossipEncoding::default(), + fork_digest, + ); + if self.libp2p.swarm.behaviour_mut().subscribe(topic.clone()) { + subscribed_topics.push(topic); + } else { + warn!(self.log, "Could not subscribe to topic"; "topic" => %topic); + } + } + } + } + + if !subscribed_topics.is_empty() { + info!( + self.log, + "Subscribed to topics"; + "topics" => ?subscribed_topics.into_iter().map(|topic| format!("{}", topic)).collect::>() + ); + } + } } - }, "network"); + } + + fn update_gossipsub_parameters(&mut self) { + if let Ok(slot) = self.beacon_chain.slot() { + if let Some(active_validators) = self + .beacon_chain + .with_head(|head| { + Ok::<_, BeaconChainError>( + head.beacon_state + .get_cached_active_validator_indices(RelativeEpoch::Current) + .map(|indices| indices.len()) + .ok() + .or_else(|| { + // if active validator cached was not build we count the + // active validators + self.beacon_chain.epoch().ok().map(|current_epoch| { + head.beacon_state + .validators() + .iter() + .filter(|validator| validator.is_active_at(current_epoch)) + .count() + }) + }), + ) + }) + .unwrap_or(None) + { + if self + .libp2p + .swarm + .behaviour_mut() + .update_gossipsub_parameters(active_validators, slot) + .is_err() + { + error!( + self.log, + "Failed to update gossipsub parameters"; + "active_validators" => active_validators + ); + } + } + } + } + + fn on_attestation_service_msg(&mut self, msg: SubnetServiceMessage) { + match msg { + SubnetServiceMessage::Subscribe(subnet) => { + for fork_digest in self.required_gossip_fork_digests() { + let topic = + GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest); + self.libp2p.swarm.behaviour_mut().subscribe(topic); + } + } + SubnetServiceMessage::Unsubscribe(subnet) => { + for fork_digest in self.required_gossip_fork_digests() { + let topic = + GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest); + self.libp2p.swarm.behaviour_mut().unsubscribe(topic); + } + } + SubnetServiceMessage::EnrAdd(subnet) => { + self.libp2p + .swarm + .behaviour_mut() + .update_enr_subnet(subnet, true); + } + SubnetServiceMessage::EnrRemove(subnet) => { + self.libp2p + .swarm + .behaviour_mut() + .update_enr_subnet(subnet, false); + } + SubnetServiceMessage::DiscoverPeers(subnets_to_discover) => { + self.libp2p + .swarm + .behaviour_mut() + .discover_subnet_peers(subnets_to_discover); + } + } + } + + fn on_sync_commitee_service_message(&mut self, msg: SubnetServiceMessage) { + match msg { + SubnetServiceMessage::Subscribe(subnet) => { + for fork_digest in self.required_gossip_fork_digests() { + let topic = + GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest); + self.libp2p.swarm.behaviour_mut().subscribe(topic); + } + } + SubnetServiceMessage::Unsubscribe(subnet) => { + for fork_digest in self.required_gossip_fork_digests() { + let topic = + GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest); + self.libp2p.swarm.behaviour_mut().unsubscribe(topic); + } + } + SubnetServiceMessage::EnrAdd(subnet) => { + self.libp2p + .swarm + .behaviour_mut() + .update_enr_subnet(subnet, true); + } + SubnetServiceMessage::EnrRemove(subnet) => { + self.libp2p + .swarm + .behaviour_mut() + .update_enr_subnet(subnet, false); + } + SubnetServiceMessage::DiscoverPeers(subnets_to_discover) => { + self.libp2p + .swarm + .behaviour_mut() + .discover_subnet_peers(subnets_to_discover); + } + } + } + + fn update_next_fork(&mut self) { + let new_enr_fork_id = self.beacon_chain.enr_fork_id(); + + let fork_context = &self.fork_context; + if let Some(new_fork_name) = fork_context.from_context_bytes(new_enr_fork_id.fork_digest) { + info!( + self.log, + "Transitioned to new fork"; + "old_fork" => ?fork_context.current_fork(), + "new_fork" => ?new_fork_name, + ); + fork_context.update_current_fork(*new_fork_name); + + self.libp2p + .swarm + .behaviour_mut() + .update_fork_version(new_enr_fork_id); + // Reinitialize the next_fork_update + self.next_fork_update = Box::pin(next_fork_delay(&self.beacon_chain).into()); + + // Set the next_unsubscribe delay. + let epoch_duration = + self.beacon_chain.spec.seconds_per_slot * T::EthSpec::slots_per_epoch(); + let unsubscribe_delay = Duration::from_secs(UNSUBSCRIBE_DELAY_EPOCHS * epoch_duration); + + // Update the `next_fork_subscriptions` timer if the next fork is known. + self.next_fork_subscriptions = + Box::pin(next_fork_subscriptions_delay(&self.beacon_chain).into()); + self.next_unsubscribe = Box::pin(Some(tokio::time::sleep(unsubscribe_delay)).into()); + info!(self.log, "Network will unsubscribe from old fork gossip topics in a few epochs"; "remaining_epochs" => UNSUBSCRIBE_DELAY_EPOCHS); + } else { + crit!(self.log, "Unknown new enr fork id"; "new_fork_id" => ?new_enr_fork_id); + } + } } /// Returns a `Sleep` that triggers after the next change in the beacon chain fork version.