Add a flag for storing invalid blocks (#4194)

## Issue Addressed

NA

## Proposed Changes

Adds a flag to store invalid blocks on disk for teh debugz. Only *some* invalid blocks are stored, those which:

- Were received via gossip (rather than RPC, for instance)
    - This keeps things simple to start with and should capture most blocks.
- Passed gossip verification
    - This reduces the ability for random people to fill up our disk. A proposer signature is required to write something to disk.

## Additional Info

It's possible that we'll store blocks that aren't necessarily invalid, but we had an internal error during verification. Those blocks seem like they might be useful sometimes.
This commit is contained in:
Paul Hauner 2023-05-15 07:22:03 +00:00
parent 8a3eb4df9c
commit 714ed53839
9 changed files with 173 additions and 27 deletions

View File

@ -145,6 +145,9 @@ pub struct Config {
/// Configuration for the outbound rate limiter (requests made by this node). /// Configuration for the outbound rate limiter (requests made by this node).
pub outbound_rate_limiter_config: Option<OutboundRateLimiterConfig>, pub outbound_rate_limiter_config: Option<OutboundRateLimiterConfig>,
/// Configures if/where invalid blocks should be stored.
pub invalid_block_storage: Option<PathBuf>,
} }
impl Config { impl Config {
@ -329,6 +332,7 @@ impl Default for Config {
metrics_enabled: false, metrics_enabled: false,
enable_light_client_server: false, enable_light_client_server: false,
outbound_rate_limiter_config: None, outbound_rate_limiter_config: None,
invalid_block_storage: None,
} }
} }
} }

View File

@ -54,6 +54,7 @@ use logging::TimeLatch;
use slog::{crit, debug, error, trace, warn, Logger}; use slog::{crit, debug, error, trace, warn, Logger};
use std::collections::VecDeque; use std::collections::VecDeque;
use std::future::Future; use std::future::Future;
use std::path::PathBuf;
use std::pin::Pin; use std::pin::Pin;
use std::sync::{Arc, Weak}; use std::sync::{Arc, Weak};
use std::task::Context; use std::task::Context;
@ -982,6 +983,13 @@ impl<T: BeaconChainTypes> Stream for InboundEvents<T> {
} }
} }
/// Defines if and where we will store the SSZ files of invalid blocks.
#[derive(Clone)]
pub enum InvalidBlockStorage {
Enabled(PathBuf),
Disabled,
}
/// A mutli-threaded processor for messages received on the network /// A mutli-threaded processor for messages received on the network
/// that need to be processed by the `BeaconChain` /// that need to be processed by the `BeaconChain`
/// ///
@ -995,6 +1003,7 @@ pub struct BeaconProcessor<T: BeaconChainTypes> {
pub max_workers: usize, pub max_workers: usize,
pub current_workers: usize, pub current_workers: usize,
pub importing_blocks: DuplicateCache, pub importing_blocks: DuplicateCache,
pub invalid_block_storage: InvalidBlockStorage,
pub log: Logger, pub log: Logger,
} }
@ -1676,19 +1685,23 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
peer_client, peer_client,
block, block,
seen_timestamp, seen_timestamp,
} => task_spawner.spawn_async(async move { } => {
worker let invalid_block_storage = self.invalid_block_storage.clone();
.process_gossip_block( task_spawner.spawn_async(async move {
message_id, worker
peer_id, .process_gossip_block(
peer_client, message_id,
block, peer_id,
work_reprocessing_tx, peer_client,
duplicate_cache, block,
seen_timestamp, work_reprocessing_tx,
) duplicate_cache,
.await invalid_block_storage,
}), seen_timestamp,
)
.await
})
}
/* /*
* Import for blocks that we received earlier than their intended slot. * Import for blocks that we received earlier than their intended slot.
*/ */
@ -1696,12 +1709,16 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
peer_id, peer_id,
block, block,
seen_timestamp, seen_timestamp,
} => task_spawner.spawn_async(worker.process_gossip_verified_block( } => {
peer_id, let invalid_block_storage = self.invalid_block_storage.clone();
*block, task_spawner.spawn_async(worker.process_gossip_verified_block(
work_reprocessing_tx, peer_id,
seen_timestamp, *block,
)), work_reprocessing_tx,
invalid_block_storage,
seen_timestamp,
))
}
/* /*
* Voluntary exits received on gossip. * Voluntary exits received on gossip.
*/ */

View File

@ -203,6 +203,7 @@ impl TestRig {
max_workers: cmp::max(1, num_cpus::get()), max_workers: cmp::max(1, num_cpus::get()),
current_workers: 0, current_workers: 0,
importing_blocks: duplicate_cache.clone(), importing_blocks: duplicate_cache.clone(),
invalid_block_storage: InvalidBlockStorage::Disabled,
log: log.clone(), log: log.clone(),
} }
.spawn_manager(beacon_processor_rx, Some(work_journal_tx)); .spawn_manager(beacon_processor_rx, Some(work_journal_tx));

View File

@ -13,9 +13,12 @@ use beacon_chain::{
}; };
use lighthouse_network::{Client, MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource}; use lighthouse_network::{Client, MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource};
use operation_pool::ReceivedPreCapella; use operation_pool::ReceivedPreCapella;
use slog::{crit, debug, error, info, trace, warn}; use slog::{crit, debug, error, info, trace, warn, Logger};
use slot_clock::SlotClock; use slot_clock::SlotClock;
use ssz::Encode; use ssz::Encode;
use std::fs;
use std::io::Write;
use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH}; use std::time::{Duration, SystemTime, UNIX_EPOCH};
use store::hot_cold_store::HotColdDBError; use store::hot_cold_store::HotColdDBError;
@ -34,7 +37,7 @@ use super::{
}, },
Worker, Worker,
}; };
use crate::beacon_processor::DuplicateCache; use crate::beacon_processor::{DuplicateCache, InvalidBlockStorage};
/// Set to `true` to introduce stricter penalties for peers who send some types of late consensus /// Set to `true` to introduce stricter penalties for peers who send some types of late consensus
/// messages. /// messages.
@ -663,6 +666,7 @@ impl<T: BeaconChainTypes> Worker<T> {
block: Arc<SignedBeaconBlock<T::EthSpec>>, block: Arc<SignedBeaconBlock<T::EthSpec>>,
reprocess_tx: mpsc::Sender<ReprocessQueueMessage<T>>, reprocess_tx: mpsc::Sender<ReprocessQueueMessage<T>>,
duplicate_cache: DuplicateCache, duplicate_cache: DuplicateCache,
invalid_block_storage: InvalidBlockStorage,
seen_duration: Duration, seen_duration: Duration,
) { ) {
if let Some(gossip_verified_block) = self if let Some(gossip_verified_block) = self
@ -683,6 +687,7 @@ impl<T: BeaconChainTypes> Worker<T> {
peer_id, peer_id,
gossip_verified_block, gossip_verified_block,
reprocess_tx, reprocess_tx,
invalid_block_storage,
seen_duration, seen_duration,
) )
.await; .await;
@ -935,13 +940,14 @@ impl<T: BeaconChainTypes> Worker<T> {
peer_id: PeerId, peer_id: PeerId,
verified_block: GossipVerifiedBlock<T>, verified_block: GossipVerifiedBlock<T>,
reprocess_tx: mpsc::Sender<ReprocessQueueMessage<T>>, reprocess_tx: mpsc::Sender<ReprocessQueueMessage<T>>,
invalid_block_storage: InvalidBlockStorage,
// This value is not used presently, but it might come in handy for debugging. // This value is not used presently, but it might come in handy for debugging.
_seen_duration: Duration, _seen_duration: Duration,
) { ) {
let block: Arc<_> = verified_block.block.clone(); let block: Arc<_> = verified_block.block.clone();
let block_root = verified_block.block_root; let block_root = verified_block.block_root;
match self let result = self
.chain .chain
.process_block( .process_block(
block_root, block_root,
@ -949,14 +955,15 @@ impl<T: BeaconChainTypes> Worker<T> {
CountUnrealized::True, CountUnrealized::True,
NotifyExecutionLayer::Yes, NotifyExecutionLayer::Yes,
) )
.await .await;
{
match &result {
Ok(block_root) => { Ok(block_root) => {
metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL); metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL);
if reprocess_tx if reprocess_tx
.try_send(ReprocessQueueMessage::BlockImported { .try_send(ReprocessQueueMessage::BlockImported {
block_root, block_root: *block_root,
parent_root: block.message().parent_root(), parent_root: block.message().parent_root(),
}) })
.is_err() .is_err()
@ -986,7 +993,11 @@ impl<T: BeaconChainTypes> Worker<T> {
"Block with unknown parent attempted to be processed"; "Block with unknown parent attempted to be processed";
"peer_id" => %peer_id "peer_id" => %peer_id
); );
self.send_sync_message(SyncMessage::UnknownBlock(peer_id, block, block_root)); self.send_sync_message(SyncMessage::UnknownBlock(
peer_id,
block.clone(),
block_root,
));
} }
Err(ref e @ BlockError::ExecutionPayloadError(ref epe)) if !epe.penalize_peer() => { Err(ref e @ BlockError::ExecutionPayloadError(ref epe)) if !epe.penalize_peer() => {
debug!( debug!(
@ -1015,6 +1026,16 @@ impl<T: BeaconChainTypes> Worker<T> {
); );
} }
}; };
if let Err(e) = &result {
self.maybe_store_invalid_block(
&invalid_block_storage,
block_root,
&block,
e,
&self.log,
);
}
} }
pub fn process_gossip_voluntary_exit( pub fn process_gossip_voluntary_exit(
@ -2486,4 +2507,62 @@ impl<T: BeaconChainTypes> Worker<T> {
self.propagate_if_timely(is_timely, message_id, peer_id) self.propagate_if_timely(is_timely, message_id, peer_id)
} }
/// Stores a block as a SSZ file, if and where `invalid_block_storage` dictates.
fn maybe_store_invalid_block(
&self,
invalid_block_storage: &InvalidBlockStorage,
block_root: Hash256,
block: &SignedBeaconBlock<T::EthSpec>,
error: &BlockError<T::EthSpec>,
log: &Logger,
) {
if let InvalidBlockStorage::Enabled(base_dir) = invalid_block_storage {
let block_path = base_dir.join(format!("{}_{:?}.ssz", block.slot(), block_root));
let error_path = base_dir.join(format!("{}_{:?}.error", block.slot(), block_root));
let write_file = |path: PathBuf, bytes: &[u8]| {
// No need to write the same file twice. For the error file,
// this means that we'll remember the first error message but
// forget the rest.
if path.exists() {
return;
}
// Write to the file.
let write_result = fs::OpenOptions::new()
// Only succeed if the file doesn't already exist. We should
// have checked for this earlier.
.create_new(true)
.write(true)
.open(&path)
.map_err(|e| format!("Failed to open file: {:?}", e))
.map(|mut file| {
file.write_all(bytes)
.map_err(|e| format!("Failed to write file: {:?}", e))
});
if let Err(e) = write_result {
error!(
log,
"Failed to store invalid block/error";
"error" => e,
"path" => ?path,
"root" => ?block_root,
"slot" => block.slot(),
)
} else {
info!(
log,
"Stored invalid block/error ";
"path" => ?path,
"root" => ?block_root,
"slot" => block.slot(),
)
}
};
write_file(block_path, &block.as_ssz_bytes());
write_file(error_path, error.to_string().as_bytes());
}
}
} }

View File

@ -6,7 +6,7 @@
#![allow(clippy::unit_arg)] #![allow(clippy::unit_arg)]
use crate::beacon_processor::{ use crate::beacon_processor::{
BeaconProcessor, WorkEvent as BeaconWorkEvent, MAX_WORK_EVENT_QUEUE_LEN, BeaconProcessor, InvalidBlockStorage, WorkEvent as BeaconWorkEvent, MAX_WORK_EVENT_QUEUE_LEN,
}; };
use crate::error; use crate::error;
use crate::service::{NetworkMessage, RequestId}; use crate::service::{NetworkMessage, RequestId};
@ -81,6 +81,7 @@ impl<T: BeaconChainTypes> Router<T> {
network_globals: Arc<NetworkGlobals<T::EthSpec>>, network_globals: Arc<NetworkGlobals<T::EthSpec>>,
network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>, network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
executor: task_executor::TaskExecutor, executor: task_executor::TaskExecutor,
invalid_block_storage: InvalidBlockStorage,
log: slog::Logger, log: slog::Logger,
) -> error::Result<mpsc::UnboundedSender<RouterMessage<T::EthSpec>>> { ) -> error::Result<mpsc::UnboundedSender<RouterMessage<T::EthSpec>>> {
let message_handler_log = log.new(o!("service"=> "router")); let message_handler_log = log.new(o!("service"=> "router"));
@ -112,6 +113,7 @@ impl<T: BeaconChainTypes> Router<T> {
max_workers: cmp::max(1, num_cpus::get()), max_workers: cmp::max(1, num_cpus::get()),
current_workers: 0, current_workers: 0,
importing_blocks: Default::default(), importing_blocks: Default::default(),
invalid_block_storage,
log: log.clone(), log: log.clone(),
} }
.spawn_manager(beacon_processor_receive, None); .spawn_manager(beacon_processor_receive, None);

View File

@ -1,4 +1,5 @@
use super::sync::manager::RequestId as SyncId; use super::sync::manager::RequestId as SyncId;
use crate::beacon_processor::InvalidBlockStorage;
use crate::persisted_dht::{clear_dht, load_dht, persist_dht}; use crate::persisted_dht::{clear_dht, load_dht, persist_dht};
use crate::router::{Router, RouterMessage}; use crate::router::{Router, RouterMessage};
use crate::subnet_service::SyncCommitteeService; use crate::subnet_service::SyncCommitteeService;
@ -295,6 +296,12 @@ impl<T: BeaconChainTypes> NetworkService<T> {
} }
} }
let invalid_block_storage = config
.invalid_block_storage
.clone()
.map(InvalidBlockStorage::Enabled)
.unwrap_or(InvalidBlockStorage::Disabled);
// launch derived network services // launch derived network services
// router task // router task
@ -303,6 +310,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
network_globals.clone(), network_globals.clone(),
network_senders.network_send(), network_senders.network_send(),
executor.clone(), executor.clone(),
invalid_block_storage,
network_log.clone(), network_log.clone(),
)?; )?;

View File

@ -1093,4 +1093,13 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
// always using the builder. // always using the builder.
.conflicts_with("builder-profit-threshold") .conflicts_with("builder-profit-threshold")
) )
.arg(
Arg::with_name("invalid-gossip-verified-blocks-path")
.long("invalid-gossip-verified-blocks-path")
.value_name("PATH")
.help("If a block succeeds gossip validation whilst failing full validation, store \
the block SSZ as a file at this path. This feature is only recommended for \
developers. This directory is not pruned, users should be careful to avoid \
filling up their disks.")
)
} }

View File

@ -793,6 +793,11 @@ pub fn get_config<E: EthSpec>(
client_config.chain.enable_backfill_rate_limiting = client_config.chain.enable_backfill_rate_limiting =
!cli_args.is_present("disable-backfill-rate-limiting"); !cli_args.is_present("disable-backfill-rate-limiting");
if let Some(path) = clap_utils::parse_optional(cli_args, "invalid-gossip-verified-blocks-path")?
{
client_config.network.invalid_block_storage = Some(path);
}
Ok(client_config) Ok(client_config)
} }

View File

@ -2199,3 +2199,24 @@ fn disable_optimistic_finalized_sync() {
assert!(!config.chain.optimistic_finalized_sync); assert!(!config.chain.optimistic_finalized_sync);
}); });
} }
#[test]
fn invalid_gossip_verified_blocks_path_default() {
CommandLineTest::new()
.run_with_zero_port()
.with_config(|config| assert_eq!(config.network.invalid_block_storage, None));
}
#[test]
fn invalid_gossip_verified_blocks_path() {
let path = "/home/karlm/naughty-blocks";
CommandLineTest::new()
.flag("invalid-gossip-verified-blocks-path", Some(path))
.run_with_zero_port()
.with_config(|config| {
assert_eq!(
config.network.invalid_block_storage,
Some(PathBuf::from(path))
)
});
}