Network upgrades (#2345)
This commit is contained in:
parent
b0f5c4c776
commit
4aa06c9555
634
Cargo.lock
generated
634
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@ -5,7 +5,7 @@ authors = ["Sigma Prime <contact@sigmaprime.io>"]
|
|||||||
edition = "2018"
|
edition = "2018"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
discv5 = { version = "0.1.0-beta.5", features = ["libp2p"] }
|
discv5 = { version = "0.1.0-beta.4", features = ["libp2p"] }
|
||||||
unsigned-varint = { version = "0.6.0", features = ["codec"] }
|
unsigned-varint = { version = "0.6.0", features = ["codec"] }
|
||||||
types = { path = "../../consensus/types" }
|
types = { path = "../../consensus/types" }
|
||||||
hashset_delay = { path = "../../common/hashset_delay" }
|
hashset_delay = { path = "../../common/hashset_delay" }
|
||||||
@ -42,9 +42,9 @@ regex = "1.3.9"
|
|||||||
strum = { version = "0.20", features = ["derive"] }
|
strum = { version = "0.20", features = ["derive"] }
|
||||||
|
|
||||||
[dependencies.libp2p]
|
[dependencies.libp2p]
|
||||||
version = "0.35.1"
|
version = "0.38.0"
|
||||||
default-features = false
|
default-features = false
|
||||||
features = ["websocket", "identify", "mplex", "yamux", "noise", "gossipsub", "dns", "tcp-tokio"]
|
features = ["websocket", "identify", "mplex", "yamux", "noise", "gossipsub", "dns-tokio", "tcp-tokio"]
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
tokio = { version = "1.1.0", features = ["full"] }
|
tokio = { version = "1.1.0", features = ["full"] }
|
||||||
|
@ -56,7 +56,7 @@ impl<TSpec: EthSpec> DelegatingHandler<TSpec> {
|
|||||||
|
|
||||||
/// Wrapper around the `ProtocolsHandler::InEvent` types of the handlers.
|
/// Wrapper around the `ProtocolsHandler::InEvent` types of the handlers.
|
||||||
/// Simply delegated to the corresponding behaviour's handler.
|
/// Simply delegated to the corresponding behaviour's handler.
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug)]
|
||||||
pub enum DelegateIn<TSpec: EthSpec> {
|
pub enum DelegateIn<TSpec: EthSpec> {
|
||||||
Gossipsub(<GossipHandler as ProtocolsHandler>::InEvent),
|
Gossipsub(<GossipHandler as ProtocolsHandler>::InEvent),
|
||||||
RPC(<RPCHandler<TSpec> as ProtocolsHandler>::InEvent),
|
RPC(<RPCHandler<TSpec> as ProtocolsHandler>::InEvent),
|
||||||
@ -141,8 +141,8 @@ impl<TSpec: EthSpec> ProtocolsHandler for DelegatingHandler<TSpec> {
|
|||||||
.max(identify_proto.timeout());
|
.max(identify_proto.timeout());
|
||||||
|
|
||||||
let select = SelectUpgrade::new(
|
let select = SelectUpgrade::new(
|
||||||
gossip_proto.into_upgrade().1,
|
gossip_proto.into_upgrade().0,
|
||||||
SelectUpgrade::new(rpc_proto.into_upgrade().1, identify_proto.into_upgrade().1),
|
SelectUpgrade::new(rpc_proto.into_upgrade().0, identify_proto.into_upgrade().0),
|
||||||
);
|
);
|
||||||
|
|
||||||
SubstreamProtocol::new(select, ()).with_timeout(timeout)
|
SubstreamProtocol::new(select, ()).with_timeout(timeout)
|
||||||
@ -202,7 +202,7 @@ impl<TSpec: EthSpec> ProtocolsHandler for DelegatingHandler<TSpec> {
|
|||||||
match event {
|
match event {
|
||||||
DelegateIn::Gossipsub(ev) => self.gossip_handler.inject_event(ev),
|
DelegateIn::Gossipsub(ev) => self.gossip_handler.inject_event(ev),
|
||||||
DelegateIn::RPC(ev) => self.rpc_handler.inject_event(ev),
|
DelegateIn::RPC(ev) => self.rpc_handler.inject_event(ev),
|
||||||
DelegateIn::Identify(()) => self.identify_handler.inject_event(()),
|
DelegateIn::Identify(ev) => self.identify_handler.inject_event(ev),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -34,7 +34,6 @@ impl<TSpec: EthSpec> BehaviourHandler<TSpec> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub enum BehaviourHandlerIn<TSpec: EthSpec> {
|
pub enum BehaviourHandlerIn<TSpec: EthSpec> {
|
||||||
Delegate(DelegateIn<TSpec>),
|
Delegate(DelegateIn<TSpec>),
|
||||||
/// Start the shutdown process.
|
/// Start the shutdown process.
|
||||||
|
@ -24,7 +24,7 @@ use libp2p::{
|
|||||||
Gossipsub as BaseGossipsub, GossipsubEvent, IdentTopic as Topic, MessageAcceptance,
|
Gossipsub as BaseGossipsub, GossipsubEvent, IdentTopic as Topic, MessageAcceptance,
|
||||||
MessageAuthenticity, MessageId, PeerScoreThresholds,
|
MessageAuthenticity, MessageId, PeerScoreThresholds,
|
||||||
},
|
},
|
||||||
identify::{Identify, IdentifyEvent},
|
identify::{Identify, IdentifyConfig, IdentifyEvent},
|
||||||
swarm::{
|
swarm::{
|
||||||
AddressScore, NetworkBehaviour, NetworkBehaviourAction as NBAction, NotifyHandler,
|
AddressScore, NetworkBehaviour, NetworkBehaviourAction as NBAction, NotifyHandler,
|
||||||
PollParameters, ProtocolsHandler,
|
PollParameters, ProtocolsHandler,
|
||||||
@ -151,18 +151,14 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
|
|||||||
) -> error::Result<Self> {
|
) -> error::Result<Self> {
|
||||||
let behaviour_log = log.new(o!());
|
let behaviour_log = log.new(o!());
|
||||||
|
|
||||||
let identify = if net_conf.private {
|
let identify_config = if net_conf.private {
|
||||||
Identify::new(
|
IdentifyConfig::new(
|
||||||
"".into(),
|
|
||||||
"".into(),
|
"".into(),
|
||||||
local_key.public(), // Still send legitimate public key
|
local_key.public(), // Still send legitimate public key
|
||||||
)
|
)
|
||||||
} else {
|
} else {
|
||||||
Identify::new(
|
IdentifyConfig::new("eth2/1.0.0".into(), local_key.public())
|
||||||
"lighthouse/libp2p".into(),
|
.with_agent_version(lighthouse_version::version_with_platform())
|
||||||
lighthouse_version::version_with_platform(),
|
|
||||||
local_key.public(),
|
|
||||||
)
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let enr_fork_id = network_globals
|
let enr_fork_id = network_globals
|
||||||
@ -221,7 +217,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
|
|||||||
Ok(Behaviour {
|
Ok(Behaviour {
|
||||||
eth2_rpc: RPC::new(log.clone()),
|
eth2_rpc: RPC::new(log.clone()),
|
||||||
gossipsub,
|
gossipsub,
|
||||||
identify,
|
identify: Identify::new(identify_config),
|
||||||
peer_manager: PeerManager::new(local_key, net_conf, network_globals.clone(), log)
|
peer_manager: PeerManager::new(local_key, net_conf, network_globals.clone(), log)
|
||||||
.await?,
|
.await?,
|
||||||
events: VecDeque::new(),
|
events: VecDeque::new(),
|
||||||
@ -902,11 +898,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
|
|||||||
|
|
||||||
fn on_identify_event(&mut self, event: IdentifyEvent) {
|
fn on_identify_event(&mut self, event: IdentifyEvent) {
|
||||||
match event {
|
match event {
|
||||||
IdentifyEvent::Received {
|
IdentifyEvent::Received { peer_id, mut info } => {
|
||||||
peer_id,
|
|
||||||
mut info,
|
|
||||||
observed_addr,
|
|
||||||
} => {
|
|
||||||
if info.listen_addrs.len() > MAX_IDENTIFY_ADDRESSES {
|
if info.listen_addrs.len() > MAX_IDENTIFY_ADDRESSES {
|
||||||
debug!(
|
debug!(
|
||||||
self.log,
|
self.log,
|
||||||
@ -921,12 +913,13 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
|
|||||||
"protocol_version" => info.protocol_version,
|
"protocol_version" => info.protocol_version,
|
||||||
"agent_version" => info.agent_version,
|
"agent_version" => info.agent_version,
|
||||||
"listening_ addresses" => ?info.listen_addrs,
|
"listening_ addresses" => ?info.listen_addrs,
|
||||||
"observed_address" => ?observed_addr,
|
"observed_address" => ?info.observed_addr,
|
||||||
"protocols" => ?info.protocols
|
"protocols" => ?info.protocols
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
IdentifyEvent::Sent { .. } => {}
|
IdentifyEvent::Sent { .. } => {}
|
||||||
IdentifyEvent::Error { .. } => {}
|
IdentifyEvent::Error { .. } => {}
|
||||||
|
IdentifyEvent::Pushed { .. } => {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1171,12 +1164,16 @@ impl<TSpec: EthSpec> NetworkBehaviour for Behaviour<TSpec> {
|
|||||||
delegate_to_behaviours!(self, inject_dial_failure, peer_id);
|
delegate_to_behaviours!(self, inject_dial_failure, peer_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn inject_new_listen_addr(&mut self, addr: &Multiaddr) {
|
fn inject_new_listener(&mut self, id: ListenerId) {
|
||||||
delegate_to_behaviours!(self, inject_new_listen_addr, addr);
|
delegate_to_behaviours!(self, inject_new_listener, id);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) {
|
fn inject_new_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
|
||||||
delegate_to_behaviours!(self, inject_expired_listen_addr, addr);
|
delegate_to_behaviours!(self, inject_new_listen_addr, id, addr);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_expired_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
|
||||||
|
delegate_to_behaviours!(self, inject_expired_listen_addr, id, addr);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
|
fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
|
||||||
|
@ -163,6 +163,7 @@ impl Default for Config {
|
|||||||
.query_parallelism(5)
|
.query_parallelism(5)
|
||||||
.disable_report_discovered_peers()
|
.disable_report_discovered_peers()
|
||||||
.ip_limit() // limits /24 IP's in buckets.
|
.ip_limit() // limits /24 IP's in buckets.
|
||||||
|
.incoming_bucket_limit(8) // half the bucket size
|
||||||
.ping_interval(Duration::from_secs(300))
|
.ping_interval(Duration::from_secs(300))
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
@ -221,7 +221,7 @@ impl<TSpec: EthSpec> Service<TSpec> {
|
|||||||
let mut subscribed_topics: Vec<GossipKind> = vec![];
|
let mut subscribed_topics: Vec<GossipKind> = vec![];
|
||||||
|
|
||||||
for topic_kind in &config.topics {
|
for topic_kind in &config.topics {
|
||||||
if swarm.subscribe_kind(topic_kind.clone()) {
|
if swarm.behaviour_mut().subscribe_kind(topic_kind.clone()) {
|
||||||
subscribed_topics.push(topic_kind.clone());
|
subscribed_topics.push(topic_kind.clone());
|
||||||
} else {
|
} else {
|
||||||
warn!(log, "Could not subscribe to topic"; "topic" => %topic_kind);
|
warn!(log, "Could not subscribe to topic"; "topic" => %topic_kind);
|
||||||
@ -244,7 +244,9 @@ impl<TSpec: EthSpec> Service<TSpec> {
|
|||||||
|
|
||||||
/// Sends a request to a peer, with a given Id.
|
/// Sends a request to a peer, with a given Id.
|
||||||
pub fn send_request(&mut self, peer_id: PeerId, request_id: RequestId, request: Request) {
|
pub fn send_request(&mut self, peer_id: PeerId, request_id: RequestId, request: Request) {
|
||||||
self.swarm.send_request(peer_id, request_id, request);
|
self.swarm
|
||||||
|
.behaviour_mut()
|
||||||
|
.send_request(peer_id, request_id, request);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Informs the peer that their request failed.
|
/// Informs the peer that their request failed.
|
||||||
@ -255,22 +257,30 @@ impl<TSpec: EthSpec> Service<TSpec> {
|
|||||||
error: RPCResponseErrorCode,
|
error: RPCResponseErrorCode,
|
||||||
reason: String,
|
reason: String,
|
||||||
) {
|
) {
|
||||||
self.swarm._send_error_reponse(peer_id, id, error, reason);
|
self.swarm
|
||||||
|
.behaviour_mut()
|
||||||
|
._send_error_reponse(peer_id, id, error, reason);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Report a peer's action.
|
/// Report a peer's action.
|
||||||
pub fn report_peer(&mut self, peer_id: &PeerId, action: PeerAction, source: ReportSource) {
|
pub fn report_peer(&mut self, peer_id: &PeerId, action: PeerAction, source: ReportSource) {
|
||||||
self.swarm.report_peer(peer_id, action, source);
|
self.swarm
|
||||||
|
.behaviour_mut()
|
||||||
|
.report_peer(peer_id, action, source);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Disconnect and ban a peer, providing a reason.
|
/// Disconnect and ban a peer, providing a reason.
|
||||||
pub fn goodbye_peer(&mut self, peer_id: &PeerId, reason: GoodbyeReason, source: ReportSource) {
|
pub fn goodbye_peer(&mut self, peer_id: &PeerId, reason: GoodbyeReason, source: ReportSource) {
|
||||||
self.swarm.goodbye_peer(peer_id, reason, source);
|
self.swarm
|
||||||
|
.behaviour_mut()
|
||||||
|
.goodbye_peer(peer_id, reason, source);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Sends a response to a peer's request.
|
/// Sends a response to a peer's request.
|
||||||
pub fn send_response(&mut self, peer_id: PeerId, id: PeerRequestId, response: Response<TSpec>) {
|
pub fn send_response(&mut self, peer_id: PeerId, id: PeerRequestId, response: Response<TSpec>) {
|
||||||
self.swarm.send_successful_response(peer_id, id, response);
|
self.swarm
|
||||||
|
.behaviour_mut()
|
||||||
|
.send_successful_response(peer_id, id, response);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn next_event(&mut self) -> Libp2pEvent<TSpec> {
|
pub async fn next_event(&mut self) -> Libp2pEvent<TSpec> {
|
||||||
@ -350,8 +360,8 @@ type BoxedTransport = Boxed<(PeerId, StreamMuxerBox)>;
|
|||||||
fn build_transport(
|
fn build_transport(
|
||||||
local_private_key: Keypair,
|
local_private_key: Keypair,
|
||||||
) -> std::io::Result<(BoxedTransport, Arc<BandwidthSinks>)> {
|
) -> std::io::Result<(BoxedTransport, Arc<BandwidthSinks>)> {
|
||||||
let transport = libp2p::tcp::TokioTcpConfig::new().nodelay(true);
|
let tcp = libp2p::tcp::TokioTcpConfig::new().nodelay(true);
|
||||||
let transport = libp2p::dns::DnsConfig::new(transport)?;
|
let transport = libp2p::dns::TokioDnsConfig::system(tcp)?;
|
||||||
#[cfg(feature = "libp2p-websocket")]
|
#[cfg(feature = "libp2p-websocket")]
|
||||||
let transport = {
|
let transport = {
|
||||||
let trans_clone = transport.clone();
|
let trans_clone = transport.clone();
|
||||||
|
@ -126,7 +126,7 @@ pub async fn build_libp2p_instance(
|
|||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
pub fn get_enr(node: &LibP2PService<E>) -> Enr {
|
pub fn get_enr(node: &LibP2PService<E>) -> Enr {
|
||||||
node.swarm.local_enr()
|
node.swarm.behaviour().local_enr()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns `n` libp2p peers in fully connected topology.
|
// Returns `n` libp2p peers in fully connected topology.
|
||||||
@ -171,7 +171,7 @@ pub async fn build_node_pair(
|
|||||||
let mut sender = build_libp2p_instance(rt.clone(), vec![], sender_log).await;
|
let mut sender = build_libp2p_instance(rt.clone(), vec![], sender_log).await;
|
||||||
let mut receiver = build_libp2p_instance(rt, vec![], receiver_log).await;
|
let mut receiver = build_libp2p_instance(rt, vec![], receiver_log).await;
|
||||||
|
|
||||||
let receiver_multiaddr = receiver.swarm.local_enr().multiaddr()[1].clone();
|
let receiver_multiaddr = receiver.swarm.behaviour_mut().local_enr().multiaddr()[1].clone();
|
||||||
|
|
||||||
// let the two nodes set up listeners
|
// let the two nodes set up listeners
|
||||||
let sender_fut = async {
|
let sender_fut = async {
|
||||||
|
@ -56,7 +56,7 @@ fn test_status_rpc() {
|
|||||||
Libp2pEvent::Behaviour(BehaviourEvent::PeerDialed(peer_id)) => {
|
Libp2pEvent::Behaviour(BehaviourEvent::PeerDialed(peer_id)) => {
|
||||||
// Send a STATUS message
|
// Send a STATUS message
|
||||||
debug!(log, "Sending RPC");
|
debug!(log, "Sending RPC");
|
||||||
sender.swarm.send_request(
|
sender.swarm.behaviour_mut().send_request(
|
||||||
peer_id,
|
peer_id,
|
||||||
RequestId::Sync(10),
|
RequestId::Sync(10),
|
||||||
rpc_request.clone(),
|
rpc_request.clone(),
|
||||||
@ -90,7 +90,7 @@ fn test_status_rpc() {
|
|||||||
if request == rpc_request {
|
if request == rpc_request {
|
||||||
// send the response
|
// send the response
|
||||||
debug!(log, "Receiver Received");
|
debug!(log, "Receiver Received");
|
||||||
receiver.swarm.send_successful_response(
|
receiver.swarm.behaviour_mut().send_successful_response(
|
||||||
peer_id,
|
peer_id,
|
||||||
id,
|
id,
|
||||||
rpc_response.clone(),
|
rpc_response.clone(),
|
||||||
@ -152,7 +152,7 @@ fn test_blocks_by_range_chunked_rpc() {
|
|||||||
Libp2pEvent::Behaviour(BehaviourEvent::PeerDialed(peer_id)) => {
|
Libp2pEvent::Behaviour(BehaviourEvent::PeerDialed(peer_id)) => {
|
||||||
// Send a STATUS message
|
// Send a STATUS message
|
||||||
debug!(log, "Sending RPC");
|
debug!(log, "Sending RPC");
|
||||||
sender.swarm.send_request(
|
sender.swarm.behaviour_mut().send_request(
|
||||||
peer_id,
|
peer_id,
|
||||||
RequestId::Sync(10),
|
RequestId::Sync(10),
|
||||||
rpc_request.clone(),
|
rpc_request.clone(),
|
||||||
@ -197,14 +197,14 @@ fn test_blocks_by_range_chunked_rpc() {
|
|||||||
// send the response
|
// send the response
|
||||||
warn!(log, "Receiver got request");
|
warn!(log, "Receiver got request");
|
||||||
for _ in 1..=messages_to_send {
|
for _ in 1..=messages_to_send {
|
||||||
receiver.swarm.send_successful_response(
|
receiver.swarm.behaviour_mut().send_successful_response(
|
||||||
peer_id,
|
peer_id,
|
||||||
id,
|
id,
|
||||||
rpc_response.clone(),
|
rpc_response.clone(),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
// send the stream termination
|
// send the stream termination
|
||||||
receiver.swarm.send_successful_response(
|
receiver.swarm.behaviour_mut().send_successful_response(
|
||||||
peer_id,
|
peer_id,
|
||||||
id,
|
id,
|
||||||
Response::BlocksByRange(None),
|
Response::BlocksByRange(None),
|
||||||
@ -266,7 +266,7 @@ fn test_blocks_by_range_chunked_rpc_terminates_correctly() {
|
|||||||
Libp2pEvent::Behaviour(BehaviourEvent::PeerDialed(peer_id)) => {
|
Libp2pEvent::Behaviour(BehaviourEvent::PeerDialed(peer_id)) => {
|
||||||
// Send a STATUS message
|
// Send a STATUS message
|
||||||
debug!(log, "Sending RPC");
|
debug!(log, "Sending RPC");
|
||||||
sender.swarm.send_request(
|
sender.swarm.behaviour_mut().send_request(
|
||||||
peer_id,
|
peer_id,
|
||||||
RequestId::Sync(10),
|
RequestId::Sync(10),
|
||||||
rpc_request.clone(),
|
rpc_request.clone(),
|
||||||
@ -335,7 +335,7 @@ fn test_blocks_by_range_chunked_rpc_terminates_correctly() {
|
|||||||
if message_info.is_some() {
|
if message_info.is_some() {
|
||||||
messages_sent += 1;
|
messages_sent += 1;
|
||||||
let (peer_id, stream_id) = message_info.as_ref().unwrap();
|
let (peer_id, stream_id) = message_info.as_ref().unwrap();
|
||||||
receiver.swarm.send_successful_response(
|
receiver.swarm.behaviour_mut().send_successful_response(
|
||||||
*peer_id,
|
*peer_id,
|
||||||
*stream_id,
|
*stream_id,
|
||||||
rpc_response.clone(),
|
rpc_response.clone(),
|
||||||
@ -398,7 +398,7 @@ fn test_blocks_by_range_single_empty_rpc() {
|
|||||||
Libp2pEvent::Behaviour(BehaviourEvent::PeerDialed(peer_id)) => {
|
Libp2pEvent::Behaviour(BehaviourEvent::PeerDialed(peer_id)) => {
|
||||||
// Send a STATUS message
|
// Send a STATUS message
|
||||||
debug!(log, "Sending RPC");
|
debug!(log, "Sending RPC");
|
||||||
sender.swarm.send_request(
|
sender.swarm.behaviour_mut().send_request(
|
||||||
peer_id,
|
peer_id,
|
||||||
RequestId::Sync(10),
|
RequestId::Sync(10),
|
||||||
rpc_request.clone(),
|
rpc_request.clone(),
|
||||||
@ -441,14 +441,14 @@ fn test_blocks_by_range_single_empty_rpc() {
|
|||||||
warn!(log, "Receiver got request");
|
warn!(log, "Receiver got request");
|
||||||
|
|
||||||
for _ in 1..=messages_to_send {
|
for _ in 1..=messages_to_send {
|
||||||
receiver.swarm.send_successful_response(
|
receiver.swarm.behaviour_mut().send_successful_response(
|
||||||
peer_id,
|
peer_id,
|
||||||
id,
|
id,
|
||||||
rpc_response.clone(),
|
rpc_response.clone(),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
// send the stream termination
|
// send the stream termination
|
||||||
receiver.swarm.send_successful_response(
|
receiver.swarm.behaviour_mut().send_successful_response(
|
||||||
peer_id,
|
peer_id,
|
||||||
id,
|
id,
|
||||||
Response::BlocksByRange(None),
|
Response::BlocksByRange(None),
|
||||||
@ -513,7 +513,7 @@ fn test_blocks_by_root_chunked_rpc() {
|
|||||||
Libp2pEvent::Behaviour(BehaviourEvent::PeerDialed(peer_id)) => {
|
Libp2pEvent::Behaviour(BehaviourEvent::PeerDialed(peer_id)) => {
|
||||||
// Send a STATUS message
|
// Send a STATUS message
|
||||||
debug!(log, "Sending RPC");
|
debug!(log, "Sending RPC");
|
||||||
sender.swarm.send_request(
|
sender.swarm.behaviour_mut().send_request(
|
||||||
peer_id,
|
peer_id,
|
||||||
RequestId::Sync(10),
|
RequestId::Sync(10),
|
||||||
rpc_request.clone(),
|
rpc_request.clone(),
|
||||||
@ -556,7 +556,7 @@ fn test_blocks_by_root_chunked_rpc() {
|
|||||||
debug!(log, "Receiver got request");
|
debug!(log, "Receiver got request");
|
||||||
|
|
||||||
for _ in 1..=messages_to_send {
|
for _ in 1..=messages_to_send {
|
||||||
receiver.swarm.send_successful_response(
|
receiver.swarm.behaviour_mut().send_successful_response(
|
||||||
peer_id,
|
peer_id,
|
||||||
id,
|
id,
|
||||||
rpc_response.clone(),
|
rpc_response.clone(),
|
||||||
@ -564,7 +564,7 @@ fn test_blocks_by_root_chunked_rpc() {
|
|||||||
debug!(log, "Sending message");
|
debug!(log, "Sending message");
|
||||||
}
|
}
|
||||||
// send the stream termination
|
// send the stream termination
|
||||||
receiver.swarm.send_successful_response(
|
receiver.swarm.behaviour_mut().send_successful_response(
|
||||||
peer_id,
|
peer_id,
|
||||||
id,
|
id,
|
||||||
Response::BlocksByRange(None),
|
Response::BlocksByRange(None),
|
||||||
@ -634,7 +634,7 @@ fn test_blocks_by_root_chunked_rpc_terminates_correctly() {
|
|||||||
Libp2pEvent::Behaviour(BehaviourEvent::PeerDialed(peer_id)) => {
|
Libp2pEvent::Behaviour(BehaviourEvent::PeerDialed(peer_id)) => {
|
||||||
// Send a STATUS message
|
// Send a STATUS message
|
||||||
debug!(log, "Sending RPC");
|
debug!(log, "Sending RPC");
|
||||||
sender.swarm.send_request(
|
sender.swarm.behaviour_mut().send_request(
|
||||||
peer_id,
|
peer_id,
|
||||||
RequestId::Sync(10),
|
RequestId::Sync(10),
|
||||||
rpc_request.clone(),
|
rpc_request.clone(),
|
||||||
@ -703,7 +703,7 @@ fn test_blocks_by_root_chunked_rpc_terminates_correctly() {
|
|||||||
if message_info.is_some() {
|
if message_info.is_some() {
|
||||||
messages_sent += 1;
|
messages_sent += 1;
|
||||||
let (peer_id, stream_id) = message_info.as_ref().unwrap();
|
let (peer_id, stream_id) = message_info.as_ref().unwrap();
|
||||||
receiver.swarm.send_successful_response(
|
receiver.swarm.behaviour_mut().send_successful_response(
|
||||||
*peer_id,
|
*peer_id,
|
||||||
*stream_id,
|
*stream_id,
|
||||||
rpc_response.clone(),
|
rpc_response.clone(),
|
||||||
@ -749,7 +749,7 @@ fn test_goodbye_rpc() {
|
|||||||
Libp2pEvent::Behaviour(BehaviourEvent::PeerDialed(peer_id)) => {
|
Libp2pEvent::Behaviour(BehaviourEvent::PeerDialed(peer_id)) => {
|
||||||
// Send a goodbye and disconnect
|
// Send a goodbye and disconnect
|
||||||
debug!(log, "Sending RPC");
|
debug!(log, "Sending RPC");
|
||||||
sender.swarm.goodbye_peer(
|
sender.swarm.behaviour_mut().goodbye_peer(
|
||||||
&peer_id,
|
&peer_id,
|
||||||
GoodbyeReason::IrrelevantNetwork,
|
GoodbyeReason::IrrelevantNetwork,
|
||||||
ReportSource::SyncService,
|
ReportSource::SyncService,
|
||||||
|
@ -34,5 +34,4 @@ futures = "0.3.8"
|
|||||||
store = { path = "../store" }
|
store = { path = "../store" }
|
||||||
environment = { path = "../../lighthouse/environment" }
|
environment = { path = "../../lighthouse/environment" }
|
||||||
tree_hash = "0.1.1"
|
tree_hash = "0.1.1"
|
||||||
discv5 = { version = "0.1.0-beta.5", features = ["libp2p"] }
|
|
||||||
sensitive_url = { path = "../../common/sensitive_url" }
|
sensitive_url = { path = "../../common/sensitive_url" }
|
||||||
|
@ -5,11 +5,11 @@ use beacon_chain::{
|
|||||||
test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType},
|
test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType},
|
||||||
BeaconChain, StateSkipConfig, WhenSlotSkipped, MAXIMUM_GOSSIP_CLOCK_DISPARITY,
|
BeaconChain, StateSkipConfig, WhenSlotSkipped, MAXIMUM_GOSSIP_CLOCK_DISPARITY,
|
||||||
};
|
};
|
||||||
use discv5::enr::{CombinedKey, EnrBuilder};
|
|
||||||
use environment::null_logger;
|
use environment::null_logger;
|
||||||
use eth2::Error;
|
use eth2::Error;
|
||||||
use eth2::StatusCode;
|
use eth2::StatusCode;
|
||||||
use eth2::{types::*, BeaconNodeHttpClient, Timeouts};
|
use eth2::{types::*, BeaconNodeHttpClient, Timeouts};
|
||||||
|
use eth2_libp2p::discv5::enr::{CombinedKey, EnrBuilder};
|
||||||
use eth2_libp2p::{
|
use eth2_libp2p::{
|
||||||
rpc::methods::MetaData,
|
rpc::methods::MetaData,
|
||||||
types::{EnrBitfield, SyncState},
|
types::{EnrBitfield, SyncState},
|
||||||
|
@ -15,7 +15,6 @@ slog-term = "2.6.0"
|
|||||||
slog-async = "2.5.0"
|
slog-async = "2.5.0"
|
||||||
logging = { path = "../../common/logging" }
|
logging = { path = "../../common/logging" }
|
||||||
environment = { path = "../../lighthouse/environment" }
|
environment = { path = "../../lighthouse/environment" }
|
||||||
discv5 = { version = "0.1.0-beta.3" }
|
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
beacon_chain = { path = "../beacon_chain" }
|
beacon_chain = { path = "../beacon_chain" }
|
||||||
|
@ -8,8 +8,8 @@ use beacon_chain::test_utils::{
|
|||||||
AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType,
|
AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType,
|
||||||
};
|
};
|
||||||
use beacon_chain::{BeaconChain, MAXIMUM_GOSSIP_CLOCK_DISPARITY};
|
use beacon_chain::{BeaconChain, MAXIMUM_GOSSIP_CLOCK_DISPARITY};
|
||||||
use discv5::enr::{CombinedKey, EnrBuilder};
|
|
||||||
use environment::{null_logger, Environment, EnvironmentBuilder};
|
use environment::{null_logger, Environment, EnvironmentBuilder};
|
||||||
|
use eth2_libp2p::discv5::enr::{CombinedKey, EnrBuilder};
|
||||||
use eth2_libp2p::{rpc::methods::MetaData, types::EnrBitfield, MessageId, NetworkGlobals, PeerId};
|
use eth2_libp2p::{rpc::methods::MetaData, types::EnrBitfield, MessageId, NetworkGlobals, PeerId};
|
||||||
use slot_clock::SlotClock;
|
use slot_clock::SlotClock;
|
||||||
use std::cmp;
|
use std::cmp;
|
||||||
|
@ -27,6 +27,13 @@ pub fn persist_dht<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
|||||||
store.put_item(&DHT_DB_KEY, &PersistedDht { enrs })
|
store.put_item(&DHT_DB_KEY, &PersistedDht { enrs })
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Attempts to clear any DHT entries.
|
||||||
|
pub fn clear_dht<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
||||||
|
store: Arc<HotColdDB<E, Hot, Cold>>,
|
||||||
|
) -> Result<(), store::Error> {
|
||||||
|
store.hot_db.delete::<PersistedDht>(&DHT_DB_KEY)
|
||||||
|
}
|
||||||
|
|
||||||
/// Wrapper around DHT for persistence to disk.
|
/// Wrapper around DHT for persistence to disk.
|
||||||
pub struct PersistedDht {
|
pub struct PersistedDht {
|
||||||
pub enrs: Vec<Enr>,
|
pub enrs: Vec<Enr>,
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
use crate::persisted_dht::{load_dht, persist_dht};
|
use crate::persisted_dht::{clear_dht, load_dht, persist_dht};
|
||||||
use crate::router::{Router, RouterMessage};
|
use crate::router::{Router, RouterMessage};
|
||||||
use crate::{
|
use crate::{
|
||||||
attestation_service::{AttServiceMessage, AttestationService},
|
attestation_service::{AttServiceMessage, AttestationService},
|
||||||
@ -178,7 +178,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
|||||||
"Loading peers into the routing table"; "peers" => enrs_to_load.len()
|
"Loading peers into the routing table"; "peers" => enrs_to_load.len()
|
||||||
);
|
);
|
||||||
for enr in enrs_to_load {
|
for enr in enrs_to_load {
|
||||||
libp2p.swarm.add_enr(enr.clone());
|
libp2p.swarm.behaviour_mut().add_enr(enr.clone());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -251,7 +251,7 @@ fn spawn_service<T: BeaconChainTypes>(
|
|||||||
.map(|gauge| gauge.reset());
|
.map(|gauge| gauge.reset());
|
||||||
}
|
}
|
||||||
metrics::update_gossip_metrics::<T::EthSpec>(
|
metrics::update_gossip_metrics::<T::EthSpec>(
|
||||||
&service.libp2p.swarm.gs(),
|
&service.libp2p.swarm.behaviour_mut().gs(),
|
||||||
&service.network_globals,
|
&service.network_globals,
|
||||||
);
|
);
|
||||||
// update sync metrics
|
// update sync metrics
|
||||||
@ -287,8 +287,7 @@ fn spawn_service<T: BeaconChainTypes>(
|
|||||||
})
|
})
|
||||||
)
|
)
|
||||||
}).unwrap_or(None) {
|
}).unwrap_or(None) {
|
||||||
if (*service.libp2p.swarm)
|
if service.libp2p.swarm.behaviour_mut().update_gossipsub_parameters(active_validators, slot).is_err() {
|
||||||
.update_gossipsub_parameters(active_validators, slot).is_err() {
|
|
||||||
error!(
|
error!(
|
||||||
service.log,
|
service.log,
|
||||||
"Failed to update gossipsub parameters";
|
"Failed to update gossipsub parameters";
|
||||||
@ -314,7 +313,7 @@ fn spawn_service<T: BeaconChainTypes>(
|
|||||||
service.upnp_mappings = (tcp_socket.map(|s| s.port()), udp_socket.map(|s| s.port()));
|
service.upnp_mappings = (tcp_socket.map(|s| s.port()), udp_socket.map(|s| s.port()));
|
||||||
// If there is an external TCP port update, modify our local ENR.
|
// If there is an external TCP port update, modify our local ENR.
|
||||||
if let Some(tcp_socket) = tcp_socket {
|
if let Some(tcp_socket) = tcp_socket {
|
||||||
if let Err(e) = service.libp2p.swarm.peer_manager().discovery_mut().update_enr_tcp_port(tcp_socket.port()) {
|
if let Err(e) = service.libp2p.swarm.behaviour_mut().peer_manager().discovery_mut().update_enr_tcp_port(tcp_socket.port()) {
|
||||||
warn!(service.log, "Failed to update ENR"; "error" => e);
|
warn!(service.log, "Failed to update ENR"; "error" => e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -322,7 +321,7 @@ fn spawn_service<T: BeaconChainTypes>(
|
|||||||
// UPnP mappings
|
// UPnP mappings
|
||||||
if !service.discovery_auto_update {
|
if !service.discovery_auto_update {
|
||||||
if let Some(udp_socket) = udp_socket {
|
if let Some(udp_socket) = udp_socket {
|
||||||
if let Err(e) = service.libp2p.swarm.peer_manager().discovery_mut().update_enr_udp_socket(udp_socket) {
|
if let Err(e) = service.libp2p.swarm.behaviour_mut().peer_manager().discovery_mut().update_enr_udp_socket(udp_socket) {
|
||||||
warn!(service.log, "Failed to update ENR"; "error" => e);
|
warn!(service.log, "Failed to update ENR"; "error" => e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -341,6 +340,7 @@ fn spawn_service<T: BeaconChainTypes>(
|
|||||||
service
|
service
|
||||||
.libp2p
|
.libp2p
|
||||||
.swarm
|
.swarm
|
||||||
|
.behaviour_mut()
|
||||||
.report_message_validation_result(
|
.report_message_validation_result(
|
||||||
&propagation_source, message_id, validation_result
|
&propagation_source, message_id, validation_result
|
||||||
);
|
);
|
||||||
@ -359,7 +359,7 @@ fn spawn_service<T: BeaconChainTypes>(
|
|||||||
"topics" => ?topic_kinds
|
"topics" => ?topic_kinds
|
||||||
);
|
);
|
||||||
metrics::expose_publish_metrics(&messages);
|
metrics::expose_publish_metrics(&messages);
|
||||||
service.libp2p.swarm.publish(messages);
|
service.libp2p.swarm.behaviour_mut().publish(messages);
|
||||||
}
|
}
|
||||||
NetworkMessage::ReportPeer { peer_id, action, source } => service.libp2p.report_peer(&peer_id, action, source),
|
NetworkMessage::ReportPeer { peer_id, action, source } => service.libp2p.report_peer(&peer_id, action, source),
|
||||||
NetworkMessage::GoodbyePeer { peer_id, reason, source } => service.libp2p.goodbye_peer(&peer_id, reason, source),
|
NetworkMessage::GoodbyePeer { peer_id, reason, source } => service.libp2p.goodbye_peer(&peer_id, reason, source),
|
||||||
@ -375,7 +375,7 @@ fn spawn_service<T: BeaconChainTypes>(
|
|||||||
let already_subscribed = service.network_globals.gossipsub_subscriptions.read().clone();
|
let already_subscribed = service.network_globals.gossipsub_subscriptions.read().clone();
|
||||||
let already_subscribed = already_subscribed.iter().map(|x| x.kind()).collect::<std::collections::HashSet<_>>();
|
let already_subscribed = already_subscribed.iter().map(|x| x.kind()).collect::<std::collections::HashSet<_>>();
|
||||||
for topic_kind in eth2_libp2p::types::CORE_TOPICS.iter().filter(|topic| already_subscribed.get(topic).is_none()) {
|
for topic_kind in eth2_libp2p::types::CORE_TOPICS.iter().filter(|topic| already_subscribed.get(topic).is_none()) {
|
||||||
if service.libp2p.swarm.subscribe_kind(topic_kind.clone()) {
|
if service.libp2p.swarm.behaviour_mut().subscribe_kind(topic_kind.clone()) {
|
||||||
subscribed_topics.push(topic_kind.clone());
|
subscribed_topics.push(topic_kind.clone());
|
||||||
} else {
|
} else {
|
||||||
warn!(service.log, "Could not subscribe to topic"; "topic" => %topic_kind);
|
warn!(service.log, "Could not subscribe to topic"; "topic" => %topic_kind);
|
||||||
@ -387,9 +387,9 @@ fn spawn_service<T: BeaconChainTypes>(
|
|||||||
for subnet_id in 0..<<T as BeaconChainTypes>::EthSpec as EthSpec>::SubnetBitfieldLength::to_u64() {
|
for subnet_id in 0..<<T as BeaconChainTypes>::EthSpec as EthSpec>::SubnetBitfieldLength::to_u64() {
|
||||||
let subnet_id = SubnetId::new(subnet_id);
|
let subnet_id = SubnetId::new(subnet_id);
|
||||||
let topic_kind = eth2_libp2p::types::GossipKind::Attestation(subnet_id);
|
let topic_kind = eth2_libp2p::types::GossipKind::Attestation(subnet_id);
|
||||||
if service.libp2p.swarm.subscribe_kind(topic_kind.clone()) {
|
if service.libp2p.swarm.behaviour_mut().subscribe_kind(topic_kind.clone()) {
|
||||||
// Update the ENR bitfield.
|
// Update the ENR bitfield.
|
||||||
service.libp2p.swarm.update_enr_subnet(subnet_id, true);
|
service.libp2p.swarm.behaviour_mut().update_enr_subnet(subnet_id, true);
|
||||||
subscribed_topics.push(topic_kind.clone());
|
subscribed_topics.push(topic_kind.clone());
|
||||||
} else {
|
} else {
|
||||||
warn!(service.log, "Could not subscribe to topic"; "topic" => %topic_kind);
|
warn!(service.log, "Could not subscribe to topic"; "topic" => %topic_kind);
|
||||||
@ -407,19 +407,19 @@ fn spawn_service<T: BeaconChainTypes>(
|
|||||||
Some(attestation_service_message) = service.attestation_service.next() => {
|
Some(attestation_service_message) = service.attestation_service.next() => {
|
||||||
match attestation_service_message {
|
match attestation_service_message {
|
||||||
AttServiceMessage::Subscribe(subnet_id) => {
|
AttServiceMessage::Subscribe(subnet_id) => {
|
||||||
service.libp2p.swarm.subscribe_to_subnet(subnet_id);
|
service.libp2p.swarm.behaviour_mut().subscribe_to_subnet(subnet_id);
|
||||||
}
|
}
|
||||||
AttServiceMessage::Unsubscribe(subnet_id) => {
|
AttServiceMessage::Unsubscribe(subnet_id) => {
|
||||||
service.libp2p.swarm.unsubscribe_from_subnet(subnet_id);
|
service.libp2p.swarm.behaviour_mut().unsubscribe_from_subnet(subnet_id);
|
||||||
}
|
}
|
||||||
AttServiceMessage::EnrAdd(subnet_id) => {
|
AttServiceMessage::EnrAdd(subnet_id) => {
|
||||||
service.libp2p.swarm.update_enr_subnet(subnet_id, true);
|
service.libp2p.swarm.behaviour_mut().update_enr_subnet(subnet_id, true);
|
||||||
}
|
}
|
||||||
AttServiceMessage::EnrRemove(subnet_id) => {
|
AttServiceMessage::EnrRemove(subnet_id) => {
|
||||||
service.libp2p.swarm.update_enr_subnet(subnet_id, false);
|
service.libp2p.swarm.behaviour_mut().update_enr_subnet(subnet_id, false);
|
||||||
}
|
}
|
||||||
AttServiceMessage::DiscoverPeers(subnets_to_discover) => {
|
AttServiceMessage::DiscoverPeers(subnets_to_discover) => {
|
||||||
service.libp2p.swarm.discover_subnet_peers(subnets_to_discover);
|
service.libp2p.swarm.behaviour_mut().discover_subnet_peers(subnets_to_discover);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -541,6 +541,7 @@ fn spawn_service<T: BeaconChainTypes>(
|
|||||||
service
|
service
|
||||||
.libp2p
|
.libp2p
|
||||||
.swarm
|
.swarm
|
||||||
|
.behaviour_mut()
|
||||||
.update_fork_version(service.beacon_chain.enr_fork_id());
|
.update_fork_version(service.beacon_chain.enr_fork_id());
|
||||||
service.next_fork_update = next_fork_delay(&service.beacon_chain);
|
service.next_fork_update = next_fork_delay(&service.beacon_chain);
|
||||||
}
|
}
|
||||||
@ -566,12 +567,16 @@ fn next_fork_delay<T: BeaconChainTypes>(
|
|||||||
impl<T: BeaconChainTypes> Drop for NetworkService<T> {
|
impl<T: BeaconChainTypes> Drop for NetworkService<T> {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
// network thread is terminating
|
// network thread is terminating
|
||||||
let enrs = self.libp2p.swarm.enr_entries();
|
let enrs = self.libp2p.swarm.behaviour_mut().enr_entries();
|
||||||
debug!(
|
debug!(
|
||||||
self.log,
|
self.log,
|
||||||
"Persisting DHT to store";
|
"Persisting DHT to store";
|
||||||
"Number of peers" => enrs.len(),
|
"Number of peers" => enrs.len(),
|
||||||
);
|
);
|
||||||
|
if let Err(e) = clear_dht::<T::EthSpec, T::HotStore, T::ColdStore>(self.store.clone()) {
|
||||||
|
error!(self.log, "Failed to clear old DHT entries"; "error" => ?e);
|
||||||
|
}
|
||||||
|
// Still try to update new entries
|
||||||
match persist_dht::<T::EthSpec, T::HotStore, T::ColdStore>(self.store.clone(), enrs) {
|
match persist_dht::<T::EthSpec, T::HotStore, T::ColdStore>(self.store.clone(), enrs) {
|
||||||
Err(e) => error!(
|
Err(e) => error!(
|
||||||
self.log,
|
self.log,
|
||||||
|
@ -19,4 +19,4 @@ serde_yaml = "0.8.13"
|
|||||||
types = { path = "../../consensus/types"}
|
types = { path = "../../consensus/types"}
|
||||||
eth2_ssz = "0.1.2"
|
eth2_ssz = "0.1.2"
|
||||||
eth2_config = { path = "../eth2_config"}
|
eth2_config = { path = "../eth2_config"}
|
||||||
enr = { version = "0.5.0", features = ["ed25519", "k256"] }
|
enr = { version = "0.5.1", features = ["ed25519", "k256"] }
|
||||||
|
Loading…
Reference in New Issue
Block a user