Persist/load DHT on shutdown/startup (#659)

* Store dht enrs on shutdown

* Load enrs on startup and add tests

* Remove enr_entries from behavior

* Move all dht persisting logic to `NetworkService`

* Move `PersistedDht` from eth2-libp2p to network crate

* Add test to confirm dht persistence

* Add logging

* Remove extra call to beacon_chain persist

* Expose only mutable `add_enr` method from behaviour

* Fix tests

* Fix merge errors
This commit is contained in:
Pawan Dhananjay 2020-01-23 12:46:11 +05:30 committed by Age Manning
parent 89f05e4a4f
commit 23a35c3767
10 changed files with 242 additions and 10 deletions

2
Cargo.lock generated
View File

@ -2665,9 +2665,11 @@ dependencies = [
"eth2_ssz 0.1.2",
"fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)",
"genesis 0.1.0",
"hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)",
"rlp 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)",
"slog 2.5.2 (registry+https://github.com/rust-lang/crates.io-index)",
"sloggers 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
"smallvec 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@ -61,11 +61,3 @@ impl<T: BeaconChainTypes> Client<T> {
self.libp2p_network.as_ref().map(|n| n.local_enr())
}
}
impl<T: BeaconChainTypes> Drop for Client<T> {
fn drop(&mut self) {
if let Some(beacon_chain) = &self.beacon_chain {
let _result = beacon_chain.persist();
}
}
}

View File

@ -3,6 +3,7 @@ use crate::rpc::{RPCEvent, RPCMessage, RPC};
use crate::GossipTopic;
use crate::{error, NetworkConfig};
use crate::{Topic, TopicHash};
use enr::Enr;
use futures::prelude::*;
use libp2p::{
core::identity::Keypair,
@ -254,6 +255,16 @@ impl<TSubstream: AsyncRead + AsyncWrite> Behaviour<TSubstream> {
pub fn peer_unbanned(&mut self, peer_id: &PeerId) {
self.discovery.peer_unbanned(peer_id);
}
/// Returns an iterator over all enr entries in the DHT.
pub fn enr_entries(&mut self) -> impl Iterator<Item = &Enr> {
self.discovery.enr_entries()
}
/// Add an ENR to the routing table of the discovery mechanism.
pub fn add_enr(&mut self, enr: Enr) {
self.discovery.add_enr(enr);
}
}
/// The types of events than can be obtained from polling the behaviour.

View File

@ -148,6 +148,11 @@ impl<TSubstream> Discovery<TSubstream> {
self.banned_peers.remove(peer_id);
}
/// Returns an iterator over all enr entries in the DHT.
pub fn enr_entries(&mut self) -> impl Iterator<Item = &Enr> {
self.discovery.enr_entries()
}
/// Search for new peers using the underlying discovery mechanism.
fn find_peers(&mut self) {
// pick a random NodeId

View File

@ -6,6 +6,7 @@ edition = "2018"
[dev-dependencies]
sloggers = "0.3.4"
genesis = { path = "../genesis" }
[dependencies]
beacon_chain = { path = "../beacon_chain" }
@ -24,3 +25,4 @@ smallvec = "1.0.0"
# TODO: Remove rand crate for mainnet
rand = "0.7.2"
fnv = "1.0.6"
rlp = "0.4.3"

View File

@ -2,6 +2,7 @@
pub mod error;
pub mod message_handler;
pub mod message_processor;
pub mod persisted_dht;
pub mod service;
pub mod sync;

View File

@ -0,0 +1,51 @@
use eth2_libp2p::Enr;
use rlp;
use store::{DBColumn, Error as StoreError, SimpleStoreItem};
/// 32-byte key for accessing the `DhtEnrs`.
pub const DHT_DB_KEY: &str = "PERSISTEDDHTPERSISTEDDHTPERSISTE";
/// Wrapper around dht for persistence to disk.
pub struct PersistedDht {
pub enrs: Vec<Enr>,
}
impl SimpleStoreItem for PersistedDht {
fn db_column() -> DBColumn {
DBColumn::DhtEnrs
}
fn as_store_bytes(&self) -> Vec<u8> {
rlp::encode_list(&self.enrs)
}
fn from_store_bytes(bytes: &[u8]) -> Result<Self, StoreError> {
let rlp = rlp::Rlp::new(bytes);
let enrs: Vec<Enr> = rlp
.as_list()
.map_err(|e| StoreError::RlpError(format!("{}", e)))?;
Ok(PersistedDht { enrs })
}
}
#[cfg(test)]
mod tests {
use super::*;
use eth2_libp2p::Enr;
use std::str::FromStr;
use std::sync::Arc;
use store::{MemoryStore, Store};
use types::Hash256;
use types::MinimalEthSpec;
#[test]
fn test_persisted_dht() {
let store = Arc::new(MemoryStore::<MinimalEthSpec>::open());
let enrs = vec![Enr::from_str("enr:-IS4QHCYrYZbAKWCBRlAy5zzaDZXJBGkcnh4MHcBFZntXNFrdvJjX04jRzjzCBOonrkTfj499SZuOh8R33Ls8RRcy5wBgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQPKY0yuDUmstAHYpMa2_oxVtw0RW_QAdpzBQA8yWM0xOIN1ZHCCdl8").unwrap()];
let key = Hash256::from_slice(&DHT_DB_KEY.as_bytes());
store
.put(&key, &PersistedDht { enrs: enrs.clone() })
.unwrap();
let dht: PersistedDht = store.get(&key).unwrap().unwrap();
assert_eq!(dht.enrs, enrs);
}
}

View File

@ -1,5 +1,6 @@
use crate::error;
use crate::message_handler::{HandlerMessage, MessageHandler};
use crate::persisted_dht::{PersistedDht, DHT_DB_KEY};
use crate::NetworkConfig;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use core::marker::PhantomData;
@ -9,10 +10,12 @@ use eth2_libp2p::{PubsubMessage, RPCEvent};
use futures::prelude::*;
use futures::Stream;
use parking_lot::Mutex;
use slog::{debug, info, trace};
use slog::{debug, error, info, trace};
use std::sync::Arc;
use store::Store;
use tokio::runtime::TaskExecutor;
use tokio::sync::{mpsc, oneshot};
use types::Hash256;
/// The time in seconds that a peer will be banned and prevented from reconnecting.
const BAN_PEER_TIMEOUT: u64 = 30;
@ -21,6 +24,8 @@ const BAN_PEER_TIMEOUT: u64 = 30;
pub struct Service<T: BeaconChainTypes> {
libp2p_service: Arc<Mutex<LibP2PService>>,
libp2p_port: u16,
store: Arc<T::Store>,
log: slog::Logger,
_libp2p_exit: oneshot::Sender<()>,
_network_send: mpsc::UnboundedSender<NetworkMessage>,
_phantom: PhantomData<T>,
@ -35,6 +40,8 @@ impl<T: BeaconChainTypes> Service<T> {
) -> error::Result<(Arc<Self>, mpsc::UnboundedSender<NetworkMessage>)> {
// build the network channel
let (network_send, network_recv) = mpsc::unbounded_channel::<NetworkMessage>();
// Get a reference to the beacon chain store
let store = beacon_chain.store.clone();
// launch message handler thread
let message_handler_send = MessageHandler::spawn(
beacon_chain,
@ -49,17 +56,32 @@ impl<T: BeaconChainTypes> Service<T> {
network_log.clone(),
)?));
// Load DHT from store
let key = Hash256::from_slice(&DHT_DB_KEY.as_bytes());
let enrs: Vec<Enr> = match store.get(&key) {
Ok(Some(p)) => {
let p: PersistedDht = p;
p.enrs
}
_ => Vec::new(),
};
for enr in enrs {
libp2p_service.lock().swarm.add_enr(enr);
}
let libp2p_exit = spawn_service(
libp2p_service.clone(),
network_recv,
message_handler_send,
executor,
network_log,
network_log.clone(),
config.propagation_percentage,
)?;
let network_service = Service {
libp2p_service,
libp2p_port: config.libp2p_port,
store,
log: network_log,
_libp2p_exit: libp2p_exit,
_network_send: network_send.clone(),
_phantom: PhantomData,
@ -117,6 +139,25 @@ impl<T: BeaconChainTypes> Service<T> {
pub fn libp2p_service(&self) -> Arc<Mutex<LibP2PService>> {
self.libp2p_service.clone()
}
/// Attempt to persist the enrs in the DHT to `self.store`.
pub fn persist_dht(&self) -> Result<(), store::Error> {
let enrs: Vec<Enr> = self
.libp2p_service()
.lock()
.swarm
.enr_entries()
.map(|x| x.clone())
.collect();
info!(
self.log,
"Persisting DHT to store";
"Number of peers" => format!("{}", enrs.len()),
);
let key = Hash256::from_slice(&DHT_DB_KEY.as_bytes());
self.store.put(&key, &PersistedDht { enrs })?;
Ok(())
}
}
fn spawn_service(
@ -307,3 +348,127 @@ pub enum NetworkMessage {
/// Disconnect and bans a peer id.
Disconnect { peer_id: PeerId },
}
impl<T: BeaconChainTypes> Drop for Service<T> {
fn drop(&mut self) {
if let Err(e) = self.persist_dht() {
error!(
self.log,
"Failed to persist DHT on drop";
"error" => format!("{:?}", e)
)
} else {
info!(
self.log,
"Saved DHT state";
)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use beacon_chain::builder::BeaconChainBuilder;
use eth2_libp2p::Enr;
use genesis::{generate_deterministic_keypairs, interop_genesis_state};
use slog::Logger;
use sloggers::{null::NullLoggerBuilder, Build};
use std::str::FromStr;
use store::{migrate::NullMigrator, SimpleDiskStore};
use tokio::runtime::Runtime;
use types::{EthSpec, MinimalEthSpec};
fn get_logger() -> Logger {
let builder = NullLoggerBuilder;
builder.build().expect("should build logger")
}
#[test]
fn test_dht_persistence() {
// Create new LevelDB store
let path = "/tmp";
let store = Arc::new(SimpleDiskStore::open(&std::path::PathBuf::from(path)).unwrap());
// Create a `BeaconChain` object to pass to `Service`
let validator_count = 8;
let genesis_time = 13371337;
let log = get_logger();
let spec = MinimalEthSpec::default_spec();
let genesis_state = interop_genesis_state(
&generate_deterministic_keypairs(validator_count),
genesis_time,
&spec,
)
.expect("should create interop genesis state");
let chain = BeaconChainBuilder::new(MinimalEthSpec)
.logger(log.clone())
.store(store)
.store_migrator(NullMigrator)
.genesis_state(genesis_state)
.expect("should build state using recent genesis")
.dummy_eth1_backend()
.expect("should build the dummy eth1 backend")
.null_event_handler()
.testing_slot_clock(std::time::Duration::from_secs(1))
.expect("should configure testing slot clock")
.reduced_tree_fork_choice()
.expect("should add fork choice to builder")
.build()
.expect("should build");
let beacon_chain = Arc::new(chain);
let enr1 = Enr::from_str("enr:-IS4QHCYrYZbAKWCBRlAy5zzaDZXJBGkcnh4MHcBFZntXNFrdvJjX04jRzjzCBOonrkTfj499SZuOh8R33Ls8RRcy5wBgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQPKY0yuDUmstAHYpMa2_oxVtw0RW_QAdpzBQA8yWM0xOIN1ZHCCdl8").unwrap();
let enr2 = Enr::from_str("enr:-IS4QJ2d11eu6dC7E7LoXeLMgMP3kom1u3SE8esFSWvaHoo0dP1jg8O3-nx9ht-EO3CmG7L6OkHcMmoIh00IYWB92QABgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQIB_c-jQMOXsbjWkbN-Oj99H57gfId5pfb4wa1qxwV4CIN1ZHCCIyk").unwrap();
let enrs = vec![enr1, enr2];
let runtime = Runtime::new().unwrap();
// Create new network service
let (service, _) = Service::new(
beacon_chain.clone(),
&NetworkConfig::default(),
&runtime.executor(),
log.clone(),
)
.unwrap();
// Add enrs manually to dht
for enr in enrs.iter() {
service.libp2p_service().lock().swarm.add_enr(enr.clone());
}
assert_eq!(
enrs.len(),
service
.libp2p_service()
.lock()
.swarm
.enr_entries()
.collect::<Vec<_>>()
.len(),
"DHT should have 2 enrs"
);
// Drop the service value
std::mem::drop(service);
// Recover the network service from beacon chain store and fresh network config
let (recovered_service, _) = Service::new(
beacon_chain,
&NetworkConfig::default(),
&runtime.executor(),
log.clone(),
)
.unwrap();
assert_eq!(
enrs.len(),
recovered_service
.libp2p_service()
.lock()
.swarm
.enr_entries()
.collect::<Vec<_>>()
.len(),
"Recovered DHT should have 2 enrs"
);
}
}

View File

@ -11,6 +11,7 @@ pub enum Error {
PartialBeaconStateError,
HotColdDBError(HotColdDBError),
DBError { message: String },
RlpError(String),
}
impl From<DecodeError> for Error {

View File

@ -171,6 +171,7 @@ pub enum DBColumn {
BeaconStateRoots,
BeaconHistoricalRoots,
BeaconRandaoMixes,
DhtEnrs,
}
impl Into<&'static str> for DBColumn {
@ -187,6 +188,7 @@ impl Into<&'static str> for DBColumn {
DBColumn::BeaconStateRoots => "bsr",
DBColumn::BeaconHistoricalRoots => "bhr",
DBColumn::BeaconRandaoMixes => "brm",
DBColumn::DhtEnrs => "dht",
}
}
}