lighthouse/slasher/service/src/service.rs

use beacon_chain::{
observed_operations::ObservationOutcome, BeaconChain, BeaconChainError, BeaconChainTypes,
};
use directory::size_of_dir;
use lighthouse_network::PubsubMessage;
use network::NetworkMessage;
use slasher::{
metrics::{self, SLASHER_DATABASE_SIZE, SLASHER_RUN_TIME},
Slasher,
};
use slog::{debug, error, info, trace, warn, Logger};
use slot_clock::SlotClock;
use state_processing::{
per_block_processing::errors::{
AttesterSlashingInvalid, BlockOperationError, ProposerSlashingInvalid,
},
VerifyOperation,
};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};
use std::sync::Arc;
use task_executor::TaskExecutor;
use tokio::sync::mpsc::UnboundedSender;
use tokio::time::{interval_at, Duration, Instant};
use types::{AttesterSlashing, Epoch, EthSpec, ProposerSlashing};
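
/// Service which drives the slasher: it periodically prompts the slasher to process queued
/// messages and forwards any slashings found to the beacon chain (and optionally the network).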
pub struct SlasherService<T: BeaconChainTypes> {
beacon_chain: Arc<BeaconChain<T>>,
network_sender: UnboundedSender<NetworkMessage<T::EthSpec>>,
}

impl<T: BeaconChainTypes> SlasherService<T> {
/// Create a new service but don't start any tasks yet.
pub fn new(
beacon_chain: Arc<BeaconChain<T>>,
network_sender: UnboundedSender<NetworkMessage<T::EthSpec>>,
) -> Self {
Self {
beacon_chain,
network_sender,
}
}

    /// Start the slasher service tasks on the `executor`.
pub fn run(&self, executor: &TaskExecutor) -> Result<(), String> {
let slasher = self
.beacon_chain
.slasher
.clone()
.ok_or("No slasher is configured")?;
let log = slasher.log().clone();
info!(log, "Starting slasher"; "broadcast" => slasher.config().broadcast);
// Buffer just a single message in the channel. If the receiver is still processing, we
// don't need to burden them with more work (we can wait).
let (notif_sender, notif_receiver) = sync_channel(1);
let update_period = slasher.config().update_period;
let slot_offset = slasher.config().slot_offset;
let beacon_chain = self.beacon_chain.clone();
let network_sender = self.network_sender.clone();
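
        // The notifier is a lightweight async task driven by the slot clock, while the actual
        // slasher processing runs on a blocking thread, since its database operations are
        // synchronous and potentially long-running.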
executor.spawn(
Self::run_notifier(
beacon_chain.clone(),
update_period,
slot_offset,
notif_sender,
log,
),
"slasher_server_notifier",
);
executor.spawn_blocking(
|| Self::run_processor(beacon_chain, slasher, notif_receiver, network_sender),
"slasher_server_processor",
);
Ok(())
}

    /// Run the async notifier which periodically prompts the processor to run.
async fn run_notifier(
beacon_chain: Arc<BeaconChain<T>>,
update_period: u64,
slot_offset: f64,
notif_sender: SyncSender<Epoch>,
log: Logger,
) {
let slot_offset = Duration::from_secs_f64(slot_offset);
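        // Schedule the first tick for `slot_offset` seconds after the start of the next slot,
        // then tick every `update_period` seconds thereafter.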
let start_instant =
if let Some(duration_to_next_slot) = beacon_chain.slot_clock.duration_to_next_slot() {
Instant::now() + duration_to_next_slot + slot_offset
} else {
error!(log, "Error aligning slasher to slot clock");
Instant::now()
};
let mut interval = interval_at(start_instant, Duration::from_secs(update_period));
loop {
interval.tick().await;
if let Some(current_slot) = beacon_chain.slot_clock.now() {
let current_epoch = current_slot.epoch(T::EthSpec::slots_per_epoch());
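                // If the processor is still busy the channel is full and the notification is
                // simply dropped; only a disconnected receiver stops the notifier.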
if let Err(TrySendError::Disconnected(_)) = notif_sender.try_send(current_epoch) {
break;
}
} else {
trace!(log, "Slasher has nothing to do: we are pre-genesis");
}
}
}

    /// Run the blocking task that processes queued slasher messages, prunes the database and
    /// forwards any slashings found to the beacon chain.
fn run_processor(
beacon_chain: Arc<BeaconChain<T>>,
slasher: Arc<Slasher<T::EthSpec>>,
notif_receiver: Receiver<Epoch>,
network_sender: UnboundedSender<NetworkMessage<T::EthSpec>>,
) {
let log = slasher.log();
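        // Block until the notifier delivers the next epoch; exit once the channel is closed.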
while let Ok(current_epoch) = notif_receiver.recv() {
let t = Instant::now();
let batch_timer = metrics::start_timer(&SLASHER_RUN_TIME);
let stats = match slasher.process_queued(current_epoch) {
Ok(stats) => Some(stats),
Err(e) => {
error!(
log,
"Error during scheduled slasher processing";
"epoch" => current_epoch,
"error" => ?e,
);
None
}
};
drop(batch_timer);
// Prune the database, even in the case where batch processing failed.
// If the database is full then pruning could help to free it up.
if let Err(e) = slasher.prune_database(current_epoch) {
error!(
log,
"Error during slasher database pruning";
"epoch" => current_epoch,
"error" => ?e,
);
continue;
};
// Provide slashings to the beacon chain, and optionally publish them.
Self::process_slashings(&beacon_chain, &slasher, &network_sender);
let database_size = size_of_dir(&slasher.config().database_path);
metrics::set_gauge(&SLASHER_DATABASE_SIZE, database_size as i64);
if let Some(stats) = stats {
debug!(
log,
"Completed slasher update";
"epoch" => current_epoch,
"time_taken" => format!("{}ms", t.elapsed().as_millis()),
"num_attestations" => stats.attestation_stats.num_processed,
"num_blocks" => stats.block_stats.num_processed,
);
}
}
}

    /// Push any slashings found to the beacon chain, optionally publishing them on the network.
fn process_slashings(
beacon_chain: &BeaconChain<T>,
slasher: &Slasher<T::EthSpec>,
network_sender: &UnboundedSender<NetworkMessage<T::EthSpec>>,
) {
Self::process_attester_slashings(beacon_chain, slasher, network_sender);
Self::process_proposer_slashings(beacon_chain, slasher, network_sender);
}

    fn process_attester_slashings(
beacon_chain: &BeaconChain<T>,
slasher: &Slasher<T::EthSpec>,
network_sender: &UnboundedSender<NetworkMessage<T::EthSpec>>,
) {
let log = slasher.log();
let attester_slashings = slasher.get_attester_slashings();
for slashing in attester_slashings {
            // Verify the slashing (signature and slashability) against the head state.
let verified_slashing = match beacon_chain.with_head(|head| {
Ok::<_, BeaconChainError>(
slashing
.clone()
.validate(&head.beacon_state, &beacon_chain.spec)?,
)
}) {
Ok(verified) => verified,
Err(BeaconChainError::AttesterSlashingValidationError(
BlockOperationError::Invalid(AttesterSlashingInvalid::NoSlashableIndices),
)) => {
debug!(
log,
"Skipping attester slashing for slashed validators";
"slashing" => ?slashing,
);
continue;
}
Err(e) => {
warn!(
log,
"Attester slashing produced is invalid";
"error" => ?e,
"slashing" => ?slashing,
);
continue;
}
};
// Add to local op pool.
if let Err(e) = beacon_chain.import_attester_slashing(verified_slashing) {
error!(
log,
"Beacon chain refused attester slashing";
"error" => ?e,
"slashing" => ?slashing,
);
}
// Publish to the network if broadcast is enabled.
if slasher.config().broadcast {
if let Err(e) =
Self::publish_attester_slashing(beacon_chain, network_sender, slashing)
{
debug!(
log,
"Unable to publish attester slashing";
"error" => e,
);
}
}
}
}

    fn process_proposer_slashings(
beacon_chain: &BeaconChain<T>,
slasher: &Slasher<T::EthSpec>,
network_sender: &UnboundedSender<NetworkMessage<T::EthSpec>>,
) {
let log = slasher.log();
let proposer_slashings = slasher.get_proposer_slashings();
for slashing in proposer_slashings {
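            // Verify the slashing against the head state.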
let verified_slashing = match beacon_chain.with_head(|head| {
Ok(slashing
.clone()
.validate(&head.beacon_state, &beacon_chain.spec)?)
}) {
Ok(verified) => verified,
Err(BeaconChainError::ProposerSlashingValidationError(
BlockOperationError::Invalid(ProposerSlashingInvalid::ProposerNotSlashable(
index,
)),
)) => {
debug!(
log,
"Skipping proposer slashing for slashed validator";
"validator_index" => index,
);
continue;
}
Err(e) => {
error!(
log,
"Proposer slashing produced is invalid";
"error" => ?e,
"slashing" => ?slashing,
);
continue;
}
};
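            // Add to local op pool.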
beacon_chain.import_proposer_slashing(verified_slashing);
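            // Publish to the network if broadcast is enabled.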
if slasher.config().broadcast {
if let Err(e) =
Self::publish_proposer_slashing(beacon_chain, network_sender, slashing)
{
debug!(
log,
"Unable to publish proposer slashing";
"error" => e,
);
}
}
}
}
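
    /// Verify an attester slashing for gossip and publish it if it has not been seen before.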
fn publish_attester_slashing(
beacon_chain: &BeaconChain<T>,
network_sender: &UnboundedSender<NetworkMessage<T::EthSpec>>,
slashing: AttesterSlashing<T::EthSpec>,
) -> Result<(), String> {
let outcome = beacon_chain
.verify_attester_slashing_for_gossip(slashing)
.map_err(|e| format!("gossip verification error: {:?}", e))?;
if let ObservationOutcome::New(slashing) = outcome {
network_sender
.send(NetworkMessage::Publish {
messages: vec![PubsubMessage::AttesterSlashing(Box::new(
slashing.into_inner(),
))],
})
.map_err(|e| format!("network error: {:?}", e))?;
}
Ok(())
}
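
    /// Verify a proposer slashing for gossip and publish it if it has not been seen before.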
fn publish_proposer_slashing(
beacon_chain: &BeaconChain<T>,
network_sender: &UnboundedSender<NetworkMessage<T::EthSpec>>,
slashing: ProposerSlashing,
) -> Result<(), String> {
let outcome = beacon_chain
.verify_proposer_slashing_for_gossip(slashing)
.map_err(|e| format!("gossip verification error: {:?}", e))?;
if let ObservationOutcome::New(slashing) = outcome {
network_sender
.send(NetworkMessage::Publish {
messages: vec![PubsubMessage::ProposerSlashing(Box::new(
slashing.into_inner(),
))],
})
.map_err(|e| format!("network error: {:?}", e))?;
}
Ok(())
}
}