From 65a5eb829264cb279ed66814c961991ae3a0a04b Mon Sep 17 00:00:00 2001 From: ethDreamer Date: Sun, 19 Mar 2023 23:15:59 +0000 Subject: [PATCH 01/11] Reconstruct Payloads using Payload Bodies Methods (#4028) ## Issue Addressed * #3895 Co-authored-by: ethDreamer <37123614+ethDreamer@users.noreply.github.com> Co-authored-by: Michael Sproul --- Cargo.lock | 1 + beacon_node/beacon_chain/Cargo.toml | 1 + .../beacon_chain/src/beacon_block_streamer.rs | 973 ++++++++++++++++++ beacon_node/beacon_chain/src/beacon_chain.rs | 46 +- beacon_node/beacon_chain/src/errors.rs | 3 + beacon_node/beacon_chain/src/lib.rs | 1 + beacon_node/execution_layer/src/engine_api.rs | 80 +- .../execution_layer/src/engine_api/http.rs | 56 + .../src/engine_api/json_structures.rs | 29 +- beacon_node/execution_layer/src/lib.rs | 31 + beacon_node/execution_layer/src/metrics.rs | 4 + .../test_utils/execution_block_generator.rs | 8 + .../src/test_utils/handle_rpc.rs | 57 +- .../execution_layer/src/test_utils/mod.rs | 2 + .../beacon_processor/worker/rpc_methods.rs | 33 +- consensus/types/src/execution_payload.rs | 9 + .../src/nethermind.rs | 6 +- .../src/test_rig.rs | 28 +- 18 files changed, 1335 insertions(+), 33 deletions(-) create mode 100644 beacon_node/beacon_chain/src/beacon_block_streamer.rs diff --git a/Cargo.lock b/Cargo.lock index 04c2997c5..20642f32b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -610,6 +610,7 @@ dependencies = [ "task_executor", "tempfile", "tokio", + "tokio-stream", "tree_hash", "types", "unused_port", diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml index 5599e6f97..9626aaae1 100644 --- a/beacon_node/beacon_chain/Cargo.toml +++ b/beacon_node/beacon_chain/Cargo.toml @@ -38,6 +38,7 @@ state_processing = { path = "../../consensus/state_processing" } tree_hash = "0.4.1" types = { path = "../../consensus/types" } tokio = "1.14.0" +tokio-stream = "0.1.3" eth1 = { path = "../eth1" } futures = "0.3.7" genesis = { path = "../genesis" } diff --git a/beacon_node/beacon_chain/src/beacon_block_streamer.rs b/beacon_node/beacon_chain/src/beacon_block_streamer.rs new file mode 100644 index 000000000..e43f2a8dd --- /dev/null +++ b/beacon_node/beacon_chain/src/beacon_block_streamer.rs @@ -0,0 +1,973 @@ +use crate::{BeaconChain, BeaconChainError, BeaconChainTypes}; +use execution_layer::{ExecutionLayer, ExecutionPayloadBodyV1}; +use slog::{crit, debug, Logger}; +use std::collections::HashMap; +use std::sync::Arc; +use store::DatabaseBlock; +use task_executor::TaskExecutor; +use tokio::sync::{ + mpsc::{self, UnboundedSender}, + RwLock, +}; +use tokio_stream::{wrappers::UnboundedReceiverStream, Stream}; +use types::{ + ChainSpec, EthSpec, ExecPayload, ExecutionBlockHash, ForkName, Hash256, SignedBeaconBlock, + SignedBlindedBeaconBlock, Slot, +}; +use types::{ + ExecutionPayload, ExecutionPayloadCapella, ExecutionPayloadHeader, ExecutionPayloadMerge, +}; + +#[derive(PartialEq)] +pub enum CheckEarlyAttesterCache { + Yes, + No, +} + +#[derive(Debug)] +pub enum Error { + PayloadReconstruction(String), + BlocksByRangeFailure(Box), + RequestNotFound, + BlockResultNotFound, +} + +const BLOCKS_PER_RANGE_REQUEST: u64 = 32; + +// This is the same as a DatabaseBlock but the Arc allows us to avoid an unnecessary clone. +enum LoadedBeaconBlock { + Full(Arc>), + Blinded(Box>), +} +type LoadResult = Result>, BeaconChainError>; +type BlockResult = Result>>, BeaconChainError>; + +enum RequestState { + UnSent(Vec>), + Sent(HashMap>>), +} + +struct BodiesByRange { + start: u64, + count: u64, + state: RequestState, +} + +// stores the components of a block for future re-construction in a small form +struct BlockParts { + blinded_block: Box>, + header: Box>, + body: Option>>, +} + +impl BlockParts { + pub fn new( + blinded: Box>, + header: ExecutionPayloadHeader, + ) -> Self { + Self { + blinded_block: blinded, + header: Box::new(header), + body: None, + } + } + + pub fn root(&self) -> Hash256 { + self.blinded_block.canonical_root() + } + + pub fn slot(&self) -> Slot { + self.blinded_block.message().slot() + } + + pub fn block_hash(&self) -> ExecutionBlockHash { + self.header.block_hash() + } +} + +fn reconstruct_default_header_block( + blinded_block: Box>, + header_from_block: ExecutionPayloadHeader, + spec: &ChainSpec, +) -> BlockResult { + let fork = blinded_block + .fork_name(spec) + .map_err(BeaconChainError::InconsistentFork)?; + + let payload: ExecutionPayload = match fork { + ForkName::Merge => ExecutionPayloadMerge::default().into(), + ForkName::Capella => ExecutionPayloadCapella::default().into(), + ForkName::Base | ForkName::Altair => { + return Err(Error::PayloadReconstruction(format!( + "Block with fork variant {} has execution payload", + fork + )) + .into()) + } + }; + + let header_from_payload = ExecutionPayloadHeader::from(payload.to_ref()); + if header_from_payload == header_from_block { + blinded_block + .try_into_full_block(Some(payload)) + .ok_or(BeaconChainError::AddPayloadLogicError) + .map(Arc::new) + .map(Some) + } else { + Err(BeaconChainError::InconsistentPayloadReconstructed { + slot: blinded_block.slot(), + exec_block_hash: header_from_block.block_hash(), + canonical_transactions_root: header_from_block.transactions_root(), + reconstructed_transactions_root: header_from_payload.transactions_root(), + }) + } +} + +fn reconstruct_blocks( + block_map: &mut HashMap>>, + block_parts_with_bodies: HashMap>, + log: &Logger, +) { + for (root, block_parts) in block_parts_with_bodies { + if let Some(payload_body) = block_parts.body { + match payload_body.to_payload(block_parts.header.as_ref().clone()) { + Ok(payload) => { + let header_from_payload = ExecutionPayloadHeader::from(payload.to_ref()); + if header_from_payload == *block_parts.header { + block_map.insert( + root, + Arc::new( + block_parts + .blinded_block + .try_into_full_block(Some(payload)) + .ok_or(BeaconChainError::AddPayloadLogicError) + .map(Arc::new) + .map(Some), + ), + ); + } else { + let error = BeaconChainError::InconsistentPayloadReconstructed { + slot: block_parts.blinded_block.slot(), + exec_block_hash: block_parts.header.block_hash(), + canonical_transactions_root: block_parts.header.transactions_root(), + reconstructed_transactions_root: header_from_payload + .transactions_root(), + }; + debug!(log, "Failed to reconstruct block"; "root" => ?root, "error" => ?error); + block_map.insert(root, Arc::new(Err(error))); + } + } + Err(string) => { + block_map.insert( + root, + Arc::new(Err(Error::PayloadReconstruction(string).into())), + ); + } + } + } else { + block_map.insert( + root, + Arc::new(Err(BeaconChainError::BlockHashMissingFromExecutionLayer( + block_parts.block_hash(), + ))), + ); + } + } +} + +impl BodiesByRange { + pub fn new(maybe_block_parts: Option>) -> Self { + if let Some(block_parts) = maybe_block_parts { + Self { + start: block_parts.header.block_number(), + count: 1, + state: RequestState::UnSent(vec![block_parts]), + } + } else { + Self { + start: 0, + count: 0, + state: RequestState::UnSent(vec![]), + } + } + } + + pub fn is_unsent(&self) -> bool { + matches!(self.state, RequestState::UnSent(_)) + } + + pub fn push_block_parts(&mut self, block_parts: BlockParts) -> Result<(), BlockParts> { + if self.count == BLOCKS_PER_RANGE_REQUEST { + return Err(block_parts); + } + + match &mut self.state { + RequestState::Sent(_) => Err(block_parts), + RequestState::UnSent(blocks_parts_vec) => { + let block_number = block_parts.header.block_number(); + if self.count == 0 { + self.start = block_number; + self.count = 1; + blocks_parts_vec.push(block_parts); + Ok(()) + } else { + // need to figure out if this block fits in the request + if block_number < self.start + || self.start + BLOCKS_PER_RANGE_REQUEST <= block_number + { + return Err(block_parts); + } + + blocks_parts_vec.push(block_parts); + if self.start + self.count <= block_number { + self.count = block_number - self.start + 1; + } + + Ok(()) + } + } + } + } + + async fn execute(&mut self, execution_layer: &ExecutionLayer, log: &Logger) { + if let RequestState::UnSent(blocks_parts_ref) = &mut self.state { + let block_parts_vec = std::mem::take(blocks_parts_ref); + + let mut block_map = HashMap::new(); + match execution_layer + .get_payload_bodies_by_range(self.start, self.count) + .await + { + Ok(bodies) => { + let mut range_map = (self.start..(self.start + self.count)) + .zip(bodies.into_iter().chain(std::iter::repeat(None))) + .collect::>(); + + let mut with_bodies = HashMap::new(); + for mut block_parts in block_parts_vec { + with_bodies + // it's possible the same block is requested twice, using + // or_insert_with() skips duplicates + .entry(block_parts.root()) + .or_insert_with(|| { + let block_number = block_parts.header.block_number(); + block_parts.body = + range_map.remove(&block_number).flatten().map(Box::new); + + block_parts + }); + } + + reconstruct_blocks(&mut block_map, with_bodies, log); + } + Err(e) => { + let block_result = + Arc::new(Err(Error::BlocksByRangeFailure(Box::new(e)).into())); + debug!(log, "Payload bodies by range failure"; "error" => ?block_result); + for block_parts in block_parts_vec { + block_map.insert(block_parts.root(), block_result.clone()); + } + } + } + self.state = RequestState::Sent(block_map); + } + } + + pub async fn get_block_result( + &mut self, + root: &Hash256, + execution_layer: &ExecutionLayer, + log: &Logger, + ) -> Option>> { + self.execute(execution_layer, log).await; + if let RequestState::Sent(map) = &self.state { + return map.get(root).cloned(); + } + // Shouldn't reach this point + None + } +} + +#[derive(Clone)] +enum EngineRequest { + ByRange(Arc>>), + // When we already have the data or there's an error + NoRequest(Arc>>>>), +} + +impl EngineRequest { + pub fn new_by_range() -> Self { + Self::ByRange(Arc::new(RwLock::new(BodiesByRange::new(None)))) + } + pub fn new_no_request() -> Self { + Self::NoRequest(Arc::new(RwLock::new(HashMap::new()))) + } + + pub async fn is_unsent(&self) -> bool { + match self { + Self::ByRange(bodies_by_range) => bodies_by_range.read().await.is_unsent(), + Self::NoRequest(_) => false, + } + } + + pub async fn push_block_parts(&mut self, block_parts: BlockParts, log: &Logger) { + match self { + Self::ByRange(bodies_by_range) => { + let mut request = bodies_by_range.write().await; + + if let Err(block_parts) = request.push_block_parts(block_parts) { + drop(request); + let new_by_range = BodiesByRange::new(Some(block_parts)); + *self = Self::ByRange(Arc::new(RwLock::new(new_by_range))); + } + } + Self::NoRequest(_) => { + // this should _never_ happen + crit!( + log, + "Please notify the devs"; + "beacon_block_streamer" => "push_block_parts called on NoRequest Variant", + ); + } + } + } + + pub async fn push_block_result( + &mut self, + root: Hash256, + block_result: BlockResult, + log: &Logger, + ) { + // this function will only fail if something is seriously wrong + match self { + Self::ByRange(_) => { + // this should _never_ happen + crit!( + log, + "Please notify the devs"; + "beacon_block_streamer" => "push_block_result called on ByRange", + ); + } + Self::NoRequest(results) => { + results.write().await.insert(root, Arc::new(block_result)); + } + } + } + + pub async fn get_block_result( + &self, + root: &Hash256, + execution_layer: &ExecutionLayer, + log: &Logger, + ) -> Arc> { + match self { + Self::ByRange(by_range) => { + by_range + .write() + .await + .get_block_result(root, execution_layer, log) + .await + } + Self::NoRequest(map) => map.read().await.get(root).cloned(), + } + .unwrap_or_else(|| { + crit!( + log, + "Please notify the devs"; + "beacon_block_streamer" => "block_result not found in request", + "root" => ?root, + ); + Arc::new(Err(Error::BlockResultNotFound.into())) + }) + } +} + +pub struct BeaconBlockStreamer { + execution_layer: ExecutionLayer, + check_early_attester_cache: CheckEarlyAttesterCache, + beacon_chain: Arc>, +} + +impl BeaconBlockStreamer { + pub fn new( + beacon_chain: &Arc>, + check_early_attester_cache: CheckEarlyAttesterCache, + ) -> Result { + let execution_layer = beacon_chain + .execution_layer + .as_ref() + .ok_or(BeaconChainError::ExecutionLayerMissing)? + .clone(); + + Ok(Self { + execution_layer, + check_early_attester_cache, + beacon_chain: beacon_chain.clone(), + }) + } + + fn check_early_attester_cache( + &self, + root: Hash256, + ) -> Option>> { + if self.check_early_attester_cache == CheckEarlyAttesterCache::Yes { + self.beacon_chain.early_attester_cache.get_block(root) + } else { + None + } + } + + fn load_payloads(&self, block_roots: Vec) -> Vec<(Hash256, LoadResult)> { + let mut db_blocks = Vec::new(); + + for root in block_roots { + if let Some(cached_block) = self + .check_early_attester_cache(root) + .map(LoadedBeaconBlock::Full) + { + db_blocks.push((root, Ok(Some(cached_block)))); + continue; + } + + match self.beacon_chain.store.try_get_full_block(&root) { + Err(e) => db_blocks.push((root, Err(e.into()))), + Ok(opt_block) => db_blocks.push(( + root, + Ok(opt_block.map(|db_block| match db_block { + DatabaseBlock::Full(block) => LoadedBeaconBlock::Full(Arc::new(block)), + DatabaseBlock::Blinded(block) => { + LoadedBeaconBlock::Blinded(Box::new(block)) + } + })), + )), + } + } + + db_blocks + } + + /// Pre-process the loaded blocks into execution engine requests. + /// + /// The purpose of this function is to separate the blocks into 2 categories: + /// 1) no_request - when we already have the full block or there's an error + /// 2) blocks_by_range - used for blinded blocks + /// + /// The function returns a vector of block roots in the same order as requested + /// along with the engine request that each root corresponds to. + async fn get_requests( + &self, + payloads: Vec<(Hash256, LoadResult)>, + ) -> Vec<(Hash256, EngineRequest)> { + let mut ordered_block_roots = Vec::new(); + let mut requests = HashMap::new(); + + // we sort the by range blocks by slot before adding them to the + // request as it should *better* optimize the number of blocks that + // can fit in the same request + let mut by_range_blocks: Vec> = vec![]; + let mut no_request = EngineRequest::new_no_request(); + + for (root, load_result) in payloads { + // preserve the order of the requested blocks + ordered_block_roots.push(root); + + let block_result = match load_result { + Err(e) => Err(e), + Ok(None) => Ok(None), + Ok(Some(LoadedBeaconBlock::Full(full_block))) => Ok(Some(full_block)), + Ok(Some(LoadedBeaconBlock::Blinded(blinded_block))) => { + match blinded_block + .message() + .execution_payload() + .map(|payload| payload.to_execution_payload_header()) + { + Ok(header) => { + if header.block_hash() == ExecutionBlockHash::zero() { + reconstruct_default_header_block( + blinded_block, + header, + &self.beacon_chain.spec, + ) + } else { + // Add the block to the set requiring a by-range request. + let block_parts = BlockParts::new(blinded_block, header); + by_range_blocks.push(block_parts); + continue; + } + } + Err(e) => Err(BeaconChainError::BeaconStateError(e)), + } + } + }; + + no_request + .push_block_result(root, block_result, &self.beacon_chain.log) + .await; + requests.insert(root, no_request.clone()); + } + + // Now deal with the by_range requests. Sort them in order of increasing slot + let mut by_range = EngineRequest::::new_by_range(); + by_range_blocks.sort_by_key(|block_parts| block_parts.slot()); + for block_parts in by_range_blocks { + let root = block_parts.root(); + by_range + .push_block_parts(block_parts, &self.beacon_chain.log) + .await; + requests.insert(root, by_range.clone()); + } + + let mut result = vec![]; + for root in ordered_block_roots { + if let Some(request) = requests.get(&root) { + result.push((root, request.clone())) + } else { + crit!( + self.beacon_chain.log, + "Please notify the devs"; + "beacon_block_streamer" => "request not found", + "root" => ?root, + ); + no_request + .push_block_result( + root, + Err(Error::RequestNotFound.into()), + &self.beacon_chain.log, + ) + .await; + result.push((root, no_request.clone())); + } + } + + result + } + + // used when the execution engine doesn't support the payload bodies methods + async fn stream_blocks_fallback( + &self, + block_roots: Vec, + sender: UnboundedSender<(Hash256, Arc>)>, + ) { + debug!( + self.beacon_chain.log, + "Using slower fallback method of eth_getBlockByHash()" + ); + for root in block_roots { + let cached_block = self.check_early_attester_cache(root); + let block_result = if cached_block.is_some() { + Ok(cached_block) + } else { + self.beacon_chain + .get_block(&root) + .await + .map(|opt_block| opt_block.map(Arc::new)) + }; + + if sender.send((root, Arc::new(block_result))).is_err() { + break; + } + } + } + + async fn stream_blocks( + &self, + block_roots: Vec, + sender: UnboundedSender<(Hash256, Arc>)>, + ) { + let n_roots = block_roots.len(); + let mut n_success = 0usize; + let mut n_sent = 0usize; + let mut engine_requests = 0usize; + + let payloads = self.load_payloads(block_roots); + let requests = self.get_requests(payloads).await; + + for (root, request) in requests { + if request.is_unsent().await { + engine_requests += 1; + } + + let result = request + .get_block_result(&root, &self.execution_layer, &self.beacon_chain.log) + .await; + + let successful = result + .as_ref() + .as_ref() + .map(|opt| opt.is_some()) + .unwrap_or(false); + + if sender.send((root, result)).is_err() { + break; + } else { + n_sent += 1; + if successful { + n_success += 1; + } + } + } + + debug!( + self.beacon_chain.log, + "BeaconBlockStreamer finished"; + "requested blocks" => n_roots, + "sent" => n_sent, + "succeeded" => n_success, + "failed" => (n_sent - n_success), + "engine requests" => engine_requests, + ); + } + + pub async fn stream( + self, + block_roots: Vec, + sender: UnboundedSender<(Hash256, Arc>)>, + ) { + match self + .execution_layer + .get_engine_capabilities(None) + .await + .map_err(Box::new) + .map_err(BeaconChainError::EngineGetCapabilititesFailed) + { + Ok(engine_capabilities) => { + if engine_capabilities.get_payload_bodies_by_range_v1 { + self.stream_blocks(block_roots, sender).await; + } else { + // use the fallback method + self.stream_blocks_fallback(block_roots, sender).await; + } + } + Err(e) => { + send_errors(block_roots, sender, e).await; + } + } + } + + pub fn launch_stream( + self, + block_roots: Vec, + executor: &TaskExecutor, + ) -> impl Stream>)> { + let (block_tx, block_rx) = mpsc::unbounded_channel(); + debug!( + self.beacon_chain.log, + "Launching a BeaconBlockStreamer"; + "blocks" => block_roots.len(), + ); + executor.spawn(self.stream(block_roots, block_tx), "get_blocks_sender"); + UnboundedReceiverStream::new(block_rx) + } +} + +async fn send_errors( + block_roots: Vec, + sender: UnboundedSender<(Hash256, Arc>)>, + beacon_chain_error: BeaconChainError, +) { + let result = Arc::new(Err(beacon_chain_error)); + for root in block_roots { + if sender.send((root, result.clone())).is_err() { + break; + } + } +} + +impl From for BeaconChainError { + fn from(value: Error) -> Self { + BeaconChainError::BlockStreamerError(value) + } +} + +#[cfg(test)] +mod tests { + use crate::beacon_block_streamer::{BeaconBlockStreamer, CheckEarlyAttesterCache}; + use crate::test_utils::{test_spec, BeaconChainHarness, EphemeralHarnessType}; + use execution_layer::test_utils::{Block, DEFAULT_ENGINE_CAPABILITIES}; + use execution_layer::EngineCapabilities; + use lazy_static::lazy_static; + use std::time::Duration; + use tokio::sync::mpsc; + use types::{ChainSpec, Epoch, EthSpec, Hash256, Keypair, MinimalEthSpec, Slot}; + + const VALIDATOR_COUNT: usize = 48; + lazy_static! { + /// A cached set of keys. + static ref KEYPAIRS: Vec = types::test_utils::generate_deterministic_keypairs(VALIDATOR_COUNT); + } + + fn get_harness( + validator_count: usize, + spec: ChainSpec, + ) -> BeaconChainHarness> { + let harness = BeaconChainHarness::builder(MinimalEthSpec) + .spec(spec) + .keypairs(KEYPAIRS[0..validator_count].to_vec()) + .logger(logging::test_logger()) + .fresh_ephemeral_store() + .mock_execution_layer() + .build(); + + harness.advance_slot(); + + harness + } + + #[tokio::test] + async fn check_all_blocks_from_altair_to_capella() { + let slots_per_epoch = MinimalEthSpec::slots_per_epoch() as usize; + let num_epochs = 8; + let bellatrix_fork_epoch = 2usize; + let capella_fork_epoch = 4usize; + let num_blocks_produced = num_epochs * slots_per_epoch; + + let mut spec = test_spec::(); + spec.altair_fork_epoch = Some(Epoch::new(0)); + spec.bellatrix_fork_epoch = Some(Epoch::new(bellatrix_fork_epoch as u64)); + spec.capella_fork_epoch = Some(Epoch::new(capella_fork_epoch as u64)); + + let harness = get_harness(VALIDATOR_COUNT, spec); + // go to bellatrix fork + harness + .extend_slots(bellatrix_fork_epoch * slots_per_epoch) + .await; + // extend half an epoch + harness.extend_slots(slots_per_epoch / 2).await; + // trigger merge + harness + .execution_block_generator() + .move_to_terminal_block() + .expect("should move to terminal block"); + let timestamp = harness.get_timestamp_at_slot() + harness.spec.seconds_per_slot; + harness + .execution_block_generator() + .modify_last_block(|block| { + if let Block::PoW(terminal_block) = block { + terminal_block.timestamp = timestamp; + } + }); + // finish out merge epoch + harness.extend_slots(slots_per_epoch / 2).await; + // finish rest of epochs + harness + .extend_slots((num_epochs - 1 - bellatrix_fork_epoch) * slots_per_epoch) + .await; + + let head = harness.chain.head_snapshot(); + let state = &head.beacon_state; + + assert_eq!( + state.slot(), + Slot::new(num_blocks_produced as u64), + "head should be at the current slot" + ); + assert_eq!( + state.current_epoch(), + num_blocks_produced as u64 / MinimalEthSpec::slots_per_epoch(), + "head should be at the expected epoch" + ); + assert_eq!( + state.current_justified_checkpoint().epoch, + state.current_epoch() - 1, + "the head should be justified one behind the current epoch" + ); + assert_eq!( + state.finalized_checkpoint().epoch, + state.current_epoch() - 2, + "the head should be finalized two behind the current epoch" + ); + + let block_roots: Vec = harness + .chain + .forwards_iter_block_roots(Slot::new(0)) + .expect("should get iter") + .map(Result::unwrap) + .map(|(root, _)| root) + .collect(); + + let mut expected_blocks = vec![]; + // get all blocks the old fashioned way + for root in &block_roots { + let block = harness + .chain + .get_block(root) + .await + .expect("should get block") + .expect("block should exist"); + expected_blocks.push(block); + } + + for epoch in 0..num_epochs { + let start = epoch * slots_per_epoch; + let mut epoch_roots = vec![Hash256::zero(); slots_per_epoch]; + epoch_roots[..].clone_from_slice(&block_roots[start..(start + slots_per_epoch)]); + let streamer = BeaconBlockStreamer::new(&harness.chain, CheckEarlyAttesterCache::No) + .expect("should create streamer"); + let (block_tx, mut block_rx) = mpsc::unbounded_channel(); + streamer.stream(epoch_roots.clone(), block_tx).await; + + for (i, expected_root) in epoch_roots.into_iter().enumerate() { + let (found_root, found_block_result) = + block_rx.recv().await.expect("should get block"); + + assert_eq!( + found_root, expected_root, + "expected block root should match" + ); + match found_block_result.as_ref() { + Ok(maybe_block) => { + let found_block = maybe_block.clone().expect("should have a block"); + let expected_block = expected_blocks + .get(start + i) + .expect("should get expected block"); + assert_eq!( + found_block.as_ref(), + expected_block, + "expected block should match found block" + ); + } + Err(e) => panic!("Error retrieving block {}: {:?}", expected_root, e), + } + } + } + } + + #[tokio::test] + async fn check_fallback_altair_to_capella() { + let slots_per_epoch = MinimalEthSpec::slots_per_epoch() as usize; + let num_epochs = 8; + let bellatrix_fork_epoch = 2usize; + let capella_fork_epoch = 4usize; + let num_blocks_produced = num_epochs * slots_per_epoch; + + let mut spec = test_spec::(); + spec.altair_fork_epoch = Some(Epoch::new(0)); + spec.bellatrix_fork_epoch = Some(Epoch::new(bellatrix_fork_epoch as u64)); + spec.capella_fork_epoch = Some(Epoch::new(capella_fork_epoch as u64)); + + let harness = get_harness(VALIDATOR_COUNT, spec); + + // modify execution engine so it doesn't support engine_payloadBodiesBy* methods + let mock_execution_layer = harness.mock_execution_layer.as_ref().unwrap(); + mock_execution_layer + .server + .set_engine_capabilities(EngineCapabilities { + get_payload_bodies_by_hash_v1: false, + get_payload_bodies_by_range_v1: false, + ..DEFAULT_ENGINE_CAPABILITIES + }); + // refresh capabilities cache + harness + .chain + .execution_layer + .as_ref() + .unwrap() + .get_engine_capabilities(Some(Duration::ZERO)) + .await + .unwrap(); + + // go to bellatrix fork + harness + .extend_slots(bellatrix_fork_epoch * slots_per_epoch) + .await; + // extend half an epoch + harness.extend_slots(slots_per_epoch / 2).await; + // trigger merge + harness + .execution_block_generator() + .move_to_terminal_block() + .expect("should move to terminal block"); + let timestamp = harness.get_timestamp_at_slot() + harness.spec.seconds_per_slot; + harness + .execution_block_generator() + .modify_last_block(|block| { + if let Block::PoW(terminal_block) = block { + terminal_block.timestamp = timestamp; + } + }); + // finish out merge epoch + harness.extend_slots(slots_per_epoch / 2).await; + // finish rest of epochs + harness + .extend_slots((num_epochs - 1 - bellatrix_fork_epoch) * slots_per_epoch) + .await; + + let head = harness.chain.head_snapshot(); + let state = &head.beacon_state; + + assert_eq!( + state.slot(), + Slot::new(num_blocks_produced as u64), + "head should be at the current slot" + ); + assert_eq!( + state.current_epoch(), + num_blocks_produced as u64 / MinimalEthSpec::slots_per_epoch(), + "head should be at the expected epoch" + ); + assert_eq!( + state.current_justified_checkpoint().epoch, + state.current_epoch() - 1, + "the head should be justified one behind the current epoch" + ); + assert_eq!( + state.finalized_checkpoint().epoch, + state.current_epoch() - 2, + "the head should be finalized two behind the current epoch" + ); + + let block_roots: Vec = harness + .chain + .forwards_iter_block_roots(Slot::new(0)) + .expect("should get iter") + .map(Result::unwrap) + .map(|(root, _)| root) + .collect(); + + let mut expected_blocks = vec![]; + // get all blocks the old fashioned way + for root in &block_roots { + let block = harness + .chain + .get_block(root) + .await + .expect("should get block") + .expect("block should exist"); + expected_blocks.push(block); + } + + for epoch in 0..num_epochs { + let start = epoch * slots_per_epoch; + let mut epoch_roots = vec![Hash256::zero(); slots_per_epoch]; + epoch_roots[..].clone_from_slice(&block_roots[start..(start + slots_per_epoch)]); + let streamer = BeaconBlockStreamer::new(&harness.chain, CheckEarlyAttesterCache::No) + .expect("should create streamer"); + let (block_tx, mut block_rx) = mpsc::unbounded_channel(); + streamer.stream(epoch_roots.clone(), block_tx).await; + + for (i, expected_root) in epoch_roots.into_iter().enumerate() { + let (found_root, found_block_result) = + block_rx.recv().await.expect("should get block"); + + assert_eq!( + found_root, expected_root, + "expected block root should match" + ); + match found_block_result.as_ref() { + Ok(maybe_block) => { + let found_block = maybe_block.clone().expect("should have a block"); + let expected_block = expected_blocks + .get(start + i) + .expect("should get expected block"); + assert_eq!( + found_block.as_ref(), + expected_block, + "expected block should match found block" + ); + } + Err(e) => panic!("Error retrieving block {}: {:?}", expected_root, e), + } + } + } + } +} diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 97ce142dd..8dee55569 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -4,6 +4,7 @@ use crate::attestation_verification::{ VerifiedUnaggregatedAttestation, }; use crate::attester_cache::{AttesterCache, AttesterCacheKey}; +use crate::beacon_block_streamer::{BeaconBlockStreamer, CheckEarlyAttesterCache}; use crate::beacon_proposer_cache::compute_proposer_duties_from_head; use crate::beacon_proposer_cache::BeaconProposerCache; use crate::block_times_cache::BlockTimesCache; @@ -102,6 +103,7 @@ use store::{ DatabaseBlock, Error as DBError, HotColdDB, KeyValueStore, KeyValueStoreOp, StoreItem, StoreOp, }; use task_executor::{ShutdownReason, TaskExecutor}; +use tokio_stream::Stream; use tree_hash::TreeHash; use types::beacon_state::CloneConfig; use types::consts::merge::INTERVALS_PER_SLOT; @@ -941,14 +943,42 @@ impl BeaconChain { /// ## Errors /// /// May return a database error. - pub async fn get_block_checking_early_attester_cache( - &self, - block_root: &Hash256, - ) -> Result>>, Error> { - if let Some(block) = self.early_attester_cache.get_block(*block_root) { - return Ok(Some(block)); - } - Ok(self.get_block(block_root).await?.map(Arc::new)) + pub fn get_blocks_checking_early_attester_cache( + self: &Arc, + block_roots: Vec, + executor: &TaskExecutor, + ) -> Result< + impl Stream< + Item = ( + Hash256, + Arc>>, Error>>, + ), + >, + Error, + > { + Ok( + BeaconBlockStreamer::::new(self, CheckEarlyAttesterCache::Yes)? + .launch_stream(block_roots, executor), + ) + } + + pub fn get_blocks( + self: &Arc, + block_roots: Vec, + executor: &TaskExecutor, + ) -> Result< + impl Stream< + Item = ( + Hash256, + Arc>>, Error>>, + ), + >, + Error, + > { + Ok( + BeaconBlockStreamer::::new(self, CheckEarlyAttesterCache::No)? + .launch_stream(block_roots, executor), + ) } /// Returns the block at the given root, if any. diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 456097834..e789b54a2 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -1,4 +1,5 @@ use crate::attester_cache::Error as AttesterCacheError; +use crate::beacon_block_streamer::Error as BlockStreamerError; use crate::beacon_chain::ForkChoiceError; use crate::beacon_fork_choice_store::Error as ForkChoiceStoreError; use crate::eth1_chain::Error as Eth1ChainError; @@ -143,6 +144,7 @@ pub enum BeaconChainError { ExecutionLayerMissing, BlockVariantLacksExecutionPayload(Hash256), ExecutionLayerErrorPayloadReconstruction(ExecutionBlockHash, Box), + EngineGetCapabilititesFailed(Box), BlockHashMissingFromExecutionLayer(ExecutionBlockHash), InconsistentPayloadReconstructed { slot: Slot, @@ -150,6 +152,7 @@ pub enum BeaconChainError { canonical_transactions_root: Hash256, reconstructed_transactions_root: Hash256, }, + BlockStreamerError(BlockStreamerError), AddPayloadLogicError, ExecutionForkChoiceUpdateFailed(execution_layer::Error), PrepareProposerFailed(BlockProcessingError), diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 173ce13b4..1cf1f4746 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -2,6 +2,7 @@ pub mod attestation_rewards; pub mod attestation_verification; mod attester_cache; pub mod beacon_block_reward; +mod beacon_block_streamer; mod beacon_chain; mod beacon_fork_choice_store; pub mod beacon_proposer_cache; diff --git a/beacon_node/execution_layer/src/engine_api.rs b/beacon_node/execution_layer/src/engine_api.rs index 38311b823..3ecb36d09 100644 --- a/beacon_node/execution_layer/src/engine_api.rs +++ b/beacon_node/execution_layer/src/engine_api.rs @@ -1,7 +1,8 @@ use crate::engines::ForkchoiceState; use crate::http::{ ENGINE_EXCHANGE_TRANSITION_CONFIGURATION_V1, ENGINE_FORKCHOICE_UPDATED_V1, - ENGINE_FORKCHOICE_UPDATED_V2, ENGINE_GET_PAYLOAD_V1, ENGINE_GET_PAYLOAD_V2, + ENGINE_FORKCHOICE_UPDATED_V2, ENGINE_GET_PAYLOAD_BODIES_BY_HASH_V1, + ENGINE_GET_PAYLOAD_BODIES_BY_RANGE_V1, ENGINE_GET_PAYLOAD_V1, ENGINE_GET_PAYLOAD_V2, ENGINE_NEW_PAYLOAD_V1, ENGINE_NEW_PAYLOAD_V2, }; use eth2::types::{SsePayloadAttributes, SsePayloadAttributesV1, SsePayloadAttributesV2}; @@ -16,7 +17,8 @@ use strum::IntoStaticStr; use superstruct::superstruct; pub use types::{ Address, EthSpec, ExecutionBlockHash, ExecutionPayload, ExecutionPayloadHeader, - ExecutionPayloadRef, FixedVector, ForkName, Hash256, Uint256, VariableList, Withdrawal, + ExecutionPayloadRef, FixedVector, ForkName, Hash256, Transactions, Uint256, VariableList, + Withdrawal, Withdrawals, }; use types::{ExecutionPayloadCapella, ExecutionPayloadMerge}; @@ -371,12 +373,80 @@ impl GetPayloadResponse { } } +#[derive(Clone, Debug)] +pub struct ExecutionPayloadBodyV1 { + pub transactions: Transactions, + pub withdrawals: Option>, +} + +impl ExecutionPayloadBodyV1 { + pub fn to_payload( + self, + header: ExecutionPayloadHeader, + ) -> Result, String> { + match header { + ExecutionPayloadHeader::Merge(header) => { + if self.withdrawals.is_some() { + return Err(format!( + "block {} is merge but payload body has withdrawals", + header.block_hash + )); + } + Ok(ExecutionPayload::Merge(ExecutionPayloadMerge { + parent_hash: header.parent_hash, + fee_recipient: header.fee_recipient, + state_root: header.state_root, + receipts_root: header.receipts_root, + logs_bloom: header.logs_bloom, + prev_randao: header.prev_randao, + block_number: header.block_number, + gas_limit: header.gas_limit, + gas_used: header.gas_used, + timestamp: header.timestamp, + extra_data: header.extra_data, + base_fee_per_gas: header.base_fee_per_gas, + block_hash: header.block_hash, + transactions: self.transactions, + })) + } + ExecutionPayloadHeader::Capella(header) => { + if let Some(withdrawals) = self.withdrawals { + Ok(ExecutionPayload::Capella(ExecutionPayloadCapella { + parent_hash: header.parent_hash, + fee_recipient: header.fee_recipient, + state_root: header.state_root, + receipts_root: header.receipts_root, + logs_bloom: header.logs_bloom, + prev_randao: header.prev_randao, + block_number: header.block_number, + gas_limit: header.gas_limit, + gas_used: header.gas_used, + timestamp: header.timestamp, + extra_data: header.extra_data, + base_fee_per_gas: header.base_fee_per_gas, + block_hash: header.block_hash, + transactions: self.transactions, + withdrawals, + })) + } else { + Err(format!( + "block {} is capella but payload body doesn't have withdrawals", + header.block_hash + )) + } + } + } + } +} + #[derive(Clone, Copy, Debug)] pub struct EngineCapabilities { pub new_payload_v1: bool, pub new_payload_v2: bool, pub forkchoice_updated_v1: bool, pub forkchoice_updated_v2: bool, + pub get_payload_bodies_by_hash_v1: bool, + pub get_payload_bodies_by_range_v1: bool, pub get_payload_v1: bool, pub get_payload_v2: bool, pub exchange_transition_configuration_v1: bool, @@ -397,6 +467,12 @@ impl EngineCapabilities { if self.forkchoice_updated_v2 { response.push(ENGINE_FORKCHOICE_UPDATED_V2); } + if self.get_payload_bodies_by_hash_v1 { + response.push(ENGINE_GET_PAYLOAD_BODIES_BY_HASH_V1); + } + if self.get_payload_bodies_by_range_v1 { + response.push(ENGINE_GET_PAYLOAD_BODIES_BY_RANGE_V1); + } if self.get_payload_v1 { response.push(ENGINE_GET_PAYLOAD_V1); } diff --git a/beacon_node/execution_layer/src/engine_api/http.rs b/beacon_node/execution_layer/src/engine_api/http.rs index 8492dbc4c..993957450 100644 --- a/beacon_node/execution_layer/src/engine_api/http.rs +++ b/beacon_node/execution_layer/src/engine_api/http.rs @@ -42,6 +42,10 @@ pub const ENGINE_FORKCHOICE_UPDATED_V1: &str = "engine_forkchoiceUpdatedV1"; pub const ENGINE_FORKCHOICE_UPDATED_V2: &str = "engine_forkchoiceUpdatedV2"; pub const ENGINE_FORKCHOICE_UPDATED_TIMEOUT: Duration = Duration::from_secs(8); +pub const ENGINE_GET_PAYLOAD_BODIES_BY_HASH_V1: &str = "engine_getPayloadBodiesByHashV1"; +pub const ENGINE_GET_PAYLOAD_BODIES_BY_RANGE_V1: &str = "engine_getPayloadBodiesByRangeV1"; +pub const ENGINE_GET_PAYLOAD_BODIES_TIMEOUT: Duration = Duration::from_secs(10); + pub const ENGINE_EXCHANGE_TRANSITION_CONFIGURATION_V1: &str = "engine_exchangeTransitionConfigurationV1"; pub const ENGINE_EXCHANGE_TRANSITION_CONFIGURATION_V1_TIMEOUT: Duration = Duration::from_secs(1); @@ -62,6 +66,8 @@ pub static LIGHTHOUSE_CAPABILITIES: &[&str] = &[ ENGINE_GET_PAYLOAD_V2, ENGINE_FORKCHOICE_UPDATED_V1, ENGINE_FORKCHOICE_UPDATED_V2, + ENGINE_GET_PAYLOAD_BODIES_BY_HASH_V1, + ENGINE_GET_PAYLOAD_BODIES_BY_RANGE_V1, ENGINE_EXCHANGE_TRANSITION_CONFIGURATION_V1, ]; @@ -73,6 +79,8 @@ pub static PRE_CAPELLA_ENGINE_CAPABILITIES: EngineCapabilities = EngineCapabilit new_payload_v2: false, forkchoice_updated_v1: true, forkchoice_updated_v2: false, + get_payload_bodies_by_hash_v1: false, + get_payload_bodies_by_range_v1: false, get_payload_v1: true, get_payload_v2: false, exchange_transition_configuration_v1: true, @@ -882,6 +890,50 @@ impl HttpJsonRpc { Ok(response.into()) } + pub async fn get_payload_bodies_by_hash_v1( + &self, + block_hashes: Vec, + ) -> Result>>, Error> { + let params = json!([block_hashes]); + + let response: Vec>> = self + .rpc_request( + ENGINE_GET_PAYLOAD_BODIES_BY_HASH_V1, + params, + ENGINE_GET_PAYLOAD_BODIES_TIMEOUT * self.execution_timeout_multiplier, + ) + .await?; + + Ok(response + .into_iter() + .map(|opt_json| opt_json.map(From::from)) + .collect()) + } + + pub async fn get_payload_bodies_by_range_v1( + &self, + start: u64, + count: u64, + ) -> Result>>, Error> { + #[derive(Serialize)] + #[serde(transparent)] + struct Quantity(#[serde(with = "eth2_serde_utils::u64_hex_be")] u64); + + let params = json!([Quantity(start), Quantity(count)]); + let response: Vec>> = self + .rpc_request( + ENGINE_GET_PAYLOAD_BODIES_BY_RANGE_V1, + params, + ENGINE_GET_PAYLOAD_BODIES_TIMEOUT * self.execution_timeout_multiplier, + ) + .await?; + + Ok(response + .into_iter() + .map(|opt_json| opt_json.map(From::from)) + .collect()) + } + pub async fn exchange_transition_configuration_v1( &self, transition_configuration: TransitionConfigurationV1, @@ -924,6 +976,10 @@ impl HttpJsonRpc { new_payload_v2: capabilities.contains(ENGINE_NEW_PAYLOAD_V2), forkchoice_updated_v1: capabilities.contains(ENGINE_FORKCHOICE_UPDATED_V1), forkchoice_updated_v2: capabilities.contains(ENGINE_FORKCHOICE_UPDATED_V2), + get_payload_bodies_by_hash_v1: capabilities + .contains(ENGINE_GET_PAYLOAD_BODIES_BY_HASH_V1), + get_payload_bodies_by_range_v1: capabilities + .contains(ENGINE_GET_PAYLOAD_BODIES_BY_RANGE_V1), get_payload_v1: capabilities.contains(ENGINE_GET_PAYLOAD_V1), get_payload_v2: capabilities.contains(ENGINE_GET_PAYLOAD_V2), exchange_transition_configuration_v1: capabilities diff --git a/beacon_node/execution_layer/src/engine_api/json_structures.rs b/beacon_node/execution_layer/src/engine_api/json_structures.rs index dcfa63545..6d33bbabe 100644 --- a/beacon_node/execution_layer/src/engine_api/json_structures.rs +++ b/beacon_node/execution_layer/src/engine_api/json_structures.rs @@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize}; use strum::EnumString; use superstruct::superstruct; use types::{ - EthSpec, ExecutionBlockHash, FixedVector, Transaction, Unsigned, VariableList, Withdrawal, + EthSpec, ExecutionBlockHash, FixedVector, Transactions, Unsigned, VariableList, Withdrawal, }; use types::{ExecutionPayload, ExecutionPayloadCapella, ExecutionPayloadMerge}; @@ -93,8 +93,7 @@ pub struct JsonExecutionPayload { pub base_fee_per_gas: Uint256, pub block_hash: ExecutionBlockHash, #[serde(with = "ssz_types::serde_utils::list_of_hex_var_list")] - pub transactions: - VariableList, T::MaxTransactionsPerPayload>, + pub transactions: Transactions, #[superstruct(only(V2))] pub withdrawals: VariableList, } @@ -494,6 +493,30 @@ impl From for JsonForkchoiceUpdatedV1Response { } } +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(bound = "E: EthSpec")] +pub struct JsonExecutionPayloadBodyV1 { + #[serde(with = "ssz_types::serde_utils::list_of_hex_var_list")] + pub transactions: Transactions, + pub withdrawals: Option>, +} + +impl From> for ExecutionPayloadBodyV1 { + fn from(value: JsonExecutionPayloadBodyV1) -> Self { + Self { + transactions: value.transactions, + withdrawals: value.withdrawals.map(|json_withdrawals| { + Withdrawals::::from( + json_withdrawals + .into_iter() + .map(Into::into) + .collect::>(), + ) + }), + } + } +} + #[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct TransitionConfigurationV1 { diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index fa661fcf6..2c2d8c7dc 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -1571,6 +1571,37 @@ impl ExecutionLayer { } } + pub async fn get_payload_bodies_by_hash( + &self, + hashes: Vec, + ) -> Result>>, Error> { + self.engine() + .request(|engine: &Engine| async move { + engine.api.get_payload_bodies_by_hash_v1(hashes).await + }) + .await + .map_err(Box::new) + .map_err(Error::EngineError) + } + + pub async fn get_payload_bodies_by_range( + &self, + start: u64, + count: u64, + ) -> Result>>, Error> { + let _timer = metrics::start_timer(&metrics::EXECUTION_LAYER_GET_PAYLOAD_BODIES_BY_RANGE); + self.engine() + .request(|engine: &Engine| async move { + engine + .api + .get_payload_bodies_by_range_v1(start, count) + .await + }) + .await + .map_err(Box::new) + .map_err(Error::EngineError) + } + pub async fn get_payload_by_block_hash( &self, hash: ExecutionBlockHash, diff --git a/beacon_node/execution_layer/src/metrics.rs b/beacon_node/execution_layer/src/metrics.rs index 287050f66..3ed99ca60 100644 --- a/beacon_node/execution_layer/src/metrics.rs +++ b/beacon_node/execution_layer/src/metrics.rs @@ -45,6 +45,10 @@ lazy_static::lazy_static! { "execution_layer_get_payload_by_block_hash_time", "Time to reconstruct a payload from the EE using eth_getBlockByHash" ); + pub static ref EXECUTION_LAYER_GET_PAYLOAD_BODIES_BY_RANGE: Result = try_create_histogram( + "execution_layer_get_payload_bodies_by_range_time", + "Time to fetch a range of payload bodies from the EE" + ); pub static ref EXECUTION_LAYER_VERIFY_BLOCK_HASH: Result = try_create_histogram_with_buckets( "execution_layer_verify_block_hash_time", "Time to verify the execution block hash in Lighthouse, without the EL", diff --git a/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs b/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs index c016a16a2..a8d98a767 100644 --- a/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs +++ b/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs @@ -199,6 +199,14 @@ impl ExecutionBlockGenerator { .and_then(|block| block.as_execution_block_with_tx()) } + pub fn execution_block_with_txs_by_number( + &self, + number: u64, + ) -> Option> { + self.block_by_number(number) + .and_then(|block| block.as_execution_block_with_tx()) + } + pub fn move_to_block_prior_to_terminal_block(&mut self) -> Result<(), String> { let target_block = self .terminal_block_number diff --git a/beacon_node/execution_layer/src/test_utils/handle_rpc.rs b/beacon_node/execution_layer/src/test_utils/handle_rpc.rs index 2a54dfae6..bda0c782d 100644 --- a/beacon_node/execution_layer/src/test_utils/handle_rpc.rs +++ b/beacon_node/execution_layer/src/test_utils/handle_rpc.rs @@ -2,7 +2,7 @@ use super::Context; use crate::engine_api::{http::*, *}; use crate::json_structures::*; use crate::test_utils::DEFAULT_MOCK_EL_PAYLOAD_VALUE_WEI; -use serde::de::DeserializeOwned; +use serde::{de::DeserializeOwned, Deserialize}; use serde_json::Value as JsonValue; use std::sync::Arc; use types::{EthSpec, ForkName}; @@ -359,6 +359,61 @@ pub async fn handle_rpc( let engine_capabilities = ctx.engine_capabilities.read(); Ok(serde_json::to_value(engine_capabilities.to_response()).unwrap()) } + ENGINE_GET_PAYLOAD_BODIES_BY_RANGE_V1 => { + #[derive(Deserialize)] + #[serde(transparent)] + struct Quantity(#[serde(with = "eth2_serde_utils::u64_hex_be")] pub u64); + + let start = get_param::(params, 0) + .map_err(|s| (s, BAD_PARAMS_ERROR_CODE))? + .0; + let count = get_param::(params, 1) + .map_err(|s| (s, BAD_PARAMS_ERROR_CODE))? + .0; + + let mut response = vec![]; + for block_num in start..(start + count) { + let maybe_block = ctx + .execution_block_generator + .read() + .execution_block_with_txs_by_number(block_num); + + match maybe_block { + Some(block) => { + let transactions = Transactions::::new( + block + .transactions() + .iter() + .map(|transaction| VariableList::new(transaction.rlp().to_vec())) + .collect::>() + .map_err(|e| { + ( + format!("failed to deserialize transaction: {:?}", e), + GENERIC_ERROR_CODE, + ) + })?, + ) + .map_err(|e| { + ( + format!("failed to deserialize transactions: {:?}", e), + GENERIC_ERROR_CODE, + ) + })?; + + response.push(Some(JsonExecutionPayloadBodyV1:: { + transactions, + withdrawals: block + .withdrawals() + .ok() + .map(|withdrawals| VariableList::from(withdrawals.clone())), + })); + } + None => response.push(None), + } + } + + Ok(serde_json::to_value(response).unwrap()) + } other => Err(( format!("The method {} does not exist/is not available", other), METHOD_NOT_FOUND_CODE, diff --git a/beacon_node/execution_layer/src/test_utils/mod.rs b/beacon_node/execution_layer/src/test_utils/mod.rs index 36b24bfc3..9379a3c23 100644 --- a/beacon_node/execution_layer/src/test_utils/mod.rs +++ b/beacon_node/execution_layer/src/test_utils/mod.rs @@ -39,6 +39,8 @@ pub const DEFAULT_ENGINE_CAPABILITIES: EngineCapabilities = EngineCapabilities { new_payload_v2: true, forkchoice_updated_v1: true, forkchoice_updated_v2: true, + get_payload_bodies_by_hash_v1: true, + get_payload_bodies_by_range_v1: true, get_payload_v1: true, get_payload_v2: true, exchange_transition_configuration_v1: true, diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index afcc15280..81b163bf7 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -9,8 +9,8 @@ use lighthouse_network::rpc::*; use lighthouse_network::{PeerId, PeerRequestId, ReportSource, Response, SyncInfo}; use slog::{debug, error, warn}; use slot_clock::SlotClock; -use std::sync::Arc; use task_executor::TaskExecutor; +use tokio_stream::StreamExt; use types::{light_client_bootstrap::LightClientBootstrap, Epoch, EthSpec, Hash256, Slot}; use super::Worker; @@ -131,21 +131,25 @@ impl Worker { request_id: PeerRequestId, request: BlocksByRootRequest, ) { + let requested_blocks = request.block_roots.len(); + let mut block_stream = match self + .chain + .get_blocks_checking_early_attester_cache(request.block_roots.into(), &executor) + { + Ok(block_stream) => block_stream, + Err(e) => return error!(self.log, "Error getting block stream"; "error" => ?e), + }; // Fetching blocks is async because it may have to hit the execution layer for payloads. executor.spawn( async move { let mut send_block_count = 0; let mut send_response = true; - for root in request.block_roots.iter() { - match self - .chain - .get_block_checking_early_attester_cache(root) - .await - { + while let Some((root, result)) = block_stream.next().await { + match result.as_ref() { Ok(Some(block)) => { self.send_response( peer_id, - Response::BlocksByRoot(Some(block)), + Response::BlocksByRoot(Some(block.clone())), request_id, ); send_block_count += 1; @@ -190,7 +194,7 @@ impl Worker { self.log, "Received BlocksByRoot Request"; "peer" => %peer_id, - "requested" => request.block_roots.len(), + "requested" => requested_blocks, "returned" => %send_block_count ); @@ -344,14 +348,19 @@ impl Worker { // remove all skip slots let block_roots = block_roots.into_iter().flatten().collect::>(); + let mut block_stream = match self.chain.get_blocks(block_roots, &executor) { + Ok(block_stream) => block_stream, + Err(e) => return error!(self.log, "Error getting block stream"; "error" => ?e), + }; + // Fetching blocks is async because it may have to hit the execution layer for payloads. executor.spawn( async move { let mut blocks_sent = 0; let mut send_response = true; - for root in block_roots { - match self.chain.get_block(&root).await { + while let Some((root, result)) = block_stream.next().await { + match result.as_ref() { Ok(Some(block)) => { // Due to skip slots, blocks could be out of the range, we ensure they // are in the range before sending @@ -361,7 +370,7 @@ impl Worker { blocks_sent += 1; self.send_network_message(NetworkMessage::SendResponse { peer_id, - response: Response::BlocksByRange(Some(Arc::new(block))), + response: Response::BlocksByRange(Some(block.clone())), id: request_id, }); } diff --git a/consensus/types/src/execution_payload.rs b/consensus/types/src/execution_payload.rs index c2b5295d6..18da0d161 100644 --- a/consensus/types/src/execution_payload.rs +++ b/consensus/types/src/execution_payload.rs @@ -152,3 +152,12 @@ impl ForkVersionDeserialize for ExecutionPayload { }) } } + +impl ExecutionPayload { + pub fn fork_name(&self) -> ForkName { + match self { + ExecutionPayload::Merge(_) => ForkName::Merge, + ExecutionPayload::Capella(_) => ForkName::Capella, + } + } +} diff --git a/testing/execution_engine_integration/src/nethermind.rs b/testing/execution_engine_integration/src/nethermind.rs index 720a4a73b..485485c6f 100644 --- a/testing/execution_engine_integration/src/nethermind.rs +++ b/testing/execution_engine_integration/src/nethermind.rs @@ -11,7 +11,7 @@ use unused_port::unused_tcp4_port; /// We've pinned the Nethermind version since our method of using the `master` branch to /// find the latest tag isn't working. It appears Nethermind don't always tag on `master`. /// We should fix this so we always pull the latest version of Nethermind. -const NETHERMIND_BRANCH: &str = "release/1.14.6"; +const NETHERMIND_BRANCH: &str = "release/1.17.1"; const NETHERMIND_REPO_URL: &str = "https://github.com/NethermindEth/nethermind"; fn build_result(repo_dir: &Path) -> Output { @@ -67,7 +67,7 @@ impl NethermindEngine { .join("Nethermind.Runner") .join("bin") .join("Release") - .join("net6.0") + .join("net7.0") .join("Nethermind.Runner") } } @@ -95,7 +95,7 @@ impl GenericExecutionEngine for NethermindEngine { .arg("--datadir") .arg(datadir.path().to_str().unwrap()) .arg("--config") - .arg("kiln") + .arg("hive") .arg("--Init.ChainSpecPath") .arg(genesis_json_path.to_str().unwrap()) .arg("--Merge.TerminalTotalDifficulty") diff --git a/testing/execution_engine_integration/src/test_rig.rs b/testing/execution_engine_integration/src/test_rig.rs index 15e9f2601..ff333332b 100644 --- a/testing/execution_engine_integration/src/test_rig.rs +++ b/testing/execution_engine_integration/src/test_rig.rs @@ -15,8 +15,8 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use task_executor::TaskExecutor; use tokio::time::sleep; use types::{ - Address, ChainSpec, EthSpec, ExecutionBlockHash, ExecutionPayload, ForkName, FullPayload, - Hash256, MainnetEthSpec, PublicKeyBytes, Slot, Uint256, + Address, ChainSpec, EthSpec, ExecutionBlockHash, ExecutionPayload, ExecutionPayloadHeader, + ForkName, FullPayload, Hash256, MainnetEthSpec, PublicKeyBytes, Slot, Uint256, }; const EXECUTION_ENGINE_START_TIMEOUT: Duration = Duration::from_secs(30); @@ -628,12 +628,32 @@ async fn check_payload_reconstruction( ) { let reconstructed = ee .execution_layer - // FIXME: handle other forks here? - .get_payload_by_block_hash(payload.block_hash(), ForkName::Merge) + .get_payload_by_block_hash(payload.block_hash(), payload.fork_name()) .await .unwrap() .unwrap(); assert_eq!(reconstructed, *payload); + // also check via payload bodies method + let capabilities = ee + .execution_layer + .get_engine_capabilities(None) + .await + .unwrap(); + assert!( + // if the engine doesn't have these capabilities, we need to update the client in our tests + capabilities.get_payload_bodies_by_hash_v1 && capabilities.get_payload_bodies_by_range_v1, + "Testing engine does not support payload bodies methods" + ); + let mut bodies = ee + .execution_layer + .get_payload_bodies_by_hash(vec![payload.block_hash()]) + .await + .unwrap(); + assert_eq!(bodies.len(), 1); + let body = bodies.pop().unwrap().unwrap(); + let header = ExecutionPayloadHeader::from(payload.to_ref()); + let reconstructed_from_body = body.to_payload(header).unwrap(); + assert_eq!(reconstructed_from_body, *payload); } /// Returns the duration since the unix epoch. From 17d56b06f608aceb258575484111e59ab4bc8fac Mon Sep 17 00:00:00 2001 From: Age Manning Date: Mon, 20 Mar 2023 21:50:37 +0000 Subject: [PATCH 02/11] Ignore self as a bootnode (#4110) If a node is also a bootnode it can try to add itself to its own local routing table which will emit an error. The error is entirely harmless but we would prefer to avoid emitting the error. This PR does not attempt to add a boot node ENR if that ENR corresponds to our local peer-id/node-id. --- beacon_node/lighthouse_network/src/discovery/mod.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/beacon_node/lighthouse_network/src/discovery/mod.rs b/beacon_node/lighthouse_network/src/discovery/mod.rs index b9c4e76fe..f58362097 100644 --- a/beacon_node/lighthouse_network/src/discovery/mod.rs +++ b/beacon_node/lighthouse_network/src/discovery/mod.rs @@ -197,6 +197,7 @@ impl Discovery { }; let local_enr = network_globals.local_enr.read().clone(); + let local_node_id = local_enr.node_id(); info!(log, "ENR Initialised"; "enr" => local_enr.to_base64(), "seq" => local_enr.seq(), "id"=> %local_enr.node_id(), "ip4" => ?local_enr.ip4(), "udp4"=> ?local_enr.udp4(), "tcp4" => ?local_enr.tcp6() @@ -217,6 +218,10 @@ impl Discovery { // Add bootnodes to routing table for bootnode_enr in config.boot_nodes_enr.clone() { + if bootnode_enr.node_id() == local_node_id { + // If we are a boot node, ignore adding it to the routing table + continue; + } debug!( log, "Adding node to routing table"; From 76a2007b641b9f0081a5fdcfb784bfb137b9c258 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Tue, 21 Mar 2023 05:14:57 +0000 Subject: [PATCH 03/11] Improve Lighthouse Connectivity Via ENR TCP Update (#4057) Currently Lighthouse will remain uncontactable if users port forward a port that is not the same as the one they are listening on. For example, if Lighthouse runs with port 9000 TCP/UDP locally but a router is configured to pass 9010 externally to the lighthouse node on 9000, other nodes on the network will not be able to reach the lighthouse node. This occurs because Lighthouse does not update its ENR TCP port on external socket discovery. The intention was always that users should use `--enr-tcp-port` to customise this, but this is non-intuitive. The difficulty arises because we have no discovery mechanism to find our external TCP port. If we discovery a new external UDP port, we must guess what our external TCP port might be. This PR assumes the external TCP port is the same as the external UDP port (which may not be the case) and thus updates the TCP port along with the UDP port if the `--enr-tcp-port` flag is not set. Along with this PR, will be added documentation to the Lighthouse book so users can correctly understand and configure their ENR to maximize Lighthouse's connectivity. This relies on https://github.com/sigp/discv5/pull/166 and we should wait for a new release in discv5 before adding this PR. --- Cargo.lock | 45 +++++++++++-------- beacon_node/lighthouse_network/Cargo.toml | 2 +- .../lighthouse_network/src/discovery/mod.rs | 20 +++++++++ beacon_node/lighthouse_network/src/metrics.rs | 3 +- book/src/advanced_networking.md | 12 ++++- book/src/faq.md | 5 ++- boot_node/src/server.rs | 2 +- common/eth2_network_config/Cargo.toml | 2 +- common/eth2_network_config/src/lib.rs | 2 +- 9 files changed, 67 insertions(+), 26 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 20642f32b..021a14e0c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1605,16 +1605,6 @@ version = "0.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b72465f46d518f6015d9cf07f7f3013a95dd6b9c2747c3d65ae0cce43929d14f" -[[package]] -name = "delay_map" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c4d75d3abfe4830dcbf9bcb1b926954e121669f74dd1ca7aa0183b1755d83f6" -dependencies = [ - "futures", - "tokio-util 0.6.10", -] - [[package]] name = "delay_map" version = "0.3.0" @@ -1816,15 +1806,15 @@ dependencies = [ [[package]] name = "discv5" -version = "0.1.0" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d767c0e59b3e8d65222d95df723cc2ea1da92bb0f27c563607e6f0bde064f255" +checksum = "b009a99b85b58900df46435307fc5c4c845af7e182582b1fbf869572fa9fce69" dependencies = [ "aes 0.7.5", "aes-gcm 0.9.4", "arrayvec", - "delay_map 0.1.2", - "enr", + "delay_map", + "enr 0.7.0", "fnv", "futures", "hashlink 0.7.0", @@ -1973,6 +1963,25 @@ name = "enr" version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26fa0a0be8915790626d5759eb51fe47435a8eac92c2f212bd2da9aa7f30ea56" +dependencies = [ + "base64 0.13.1", + "bs58", + "bytes", + "hex", + "k256", + "log", + "rand 0.8.5", + "rlp", + "serde", + "sha3 0.10.6", + "zeroize", +] + +[[package]] +name = "enr" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "492a7e5fc2504d5fdce8e124d3e263b244a68b283cac67a69eda0cd43e0aebad" dependencies = [ "base64 0.13.1", "bs58", @@ -2221,7 +2230,7 @@ dependencies = [ name = "eth2_network_config" version = "0.2.0" dependencies = [ - "enr", + "discv5", "eth2_config", "eth2_ssz", "serde_yaml", @@ -2372,7 +2381,7 @@ dependencies = [ "async-stream", "blst", "bs58", - "enr", + "enr 0.6.2", "hex", "integer-sqrt", "multiaddr 0.14.0", @@ -4415,7 +4424,7 @@ dependencies = [ name = "lighthouse_network" version = "0.2.0" dependencies = [ - "delay_map 0.3.0", + "delay_map", "directory", "dirs", "discv5", @@ -5032,7 +5041,7 @@ name = "network" version = "0.2.0" dependencies = [ "beacon_chain", - "delay_map 0.3.0", + "delay_map", "derivative", "environment", "error-chain", diff --git a/beacon_node/lighthouse_network/Cargo.toml b/beacon_node/lighthouse_network/Cargo.toml index 2ec8baaf5..dda797187 100644 --- a/beacon_node/lighthouse_network/Cargo.toml +++ b/beacon_node/lighthouse_network/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Sigma Prime "] edition = "2021" [dependencies] -discv5 = { version = "0.1.0", features = ["libp2p"] } +discv5 = { version = "0.2.2", features = ["libp2p"] } unsigned-varint = { version = "0.6.0", features = ["codec"] } types = { path = "../../consensus/types" } eth2_ssz_types = "0.2.2" diff --git a/beacon_node/lighthouse_network/src/discovery/mod.rs b/beacon_node/lighthouse_network/src/discovery/mod.rs index f58362097..dda68aff9 100644 --- a/beacon_node/lighthouse_network/src/discovery/mod.rs +++ b/beacon_node/lighthouse_network/src/discovery/mod.rs @@ -177,6 +177,13 @@ pub struct Discovery { /// always false. pub started: bool, + /// This keeps track of whether an external UDP port change should also indicate an internal + /// TCP port change. As we cannot detect our external TCP port, we assume that the external UDP + /// port is also our external TCP port. This assumption only holds if the user has not + /// explicitly set their ENR TCP port via the CLI config. The first indicates tcp4 and the + /// second indicates tcp6. + update_tcp_port: (bool, bool), + /// Logger for the discovery behaviour. log: slog::Logger, } @@ -300,6 +307,11 @@ impl Discovery { } } + let update_tcp_port = ( + config.enr_tcp4_port.is_none(), + config.enr_tcp6_port.is_none(), + ); + Ok(Self { cached_enrs: LruCache::new(50), network_globals, @@ -309,6 +321,7 @@ impl Discovery { discv5, event_stream, started: !config.disable_discovery, + update_tcp_port, log, enr_dir, }) @@ -1019,6 +1032,13 @@ impl NetworkBehaviour for Discovery { metrics::check_nat(); // Discv5 will have updated our local ENR. We save the updated version // to disk. + + if (self.update_tcp_port.0 && socket_addr.is_ipv4()) + || (self.update_tcp_port.1 && socket_addr.is_ipv6()) + { + // Update the TCP port in the ENR + self.discv5.update_local_enr_socket(socket_addr, true); + } let enr = self.discv5.local_enr(); enr::save_enr_to_disk(Path::new(&self.enr_dir), &enr, &self.log); // update network globals diff --git a/beacon_node/lighthouse_network/src/metrics.rs b/beacon_node/lighthouse_network/src/metrics.rs index aed68e27f..58cc99201 100644 --- a/beacon_node/lighthouse_network/src/metrics.rs +++ b/beacon_node/lighthouse_network/src/metrics.rs @@ -167,7 +167,8 @@ pub fn check_nat() { } pub fn scrape_discovery_metrics() { - let metrics = discv5::metrics::Metrics::from(discv5::Discv5::raw_metrics()); + let metrics = + discv5::metrics::Metrics::from(discv5::Discv5::::raw_metrics()); set_float_gauge(&DISCOVERY_REQS, metrics.unsolicited_requests_per_second); set_gauge(&DISCOVERY_SESSIONS, metrics.active_sessions as i64); set_gauge(&DISCOVERY_SENT_BYTES, metrics.bytes_sent as i64); diff --git a/book/src/advanced_networking.md b/book/src/advanced_networking.md index fb7f07a51..08d276ba3 100644 --- a/book/src/advanced_networking.md +++ b/book/src/advanced_networking.md @@ -41,7 +41,7 @@ drastically and use the (recommended) default. ### NAT Traversal (Port Forwarding) -Lighthouse, by default, used port 9000 for both TCP and UDP. Lighthouse will +Lighthouse, by default, uses port 9000 for both TCP and UDP. Lighthouse will still function if it is behind a NAT without any port mappings. Although Lighthouse still functions, we recommend that some mechanism is used to ensure that your Lighthouse node is publicly accessible. This will typically improve @@ -54,6 +54,16 @@ node will inform you of established routes in this case). If UPnP is not enabled, we recommend you manually set up port mappings to both of Lighthouse's TCP and UDP ports (9000 by default). +> Note: Lighthouse needs to advertise its publicly accessible ports in +> order to inform its peers that it is contactable and how to connect to it. +> Lighthouse has an automated way of doing this for the UDP port. This means +> Lighthouse can detect its external UDP port. There is no such mechanism for the +> TCP port. As such, we assume that the external UDP and external TCP port is the +> same (i.e external 5050 UDP/TCP mapping to internal 9000 is fine). If you are setting up differing external UDP and TCP ports, you should +> explicitly specify them using the `--enr-tcp-port` and `--enr-udp-port` as +> explained in the following section. + + ### ENR Configuration Lighthouse has a number of CLI parameters for constructing and modifying the diff --git a/book/src/faq.md b/book/src/faq.md index 5bfae3fa8..43de40eee 100644 --- a/book/src/faq.md +++ b/book/src/faq.md @@ -128,8 +128,9 @@ same `datadir` as a previous network. I.e if you have been running the `datadir` (the `datadir` is also printed out in the beacon node's logs on boot-up). -If you find yourself with a low peer count and is not reaching the target you -expect. Try setting up the correct port forwards as described [here](./advanced_networking.md#nat-traversal-port-forwarding). +If you find yourself with a low peer count and it's not reaching the target you +expect. Try setting up the correct port forwards as described +[here](./advanced_networking.md#nat-traversal-port-forwarding). ### What should I do if I lose my slashing protection database? diff --git a/boot_node/src/server.rs b/boot_node/src/server.rs index 8f38fb300..3f5419c2c 100644 --- a/boot_node/src/server.rs +++ b/boot_node/src/server.rs @@ -44,7 +44,7 @@ pub async fn run(config: BootNodeConfig, log: slog::Logger) { info!(log, "Contact information"; "multiaddrs" => ?local_enr.multiaddr_p2p()); // construct the discv5 server - let mut discv5 = Discv5::new(local_enr.clone(), local_key, discv5_config).unwrap(); + let mut discv5: Discv5 = Discv5::new(local_enr.clone(), local_key, discv5_config).unwrap(); // If there are any bootnodes add them to the routing table for enr in boot_nodes { diff --git a/common/eth2_network_config/Cargo.toml b/common/eth2_network_config/Cargo.toml index 619900555..95cea62d4 100644 --- a/common/eth2_network_config/Cargo.toml +++ b/common/eth2_network_config/Cargo.toml @@ -18,4 +18,4 @@ serde_yaml = "0.8.13" types = { path = "../../consensus/types"} eth2_ssz = "0.4.1" eth2_config = { path = "../eth2_config"} -enr = { version = "0.6.2", features = ["ed25519", "k256"] } +discv5 = "0.2.2" diff --git a/common/eth2_network_config/src/lib.rs b/common/eth2_network_config/src/lib.rs index 7aef78437..7274bbf02 100644 --- a/common/eth2_network_config/src/lib.rs +++ b/common/eth2_network_config/src/lib.rs @@ -11,7 +11,7 @@ //! To add a new built-in testnet, add it to the `define_hardcoded_nets` invocation in the `eth2_config` //! crate. -use enr::{CombinedKey, Enr}; +use discv5::enr::{CombinedKey, Enr}; use eth2_config::{instantiate_hardcoded_nets, HardcodedNet}; use std::fs::{create_dir_all, File}; use std::io::{Read, Write}; From 785a9171e630e48c4d2bd483fc034c9a0493673b Mon Sep 17 00:00:00 2001 From: Age Manning Date: Tue, 21 Mar 2023 05:14:59 +0000 Subject: [PATCH 04/11] Customisable shuffling cache size (#4081) This PR enables the user to adjust the shuffling cache size. This is useful for some HTTP API requests which require re-computing old shufflings. This PR currently optimizes the beacon/states/{state_id}/committees HTTP API by first checking the cache before re-building shuffling. If the shuffling is set to a non-default value, then the HTTP API request will also fill the cache when as it constructs new shufflings. If the CLI flag is not present or the value is set to the default of 16 the default behaviour is observed. Co-authored-by: Michael Sproul --- beacon_node/beacon_chain/src/builder.rs | 3 +- beacon_node/beacon_chain/src/chain_config.rs | 3 + beacon_node/beacon_chain/src/lib.rs | 2 +- .../beacon_chain/src/shuffling_cache.rs | 16 +- beacon_node/http_api/src/lib.rs | 137 ++++++++++++++---- beacon_node/src/cli.rs | 8 + beacon_node/src/config.rs | 4 + lighthouse/tests/beacon_node.rs | 20 +++ 8 files changed, 151 insertions(+), 42 deletions(-) diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 0bff5aa07..cd465d4e7 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -765,6 +765,7 @@ where let genesis_time = head_snapshot.beacon_state.genesis_time(); let head_for_snapshot_cache = head_snapshot.clone(); let canonical_head = CanonicalHead::new(fork_choice, Arc::new(head_snapshot)); + let shuffling_cache_size = self.chain_config.shuffling_cache_size; let beacon_chain = BeaconChain { spec: self.spec, @@ -818,7 +819,7 @@ where DEFAULT_SNAPSHOT_CACHE_SIZE, head_for_snapshot_cache, )), - shuffling_cache: TimeoutRwLock::new(ShufflingCache::new()), + shuffling_cache: TimeoutRwLock::new(ShufflingCache::new(shuffling_cache_size)), eth1_finalization_cache: TimeoutRwLock::new(Eth1FinalizationCache::new(log.clone())), beacon_proposer_cache: <_>::default(), block_times_cache: <_>::default(), diff --git a/beacon_node/beacon_chain/src/chain_config.rs b/beacon_node/beacon_chain/src/chain_config.rs index 6e3538aed..315861894 100644 --- a/beacon_node/beacon_chain/src/chain_config.rs +++ b/beacon_node/beacon_chain/src/chain_config.rs @@ -67,6 +67,8 @@ pub struct ChainConfig { pub prepare_payload_lookahead: Duration, /// Use EL-free optimistic sync for the finalized part of the chain. pub optimistic_finalized_sync: bool, + /// The size of the shuffling cache, + pub shuffling_cache_size: usize, /// Whether to send payload attributes every slot, regardless of connected proposers. /// /// This is useful for block builders and testing. @@ -97,6 +99,7 @@ impl Default for ChainConfig { prepare_payload_lookahead: Duration::from_secs(4), // This value isn't actually read except in tests. optimistic_finalized_sync: true, + shuffling_cache_size: crate::shuffling_cache::DEFAULT_CACHE_SIZE, always_prepare_payload: false, } } diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 1cf1f4746..c25e308c4 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -40,7 +40,7 @@ mod persisted_fork_choice; mod pre_finalization_cache; pub mod proposer_prep_service; pub mod schema_change; -mod shuffling_cache; +pub mod shuffling_cache; mod snapshot_cache; pub mod state_advance_timer; pub mod sync_committee_rewards; diff --git a/beacon_node/beacon_chain/src/shuffling_cache.rs b/beacon_node/beacon_chain/src/shuffling_cache.rs index a01847a0e..91a1e24d8 100644 --- a/beacon_node/beacon_chain/src/shuffling_cache.rs +++ b/beacon_node/beacon_chain/src/shuffling_cache.rs @@ -9,7 +9,7 @@ use types::{beacon_state::CommitteeCache, AttestationShufflingId, Epoch, Hash256 /// Each entry should be `8 + 800,000 = 800,008` bytes in size with 100k validators. (8-byte hash + /// 100k indices). Therefore, this cache should be approx `16 * 800,008 = 12.8 MB`. (Note: this /// ignores a few extra bytes in the caches that should be insignificant compared to the indices). -const CACHE_SIZE: usize = 16; +pub const DEFAULT_CACHE_SIZE: usize = 16; /// The maximum number of concurrent committee cache "promises" that can be issued. In effect, this /// limits the number of concurrent states that can be loaded into memory for the committee cache. @@ -54,9 +54,9 @@ pub struct ShufflingCache { } impl ShufflingCache { - pub fn new() -> Self { + pub fn new(cache_size: usize) -> Self { Self { - cache: LruCache::new(CACHE_SIZE), + cache: LruCache::new(cache_size), } } @@ -172,7 +172,7 @@ impl ToArcCommitteeCache for Arc { impl Default for ShufflingCache { fn default() -> Self { - Self::new() + Self::new(DEFAULT_CACHE_SIZE) } } @@ -249,7 +249,7 @@ mod test { fn resolved_promise() { let (committee_a, _) = committee_caches(); let id_a = shuffling_id(1); - let mut cache = ShufflingCache::new(); + let mut cache = ShufflingCache::default(); // Create a promise. let sender = cache.create_promise(id_a.clone()).unwrap(); @@ -276,7 +276,7 @@ mod test { #[test] fn unresolved_promise() { let id_a = shuffling_id(1); - let mut cache = ShufflingCache::new(); + let mut cache = ShufflingCache::default(); // Create a promise. let sender = cache.create_promise(id_a.clone()).unwrap(); @@ -301,7 +301,7 @@ mod test { fn two_promises() { let (committee_a, committee_b) = committee_caches(); let (id_a, id_b) = (shuffling_id(1), shuffling_id(2)); - let mut cache = ShufflingCache::new(); + let mut cache = ShufflingCache::default(); // Create promise A. let sender_a = cache.create_promise(id_a.clone()).unwrap(); @@ -355,7 +355,7 @@ mod test { #[test] fn too_many_promises() { - let mut cache = ShufflingCache::new(); + let mut cache = ShufflingCache::default(); for i in 0..MAX_CONCURRENT_PROMISES { cache.create_promise(shuffling_id(i as u64)).unwrap(); diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 067119d9f..d3d99c5c9 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -54,8 +54,8 @@ use system_health::observe_system_health_bn; use tokio::sync::mpsc::{Sender, UnboundedSender}; use tokio_stream::{wrappers::BroadcastStream, StreamExt}; use types::{ - Attestation, AttestationData, AttesterSlashing, BeaconStateError, BlindedPayload, - CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, FullPayload, + Attestation, AttestationData, AttestationShufflingId, AttesterSlashing, BeaconStateError, + BlindedPayload, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, FullPayload, ProposerPreparationData, ProposerSlashing, RelativeEpoch, SignedAggregateAndProof, SignedBeaconBlock, SignedBlindedBeaconBlock, SignedBlsToExecutionChange, SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, @@ -784,39 +784,112 @@ pub fn serve( let current_epoch = state.current_epoch(); let epoch = query.epoch.unwrap_or(current_epoch); - let committee_cache = - match RelativeEpoch::from_epoch(current_epoch, epoch) { - Ok(relative_epoch) - if state - .committee_cache_is_initialized(relative_epoch) => - { - state.committee_cache(relative_epoch).map(Cow::Borrowed) - } - _ => CommitteeCache::initialized(state, epoch, &chain.spec) + // Attempt to obtain the committee_cache from the beacon chain + let decision_slot = (epoch.saturating_sub(2u64)) + .end_slot(T::EthSpec::slots_per_epoch()); + // Find the decision block and skip to another method on any kind + // of failure + let shuffling_id = if let Ok(Some(shuffling_decision_block)) = + chain.block_root_at_slot(decision_slot, WhenSlotSkipped::Prev) + { + Some(AttestationShufflingId { + shuffling_epoch: epoch, + shuffling_decision_block, + }) + } else { + None + }; + + // Attempt to read from the chain cache if there exists a + // shuffling_id + let maybe_cached_shuffling = if let Some(shuffling_id) = + shuffling_id.as_ref() + { + chain + .shuffling_cache + .try_write_for(std::time::Duration::from_secs(1)) + .and_then(|mut cache_write| cache_write.get(shuffling_id)) + .and_then(|cache_item| cache_item.wait().ok()) + } else { + None + }; + + let committee_cache = if let Some(ref shuffling) = + maybe_cached_shuffling + { + Cow::Borrowed(&**shuffling) + } else { + let possibly_built_cache = + match RelativeEpoch::from_epoch(current_epoch, epoch) { + Ok(relative_epoch) + if state.committee_cache_is_initialized( + relative_epoch, + ) => + { + state + .committee_cache(relative_epoch) + .map(Cow::Borrowed) + } + _ => CommitteeCache::initialized( + state, + epoch, + &chain.spec, + ) .map(Cow::Owned), - } - .map_err(|e| match e { - BeaconStateError::EpochOutOfBounds => { - let max_sprp = - T::EthSpec::slots_per_historical_root() as u64; - let first_subsequent_restore_point_slot = ((epoch - .start_slot(T::EthSpec::slots_per_epoch()) - / max_sprp) - + 1) - * max_sprp; - if epoch < current_epoch { - warp_utils::reject::custom_bad_request(format!( - "epoch out of bounds, try state at slot {}", - first_subsequent_restore_point_slot, - )) - } else { - warp_utils::reject::custom_bad_request( - "epoch out of bounds, too far in future".into(), - ) + } + .map_err(|e| { + match e { + BeaconStateError::EpochOutOfBounds => { + let max_sprp = + T::EthSpec::slots_per_historical_root() + as u64; + let first_subsequent_restore_point_slot = + ((epoch.start_slot( + T::EthSpec::slots_per_epoch(), + ) / max_sprp) + + 1) + * max_sprp; + if epoch < current_epoch { + warp_utils::reject::custom_bad_request( + format!( + "epoch out of bounds, \ + try state at slot {}", + first_subsequent_restore_point_slot, + ), + ) + } else { + warp_utils::reject::custom_bad_request( + "epoch out of bounds, \ + too far in future" + .into(), + ) + } + } + _ => { + warp_utils::reject::beacon_chain_error(e.into()) + } + } + })?; + + // Attempt to write to the beacon cache (only if the cache + // size is not the default value). + if chain.config.shuffling_cache_size + != beacon_chain::shuffling_cache::DEFAULT_CACHE_SIZE + { + if let Some(shuffling_id) = shuffling_id { + if let Some(mut cache_write) = chain + .shuffling_cache + .try_write_for(std::time::Duration::from_secs(1)) + { + cache_write.insert_committee_cache( + shuffling_id, + &*possibly_built_cache, + ); } } - _ => warp_utils::reject::beacon_chain_error(e.into()), - })?; + } + possibly_built_cache + }; // Use either the supplied slot or all slots in the epoch. let slots = diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 9ece45741..65ce26ad0 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -370,6 +370,14 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { address of this server (e.g., http://localhost:5054).") .takes_value(true), ) + .arg( + Arg::with_name("shuffling-cache-size") + .long("shuffling-cache-size") + .help("Some HTTP API requests can be optimised by caching the shufflings at each epoch. \ + This flag allows the user to set the shuffling cache size in epochs. \ + Shufflings are dependent on validator count and setting this value to a large number can consume a large amount of memory.") + .takes_value(true) + ) /* * Monitoring metrics diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 93ab1be94..36e8683e9 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -148,6 +148,10 @@ pub fn get_config( client_config.http_api.allow_sync_stalled = true; } + if let Some(cache_size) = clap_utils::parse_optional(cli_args, "shuffling-cache-size")? { + client_config.chain.shuffling_cache_size = cache_size; + } + /* * Prometheus metrics HTTP server */ diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index 63ff6f79b..d1af1e4ed 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -118,6 +118,26 @@ fn disable_lock_timeouts_flag() { .with_config(|config| assert!(!config.chain.enable_lock_timeouts)); } +#[test] +fn shuffling_cache_default() { + CommandLineTest::new() + .run_with_zero_port() + .with_config(|config| { + assert_eq!( + config.chain.shuffling_cache_size, + beacon_node::beacon_chain::shuffling_cache::DEFAULT_CACHE_SIZE + ) + }); +} + +#[test] +fn shuffling_cache_set() { + CommandLineTest::new() + .flag("shuffling-cache-size", Some("500")) + .run_with_zero_port() + .with_config(|config| assert_eq!(config.chain.shuffling_cache_size, 500)); +} + #[test] fn fork_choice_before_proposal_timeout_default() { CommandLineTest::new() From 59e45fe349f091760737cfee14b49e35186a62b9 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Tue, 21 Mar 2023 05:15:00 +0000 Subject: [PATCH 05/11] Reduce verbosity of reprocess queue logs (#4101) ## Issue Addressed NA ## Proposed Changes Replaces #4058 to attempt to reduce `ERRO Failed to send scheduled attestation` spam and provide more information for diagnosis. With this PR we achieve: - When dequeuing attestations after a block is received, send only one log which reports `n` failures (rather than `n` logs reporting `n` failures). - Make a distinction in logs between two separate attestation dequeuing events. - Add more information to both log events to help assist with troubleshooting. ## Additional Info NA --- .../work_reprocessing_queue.rs | 26 +++++++++++++++---- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs b/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs index 8c568a7ee..21fc2b641 100644 --- a/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs +++ b/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs @@ -573,6 +573,9 @@ impl ReprocessQueue { }) => { // Unqueue the attestations we have for this root, if any. if let Some(queued_ids) = self.awaiting_attestations_per_root.remove(&block_root) { + let mut sent_count = 0; + let mut failed_to_send_count = 0; + for id in queued_ids { metrics::inc_counter( &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_MATCHED_ATTESTATIONS, @@ -597,10 +600,9 @@ impl ReprocessQueue { // Send the work. if self.ready_work_tx.try_send(work).is_err() { - error!( - log, - "Failed to send scheduled attestation"; - ); + failed_to_send_count += 1; + } else { + sent_count += 1; } } else { // There is a mismatch between the attestation ids registered for this @@ -613,6 +615,18 @@ impl ReprocessQueue { ); } } + + if failed_to_send_count > 0 { + error!( + log, + "Ignored scheduled attestation(s) for block"; + "hint" => "system may be overloaded", + "parent_root" => ?parent_root, + "block_root" => ?block_root, + "failed_count" => failed_to_send_count, + "sent_count" => sent_count, + ); + } } // Unqueue the light client optimistic updates we have for this root, if any. if let Some(queued_lc_id) = self @@ -727,7 +741,9 @@ impl ReprocessQueue { if self.ready_work_tx.try_send(work).is_err() { error!( log, - "Failed to send scheduled attestation"; + "Ignored scheduled attestation"; + "hint" => "system may be overloaded", + "beacon_block_root" => ?root ); } From 3ac5583cf932412112b341116308b62cf240ba71 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Tue, 21 Mar 2023 05:15:01 +0000 Subject: [PATCH 06/11] Set Capella fork epoch for Mainnet (#4111) ## Issue Addressed NA ## Proposed Changes Sets the mainnet Capella fork epoch as per https://github.com/ethereum/consensus-specs/pull/3300 ## Additional Info I expect the `ef_tests` to fail until we get a compatible consensus spec tests release. --- .../built_in_network_configs/mainnet/config.yaml | 2 +- consensus/types/src/chain_spec.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/common/eth2_network_config/built_in_network_configs/mainnet/config.yaml b/common/eth2_network_config/built_in_network_configs/mainnet/config.yaml index 9d9852f62..0bbf873a3 100644 --- a/common/eth2_network_config/built_in_network_configs/mainnet/config.yaml +++ b/common/eth2_network_config/built_in_network_configs/mainnet/config.yaml @@ -38,7 +38,7 @@ BELLATRIX_FORK_VERSION: 0x02000000 BELLATRIX_FORK_EPOCH: 144896 # Sept 6, 2022, 11:34:47am UTC # Capella CAPELLA_FORK_VERSION: 0x03000000 -CAPELLA_FORK_EPOCH: 18446744073709551615 +CAPELLA_FORK_EPOCH: 194048 # April 12, 2023, 10:27:35pm UTC # Sharding SHARDING_FORK_VERSION: 0x03000000 SHARDING_FORK_EPOCH: 18446744073709551615 diff --git a/consensus/types/src/chain_spec.rs b/consensus/types/src/chain_spec.rs index bbb0b9712..dc89ab902 100644 --- a/consensus/types/src/chain_spec.rs +++ b/consensus/types/src/chain_spec.rs @@ -615,7 +615,7 @@ impl ChainSpec { * Capella hard fork params */ capella_fork_version: [0x03, 00, 00, 00], - capella_fork_epoch: None, + capella_fork_epoch: Some(Epoch::new(194048)), max_validators_per_withdrawals_sweep: 16384, /* From 1f8c17b5306a6620c266a68277d0e4a8acd969ff Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Tue, 21 Mar 2023 07:34:41 +0000 Subject: [PATCH 07/11] Fork choice modifications and cleanup (#3962) ## Issue Addressed NA ## Proposed Changes - Implements https://github.com/ethereum/consensus-specs/pull/3290/ - Bumps `ef-tests` to [v1.3.0-rc.4](https://github.com/ethereum/consensus-spec-tests/releases/tag/v1.3.0-rc.4). The `CountRealizedFull` concept has been removed and the `--count-unrealized-full` and `--count-unrealized` BN flags now do nothing but log a `WARN` when used. ## Database Migration Debt This PR removes the `best_justified_checkpoint` from fork choice. This field is persisted on-disk and the correct way to go about this would be to make a DB migration to remove the field. However, in this PR I've simply stubbed out the value with a junk value. I've taken this approach because if we're going to do a DB migration I'd love to remove the `Option`s around the justified and finalized checkpoints on `ProtoNode` whilst we're at it. Those options were added in #2822 which was included in Lighthouse v2.1.0. The options were only put there to handle the migration and they've been set to `Some` ever since v2.1.0. There's no reason to keep them as options anymore. I started adding the DB migration to this branch but I started to feel like I was bloating this rather critical PR with nice-to-haves. I've kept the partially-complete migration [over in my repo](https://github.com/paulhauner/lighthouse/tree/fc-pr-18-migration) so we can pick it up after this PR is merged. --- beacon_node/beacon_chain/src/beacon_chain.rs | 8 +- .../src/beacon_fork_choice_store.rs | 21 +- .../beacon_chain/src/block_verification.rs | 1 - beacon_node/beacon_chain/src/builder.rs | 8 +- .../beacon_chain/src/canonical_head.rs | 15 +- beacon_node/beacon_chain/src/chain_config.rs | 9 +- beacon_node/beacon_chain/src/fork_revert.rs | 3 - beacon_node/beacon_chain/src/lib.rs | 2 +- beacon_node/beacon_chain/src/schema_change.rs | 9 + .../src/schema_change/migration_schema_v16.rs | 46 ++++ beacon_node/beacon_chain/tests/tests.rs | 4 +- beacon_node/src/cli.rs | 5 +- beacon_node/src/config.rs | 19 +- beacon_node/store/src/metadata.rs | 2 +- consensus/fork_choice/src/fork_choice.rs | 199 +++--------------- .../fork_choice/src/fork_choice_store.rs | 6 - consensus/fork_choice/src/lib.rs | 4 +- consensus/fork_choice/tests/tests.rs | 33 ++- .../src/fork_choice_test_definition.rs | 6 +- .../proto_array/src/justified_balances.rs | 2 +- consensus/proto_array/src/lib.rs | 4 +- consensus/proto_array/src/proto_array.rs | 98 +++------ .../src/proto_array_fork_choice.rs | 15 +- consensus/proto_array/src/ssz_container.rs | 9 +- lighthouse/tests/beacon_node.rs | 54 ++--- testing/ef_tests/Makefile | 2 +- testing/ef_tests/src/cases/fork_choice.rs | 30 +-- testing/ef_tests/src/handler.rs | 5 + testing/ef_tests/tests/tests.rs | 12 ++ 29 files changed, 217 insertions(+), 414 deletions(-) create mode 100644 beacon_node/beacon_chain/src/schema_change/migration_schema_v16.rs diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 8dee55569..9802935b2 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -73,7 +73,7 @@ use itertools::process_results; use itertools::Itertools; use operation_pool::{AttestationRef, OperationPool, PersistedOperationPool, ReceivedPreCapella}; use parking_lot::{Mutex, RwLock}; -use proto_array::{CountUnrealizedFull, DoNotReOrg, ProposerHeadError}; +use proto_array::{DoNotReOrg, ProposerHeadError}; use safe_arith::SafeArith; use slasher::Slasher; use slog::{crit, debug, error, info, trace, warn, Logger}; @@ -479,7 +479,6 @@ impl BeaconChain { pub fn load_fork_choice( store: BeaconStore, reset_payload_statuses: ResetPayloadStatuses, - count_unrealized_full: CountUnrealizedFull, spec: &ChainSpec, log: &Logger, ) -> Result>, Error> { @@ -496,7 +495,6 @@ impl BeaconChain { persisted_fork_choice.fork_choice, reset_payload_statuses, fc_store, - count_unrealized_full, spec, log, )?)) @@ -1900,7 +1898,6 @@ impl BeaconChain { self.slot()?, verified.indexed_attestation(), AttestationFromBlock::False, - &self.spec, ) .map_err(Into::into) } @@ -2868,7 +2865,7 @@ impl BeaconChain { &state, payload_verification_status, &self.spec, - count_unrealized.and(self.config.count_unrealized.into()), + count_unrealized, ) .map_err(|e| BlockError::BeaconChainError(e.into()))?; } @@ -2987,7 +2984,6 @@ impl BeaconChain { ResetPayloadStatuses::always_reset_conditionally( self.config.always_reset_payload_statuses, ), - self.config.count_unrealized_full, &self.store, &self.spec, &self.log, diff --git a/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs b/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs index b17613da0..71160fcb6 100644 --- a/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs +++ b/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs @@ -20,6 +20,14 @@ use types::{ Hash256, Slot, }; +/// Ensure this justified checkpoint has an epoch of 0 so that it is never +/// greater than the justified checkpoint and enshrined as the actual justified +/// checkpoint. +const JUNK_BEST_JUSTIFIED_CHECKPOINT: Checkpoint = Checkpoint { + epoch: Epoch::new(0), + root: Hash256::repeat_byte(0), +}; + #[derive(Debug)] pub enum Error { UnableToReadSlot, @@ -144,7 +152,6 @@ pub struct BeaconForkChoiceStore, Cold: ItemStore< finalized_checkpoint: Checkpoint, justified_checkpoint: Checkpoint, justified_balances: JustifiedBalances, - best_justified_checkpoint: Checkpoint, unrealized_justified_checkpoint: Checkpoint, unrealized_finalized_checkpoint: Checkpoint, proposer_boost_root: Hash256, @@ -194,7 +201,6 @@ where justified_checkpoint, justified_balances, finalized_checkpoint, - best_justified_checkpoint: justified_checkpoint, unrealized_justified_checkpoint: justified_checkpoint, unrealized_finalized_checkpoint: finalized_checkpoint, proposer_boost_root: Hash256::zero(), @@ -212,7 +218,7 @@ where finalized_checkpoint: self.finalized_checkpoint, justified_checkpoint: self.justified_checkpoint, justified_balances: self.justified_balances.effective_balances.clone(), - best_justified_checkpoint: self.best_justified_checkpoint, + best_justified_checkpoint: JUNK_BEST_JUSTIFIED_CHECKPOINT, unrealized_justified_checkpoint: self.unrealized_justified_checkpoint, unrealized_finalized_checkpoint: self.unrealized_finalized_checkpoint, proposer_boost_root: self.proposer_boost_root, @@ -234,7 +240,6 @@ where finalized_checkpoint: persisted.finalized_checkpoint, justified_checkpoint: persisted.justified_checkpoint, justified_balances, - best_justified_checkpoint: persisted.best_justified_checkpoint, unrealized_justified_checkpoint: persisted.unrealized_justified_checkpoint, unrealized_finalized_checkpoint: persisted.unrealized_finalized_checkpoint, proposer_boost_root: persisted.proposer_boost_root, @@ -277,10 +282,6 @@ where &self.justified_balances } - fn best_justified_checkpoint(&self) -> &Checkpoint { - &self.best_justified_checkpoint - } - fn finalized_checkpoint(&self) -> &Checkpoint { &self.finalized_checkpoint } @@ -333,10 +334,6 @@ where Ok(()) } - fn set_best_justified_checkpoint(&mut self, checkpoint: Checkpoint) { - self.best_justified_checkpoint = checkpoint - } - fn set_unrealized_justified_checkpoint(&mut self, checkpoint: Checkpoint) { self.unrealized_justified_checkpoint = checkpoint; } diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 8c169cfe5..5102381a1 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -1468,7 +1468,6 @@ impl ExecutionPendingBlock { current_slot, indexed_attestation, AttestationFromBlock::True, - &chain.spec, ) { Ok(()) => Ok(()), // Ignore invalid attestations whilst importing attestations from a block. The diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index cd465d4e7..8ad874ea0 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -18,7 +18,7 @@ use crate::{ }; use eth1::Config as Eth1Config; use execution_layer::ExecutionLayer; -use fork_choice::{ForkChoice, ResetPayloadStatuses}; +use fork_choice::{CountUnrealized, ForkChoice, ResetPayloadStatuses}; use futures::channel::mpsc::Sender; use operation_pool::{OperationPool, PersistedOperationPool}; use parking_lot::RwLock; @@ -265,7 +265,6 @@ where ResetPayloadStatuses::always_reset_conditionally( self.chain_config.always_reset_payload_statuses, ), - self.chain_config.count_unrealized_full, &self.spec, log, ) @@ -384,7 +383,6 @@ where &genesis.beacon_block, &genesis.beacon_state, current_slot, - self.chain_config.count_unrealized_full, &self.spec, ) .map_err(|e| format!("Unable to initialize ForkChoice: {:?}", e))?; @@ -503,7 +501,6 @@ where &snapshot.beacon_block, &snapshot.beacon_state, current_slot, - self.chain_config.count_unrealized_full, &self.spec, ) .map_err(|e| format!("Unable to initialize ForkChoice: {:?}", e))?; @@ -681,8 +678,7 @@ where store.clone(), Some(current_slot), &self.spec, - self.chain_config.count_unrealized.into(), - self.chain_config.count_unrealized_full, + CountUnrealized::True, )?; } diff --git a/beacon_node/beacon_chain/src/canonical_head.rs b/beacon_node/beacon_chain/src/canonical_head.rs index 24c06680d..0e1c8a530 100644 --- a/beacon_node/beacon_chain/src/canonical_head.rs +++ b/beacon_node/beacon_chain/src/canonical_head.rs @@ -45,8 +45,7 @@ use crate::{ }; use eth2::types::{EventKind, SseChainReorg, SseFinalizedCheckpoint, SseHead, SseLateHead}; use fork_choice::{ - CountUnrealizedFull, ExecutionStatus, ForkChoiceView, ForkchoiceUpdateParameters, ProtoBlock, - ResetPayloadStatuses, + ExecutionStatus, ForkChoiceView, ForkchoiceUpdateParameters, ProtoBlock, ResetPayloadStatuses, }; use itertools::process_results; use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard}; @@ -285,19 +284,13 @@ impl CanonicalHead { // defensive programming. mut fork_choice_write_lock: RwLockWriteGuard>, reset_payload_statuses: ResetPayloadStatuses, - count_unrealized_full: CountUnrealizedFull, store: &BeaconStore, spec: &ChainSpec, log: &Logger, ) -> Result<(), Error> { - let fork_choice = >::load_fork_choice( - store.clone(), - reset_payload_statuses, - count_unrealized_full, - spec, - log, - )? - .ok_or(Error::MissingPersistedForkChoice)?; + let fork_choice = + >::load_fork_choice(store.clone(), reset_payload_statuses, spec, log)? + .ok_or(Error::MissingPersistedForkChoice)?; let fork_choice_view = fork_choice.cached_fork_choice_view(); let beacon_block_root = fork_choice_view.head_block_root; let beacon_block = store diff --git a/beacon_node/beacon_chain/src/chain_config.rs b/beacon_node/beacon_chain/src/chain_config.rs index 315861894..1a5394256 100644 --- a/beacon_node/beacon_chain/src/chain_config.rs +++ b/beacon_node/beacon_chain/src/chain_config.rs @@ -1,4 +1,4 @@ -pub use proto_array::{CountUnrealizedFull, ReOrgThreshold}; +pub use proto_array::ReOrgThreshold; use serde_derive::{Deserialize, Serialize}; use std::time::Duration; use types::{Checkpoint, Epoch}; @@ -48,16 +48,11 @@ pub struct ChainConfig { pub builder_fallback_epochs_since_finalization: usize, /// Whether any chain health checks should be considered when deciding whether to use the builder API. pub builder_fallback_disable_checks: bool, - /// When set to `true`, weigh the "unrealized" FFG progression when choosing a head in fork - /// choice. - pub count_unrealized: bool, /// When set to `true`, forget any valid/invalid/optimistic statuses in fork choice during start /// up. pub always_reset_payload_statuses: bool, /// Whether to apply paranoid checks to blocks proposed by this beacon node. pub paranoid_block_proposal: bool, - /// Whether to strictly count unrealized justified votes. - pub count_unrealized_full: CountUnrealizedFull, /// Optionally set timeout for calls to checkpoint sync endpoint. pub checkpoint_sync_url_timeout: u64, /// The offset before the start of a proposal slot at which payload attributes should be sent. @@ -91,10 +86,8 @@ impl Default for ChainConfig { builder_fallback_skips_per_epoch: 8, builder_fallback_epochs_since_finalization: 3, builder_fallback_disable_checks: false, - count_unrealized: true, always_reset_payload_statuses: false, paranoid_block_proposal: false, - count_unrealized_full: CountUnrealizedFull::default(), checkpoint_sync_url_timeout: 60, prepare_payload_lookahead: Duration::from_secs(4), // This value isn't actually read except in tests. diff --git a/beacon_node/beacon_chain/src/fork_revert.rs b/beacon_node/beacon_chain/src/fork_revert.rs index 6d5b5ddc4..ef23248ab 100644 --- a/beacon_node/beacon_chain/src/fork_revert.rs +++ b/beacon_node/beacon_chain/src/fork_revert.rs @@ -1,7 +1,6 @@ use crate::{BeaconForkChoiceStore, BeaconSnapshot}; use fork_choice::{CountUnrealized, ForkChoice, PayloadVerificationStatus}; use itertools::process_results; -use proto_array::CountUnrealizedFull; use slog::{info, warn, Logger}; use state_processing::state_advance::complete_state_advance; use state_processing::{ @@ -102,7 +101,6 @@ pub fn reset_fork_choice_to_finalization, Cold: It current_slot: Option, spec: &ChainSpec, count_unrealized_config: CountUnrealized, - count_unrealized_full_config: CountUnrealizedFull, ) -> Result, E>, String> { // Fetch finalized block. let finalized_checkpoint = head_state.finalized_checkpoint(); @@ -156,7 +154,6 @@ pub fn reset_fork_choice_to_finalization, Cold: It &finalized_snapshot.beacon_block, &finalized_snapshot.beacon_state, current_slot, - count_unrealized_full_config, spec, ) .map_err(|e| format!("Unable to reset fork choice for revert: {:?}", e))?; diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index c25e308c4..af4780e46 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -57,7 +57,7 @@ pub use self::beacon_chain::{ INVALID_JUSTIFIED_PAYLOAD_SHUTDOWN_REASON, MAXIMUM_GOSSIP_CLOCK_DISPARITY, }; pub use self::beacon_snapshot::BeaconSnapshot; -pub use self::chain_config::{ChainConfig, CountUnrealizedFull}; +pub use self::chain_config::ChainConfig; pub use self::errors::{BeaconChainError, BlockProductionError}; pub use self::historical_blocks::HistoricalBlockError; pub use attestation_verification::Error as AttestationError; diff --git a/beacon_node/beacon_chain/src/schema_change.rs b/beacon_node/beacon_chain/src/schema_change.rs index 35202a3c5..5808e648a 100644 --- a/beacon_node/beacon_chain/src/schema_change.rs +++ b/beacon_node/beacon_chain/src/schema_change.rs @@ -3,6 +3,7 @@ mod migration_schema_v12; mod migration_schema_v13; mod migration_schema_v14; mod migration_schema_v15; +mod migration_schema_v16; use crate::beacon_chain::{BeaconChainTypes, ETH1_CACHE_DB_KEY}; use crate::eth1_chain::SszEth1; @@ -132,6 +133,14 @@ pub fn migrate_schema( let ops = migration_schema_v15::downgrade_from_v15::(db.clone(), log)?; db.store_schema_version_atomically(to, ops) } + (SchemaVersion(15), SchemaVersion(16)) => { + let ops = migration_schema_v16::upgrade_to_v16::(db.clone(), log)?; + db.store_schema_version_atomically(to, ops) + } + (SchemaVersion(16), SchemaVersion(15)) => { + let ops = migration_schema_v16::downgrade_from_v16::(db.clone(), log)?; + db.store_schema_version_atomically(to, ops) + } // Anything else is an error. (_, _) => Err(HotColdDBError::UnsupportedSchemaVersion { target_version: to, diff --git a/beacon_node/beacon_chain/src/schema_change/migration_schema_v16.rs b/beacon_node/beacon_chain/src/schema_change/migration_schema_v16.rs new file mode 100644 index 000000000..230573b02 --- /dev/null +++ b/beacon_node/beacon_chain/src/schema_change/migration_schema_v16.rs @@ -0,0 +1,46 @@ +use crate::beacon_chain::{BeaconChainTypes, FORK_CHOICE_DB_KEY}; +use crate::persisted_fork_choice::PersistedForkChoiceV11; +use slog::{debug, Logger}; +use std::sync::Arc; +use store::{Error, HotColdDB, KeyValueStoreOp, StoreItem}; + +pub fn upgrade_to_v16( + db: Arc>, + log: Logger, +) -> Result, Error> { + drop_balances_cache::(db, log) +} + +pub fn downgrade_from_v16( + db: Arc>, + log: Logger, +) -> Result, Error> { + drop_balances_cache::(db, log) +} + +/// Drop the balances cache from the fork choice store. +/// +/// There aren't any type-level changes in this schema migration, however the +/// way that we compute the `JustifiedBalances` has changed due to: +/// https://github.com/sigp/lighthouse/pull/3962 +pub fn drop_balances_cache( + db: Arc>, + log: Logger, +) -> Result, Error> { + let mut persisted_fork_choice = db + .get_item::(&FORK_CHOICE_DB_KEY)? + .ok_or_else(|| Error::SchemaMigrationError("fork choice missing from database".into()))?; + + debug!( + log, + "Dropping fork choice balances cache"; + "item_count" => persisted_fork_choice.fork_choice_store.balances_cache.items.len() + ); + + // Drop all items in the balances cache. + persisted_fork_choice.fork_choice_store.balances_cache = <_>::default(); + + let kv_op = persisted_fork_choice.as_kv_store_op(FORK_CHOICE_DB_KEY); + + Ok(vec![kv_op]) +} diff --git a/beacon_node/beacon_chain/tests/tests.rs b/beacon_node/beacon_chain/tests/tests.rs index 384fcbe5d..b4eabc809 100644 --- a/beacon_node/beacon_chain/tests/tests.rs +++ b/beacon_node/beacon_chain/tests/tests.rs @@ -500,7 +500,7 @@ async fn unaggregated_attestations_added_to_fork_choice_some_none() { // Move forward a slot so all queued attestations can be processed. harness.advance_slot(); fork_choice - .update_time(harness.chain.slot().unwrap(), &harness.chain.spec) + .update_time(harness.chain.slot().unwrap()) .unwrap(); let validator_slots: Vec<(usize, Slot)> = (0..VALIDATOR_COUNT) @@ -614,7 +614,7 @@ async fn unaggregated_attestations_added_to_fork_choice_all_updated() { // Move forward a slot so all queued attestations can be processed. harness.advance_slot(); fork_choice - .update_time(harness.chain.slot().unwrap(), &harness.chain.spec) + .update_time(harness.chain.slot().unwrap()) .unwrap(); let validators: Vec = (0..VALIDATOR_COUNT).collect(); diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 65ce26ad0..1e1849347 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -966,8 +966,7 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { Arg::with_name("count-unrealized") .long("count-unrealized") .hidden(true) - .help("Enables an alternative, potentially more performant FFG \ - vote tracking method.") + .help("This flag is deprecated and has no effect.") .takes_value(true) .default_value("true") ) @@ -975,7 +974,7 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { Arg::with_name("count-unrealized-full") .long("count-unrealized-full") .hidden(true) - .help("Stricter version of `count-unrealized`.") + .help("This flag is deprecated and has no effect.") .takes_value(true) .default_value("false") ) diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 36e8683e9..c77fa49b1 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -709,10 +709,21 @@ pub fn get_config( client_config.chain.fork_choice_before_proposal_timeout_ms = timeout; } - client_config.chain.count_unrealized = - clap_utils::parse_required(cli_args, "count-unrealized")?; - client_config.chain.count_unrealized_full = - clap_utils::parse_required::(cli_args, "count-unrealized-full")?.into(); + if !clap_utils::parse_required::(cli_args, "count-unrealized")? { + warn!( + log, + "The flag --count-unrealized is deprecated and will be removed"; + "info" => "any use of the flag will have no effect" + ); + } + + if clap_utils::parse_required::(cli_args, "count-unrealized-full")? { + warn!( + log, + "The flag --count-unrealized-full is deprecated and will be removed"; + "info" => "setting it to `true` has no effect" + ); + } client_config.chain.always_reset_payload_statuses = cli_args.is_present("reset-payload-statuses"); diff --git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs index 729b36ff2..8e9b3599b 100644 --- a/beacon_node/store/src/metadata.rs +++ b/beacon_node/store/src/metadata.rs @@ -4,7 +4,7 @@ use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; use types::{Checkpoint, Hash256, Slot}; -pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(15); +pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(16); // All the keys that get stored under the `BeaconMeta` column. // diff --git a/consensus/fork_choice/src/fork_choice.rs b/consensus/fork_choice/src/fork_choice.rs index 916b1d558..8a4e35f45 100644 --- a/consensus/fork_choice/src/fork_choice.rs +++ b/consensus/fork_choice/src/fork_choice.rs @@ -1,6 +1,6 @@ use crate::{ForkChoiceStore, InvalidationOperation}; use proto_array::{ - Block as ProtoBlock, CountUnrealizedFull, ExecutionStatus, ProposerHeadError, ProposerHeadInfo, + Block as ProtoBlock, ExecutionStatus, ProposerHeadError, ProposerHeadInfo, ProtoArrayForkChoice, ReOrgThreshold, }; use slog::{crit, debug, warn, Logger}; @@ -187,51 +187,6 @@ impl CountUnrealized { pub fn is_true(&self) -> bool { matches!(self, CountUnrealized::True) } - - pub fn and(&self, other: CountUnrealized) -> CountUnrealized { - if self.is_true() && other.is_true() { - CountUnrealized::True - } else { - CountUnrealized::False - } - } -} - -impl From for CountUnrealized { - fn from(count_unrealized: bool) -> Self { - if count_unrealized { - CountUnrealized::True - } else { - CountUnrealized::False - } - } -} - -#[derive(Copy, Clone)] -enum UpdateJustifiedCheckpointSlots { - OnTick { - current_slot: Slot, - }, - OnBlock { - state_slot: Slot, - current_slot: Slot, - }, -} - -impl UpdateJustifiedCheckpointSlots { - fn current_slot(&self) -> Slot { - match self { - UpdateJustifiedCheckpointSlots::OnTick { current_slot } => *current_slot, - UpdateJustifiedCheckpointSlots::OnBlock { current_slot, .. } => *current_slot, - } - } - - fn state_slot(&self) -> Option { - match self { - UpdateJustifiedCheckpointSlots::OnTick { .. } => None, - UpdateJustifiedCheckpointSlots::OnBlock { state_slot, .. } => Some(*state_slot), - } - } } /// Indicates if a block has been verified by an execution payload. @@ -393,7 +348,6 @@ where anchor_block: &SignedBeaconBlock, anchor_state: &BeaconState, current_slot: Option, - count_unrealized_full_config: CountUnrealizedFull, spec: &ChainSpec, ) -> Result> { // Sanity check: the anchor must lie on an epoch boundary. @@ -440,7 +394,6 @@ where current_epoch_shuffling_id, next_epoch_shuffling_id, execution_status, - count_unrealized_full_config, )?; let mut fork_choice = Self { @@ -533,7 +486,7 @@ where // Provide the slot (as per the system clock) to the `fc_store` and then return its view of // the current slot. The `fc_store` will ensure that the `current_slot` is never // decreasing, a property which we must maintain. - let current_slot = self.update_time(system_time_current_slot, spec)?; + let current_slot = self.update_time(system_time_current_slot)?; let store = &mut self.fc_store; @@ -654,58 +607,6 @@ where } } - /// Returns `true` if the given `store` should be updated to set - /// `state.current_justified_checkpoint` its `justified_checkpoint`. - /// - /// ## Specification - /// - /// Is equivalent to: - /// - /// https://github.com/ethereum/eth2.0-specs/blob/v0.12.1/specs/phase0/fork-choice.md#should_update_justified_checkpoint - fn should_update_justified_checkpoint( - &mut self, - new_justified_checkpoint: Checkpoint, - slots: UpdateJustifiedCheckpointSlots, - spec: &ChainSpec, - ) -> Result> { - self.update_time(slots.current_slot(), spec)?; - - if compute_slots_since_epoch_start::(self.fc_store.get_current_slot()) - < spec.safe_slots_to_update_justified - { - return Ok(true); - } - - let justified_slot = - compute_start_slot_at_epoch::(self.fc_store.justified_checkpoint().epoch); - - // This sanity check is not in the spec, but the invariant is implied. - if let Some(state_slot) = slots.state_slot() { - if justified_slot >= state_slot { - return Err(Error::AttemptToRevertJustification { - store: justified_slot, - state: state_slot, - }); - } - } - - // We know that the slot for `new_justified_checkpoint.root` is not greater than - // `state.slot`, since a state cannot justify its own slot. - // - // We know that `new_justified_checkpoint.root` is an ancestor of `state`, since a `state` - // only ever justifies ancestors. - // - // A prior `if` statement protects against a justified_slot that is greater than - // `state.slot` - let justified_ancestor = - self.get_ancestor(new_justified_checkpoint.root, justified_slot)?; - if justified_ancestor != Some(self.fc_store.justified_checkpoint().root) { - return Ok(false); - } - - Ok(true) - } - /// See `ProtoArrayForkChoice::process_execution_payload_validation` for documentation. pub fn on_valid_execution_payload( &mut self, @@ -759,7 +660,7 @@ where // Provide the slot (as per the system clock) to the `fc_store` and then return its view of // the current slot. The `fc_store` will ensure that the `current_slot` is never // decreasing, a property which we must maintain. - let current_slot = self.update_time(system_time_current_slot, spec)?; + let current_slot = self.update_time(system_time_current_slot)?; // Parent block must be known. let parent_block = self @@ -814,17 +715,10 @@ where self.fc_store.set_proposer_boost_root(block_root); } - let update_justified_checkpoint_slots = UpdateJustifiedCheckpointSlots::OnBlock { - state_slot: state.slot(), - current_slot, - }; - // Update store with checkpoints if necessary self.update_checkpoints( state.current_justified_checkpoint(), state.finalized_checkpoint(), - update_justified_checkpoint_slots, - spec, )?; // Update unrealized justified/finalized checkpoints. @@ -905,12 +799,7 @@ where // If block is from past epochs, try to update store's justified & finalized checkpoints right away if block.slot().epoch(E::slots_per_epoch()) < current_slot.epoch(E::slots_per_epoch()) { - self.update_checkpoints( - unrealized_justified_checkpoint, - unrealized_finalized_checkpoint, - update_justified_checkpoint_slots, - spec, - )?; + self.pull_up_store_checkpoints()?; } ( @@ -1004,29 +893,19 @@ where &mut self, justified_checkpoint: Checkpoint, finalized_checkpoint: Checkpoint, - slots: UpdateJustifiedCheckpointSlots, - spec: &ChainSpec, ) -> Result<(), Error> { // Update justified checkpoint. if justified_checkpoint.epoch > self.fc_store.justified_checkpoint().epoch { - if justified_checkpoint.epoch > self.fc_store.best_justified_checkpoint().epoch { - self.fc_store - .set_best_justified_checkpoint(justified_checkpoint); - } - if self.should_update_justified_checkpoint(justified_checkpoint, slots, spec)? { - self.fc_store - .set_justified_checkpoint(justified_checkpoint) - .map_err(Error::UnableToSetJustifiedCheckpoint)?; - } + self.fc_store + .set_justified_checkpoint(justified_checkpoint) + .map_err(Error::UnableToSetJustifiedCheckpoint)?; } // Update finalized checkpoint. if finalized_checkpoint.epoch > self.fc_store.finalized_checkpoint().epoch { self.fc_store.set_finalized_checkpoint(finalized_checkpoint); - self.fc_store - .set_justified_checkpoint(justified_checkpoint) - .map_err(Error::UnableToSetJustifiedCheckpoint)?; } + Ok(()) } @@ -1167,9 +1046,8 @@ where system_time_current_slot: Slot, attestation: &IndexedAttestation, is_from_block: AttestationFromBlock, - spec: &ChainSpec, ) -> Result<(), Error> { - self.update_time(system_time_current_slot, spec)?; + self.update_time(system_time_current_slot)?; // Ignore any attestations to the zero hash. // @@ -1230,16 +1108,12 @@ where /// Call `on_tick` for all slots between `fc_store.get_current_slot()` and the provided /// `current_slot`. Returns the value of `self.fc_store.get_current_slot`. - pub fn update_time( - &mut self, - current_slot: Slot, - spec: &ChainSpec, - ) -> Result> { + pub fn update_time(&mut self, current_slot: Slot) -> Result> { while self.fc_store.get_current_slot() < current_slot { let previous_slot = self.fc_store.get_current_slot(); // Note: we are relying upon `on_tick` to update `fc_store.time` to ensure we don't // get stuck in a loop. - self.on_tick(previous_slot + 1, spec)? + self.on_tick(previous_slot + 1)? } // Process any attestations that might now be eligible. @@ -1255,7 +1129,7 @@ where /// Equivalent to: /// /// https://github.com/ethereum/eth2.0-specs/blob/v0.12.1/specs/phase0/fork-choice.md#on_tick - fn on_tick(&mut self, time: Slot, spec: &ChainSpec) -> Result<(), Error> { + fn on_tick(&mut self, time: Slot) -> Result<(), Error> { let store = &mut self.fc_store; let previous_slot = store.get_current_slot(); @@ -1283,26 +1157,21 @@ where return Ok(()); } - if store.best_justified_checkpoint().epoch > store.justified_checkpoint().epoch { - let store = &self.fc_store; - if self.is_finalized_checkpoint_or_descendant(store.best_justified_checkpoint().root) { - let store = &mut self.fc_store; - store - .set_justified_checkpoint(*store.best_justified_checkpoint()) - .map_err(Error::ForkChoiceStoreError)?; - } - } + // Update the justified/finalized checkpoints based upon the + // best-observed unrealized justification/finality. + self.pull_up_store_checkpoints()?; + Ok(()) + } + + fn pull_up_store_checkpoints(&mut self) -> Result<(), Error> { // Update store.justified_checkpoint if a better unrealized justified checkpoint is known let unrealized_justified_checkpoint = *self.fc_store.unrealized_justified_checkpoint(); let unrealized_finalized_checkpoint = *self.fc_store.unrealized_finalized_checkpoint(); self.update_checkpoints( unrealized_justified_checkpoint, unrealized_finalized_checkpoint, - UpdateJustifiedCheckpointSlots::OnTick { current_slot }, - spec, - )?; - Ok(()) + ) } /// Processes and removes from the queue any queued attestations which may now be eligible for @@ -1468,16 +1337,6 @@ where *self.fc_store.justified_checkpoint() } - /// Return the best justified checkpoint. - /// - /// ## Warning - /// - /// This is distinct to the "justified checkpoint" or the "current justified checkpoint". This - /// "best justified checkpoint" value should only be used internally or for testing. - pub fn best_justified_checkpoint(&self) -> Checkpoint { - *self.fc_store.best_justified_checkpoint() - } - pub fn unrealized_justified_checkpoint(&self) -> Checkpoint { *self.fc_store.unrealized_justified_checkpoint() } @@ -1538,13 +1397,11 @@ where pub fn proto_array_from_persisted( persisted: &PersistedForkChoice, reset_payload_statuses: ResetPayloadStatuses, - count_unrealized_full: CountUnrealizedFull, spec: &ChainSpec, log: &Logger, ) -> Result> { - let mut proto_array = - ProtoArrayForkChoice::from_bytes(&persisted.proto_array_bytes, count_unrealized_full) - .map_err(Error::InvalidProtoArrayBytes)?; + let mut proto_array = ProtoArrayForkChoice::from_bytes(&persisted.proto_array_bytes) + .map_err(Error::InvalidProtoArrayBytes)?; let contains_invalid_payloads = proto_array.contains_invalid_payloads(); debug!( @@ -1575,7 +1432,7 @@ where "error" => e, "info" => "please report this error", ); - ProtoArrayForkChoice::from_bytes(&persisted.proto_array_bytes, count_unrealized_full) + ProtoArrayForkChoice::from_bytes(&persisted.proto_array_bytes) .map_err(Error::InvalidProtoArrayBytes) } else { debug!( @@ -1592,17 +1449,11 @@ where persisted: PersistedForkChoice, reset_payload_statuses: ResetPayloadStatuses, fc_store: T, - count_unrealized_full: CountUnrealizedFull, spec: &ChainSpec, log: &Logger, ) -> Result> { - let proto_array = Self::proto_array_from_persisted( - &persisted, - reset_payload_statuses, - count_unrealized_full, - spec, - log, - )?; + let proto_array = + Self::proto_array_from_persisted(&persisted, reset_payload_statuses, spec, log)?; let current_slot = fc_store.get_current_slot(); diff --git a/consensus/fork_choice/src/fork_choice_store.rs b/consensus/fork_choice/src/fork_choice_store.rs index 9500b1c7d..320f10141 100644 --- a/consensus/fork_choice/src/fork_choice_store.rs +++ b/consensus/fork_choice/src/fork_choice_store.rs @@ -47,9 +47,6 @@ pub trait ForkChoiceStore: Sized { /// Returns balances from the `state` identified by `justified_checkpoint.root`. fn justified_balances(&self) -> &JustifiedBalances; - /// Returns the `best_justified_checkpoint`. - fn best_justified_checkpoint(&self) -> &Checkpoint; - /// Returns the `finalized_checkpoint`. fn finalized_checkpoint(&self) -> &Checkpoint; @@ -68,9 +65,6 @@ pub trait ForkChoiceStore: Sized { /// Sets the `justified_checkpoint`. fn set_justified_checkpoint(&mut self, checkpoint: Checkpoint) -> Result<(), Self::Error>; - /// Sets the `best_justified_checkpoint`. - fn set_best_justified_checkpoint(&mut self, checkpoint: Checkpoint); - /// Sets the `unrealized_justified_checkpoint`. fn set_unrealized_justified_checkpoint(&mut self, checkpoint: Checkpoint); diff --git a/consensus/fork_choice/src/lib.rs b/consensus/fork_choice/src/lib.rs index b307c66d8..397a2ff89 100644 --- a/consensus/fork_choice/src/lib.rs +++ b/consensus/fork_choice/src/lib.rs @@ -7,6 +7,4 @@ pub use crate::fork_choice::{ PersistedForkChoice, QueuedAttestation, ResetPayloadStatuses, }; pub use fork_choice_store::ForkChoiceStore; -pub use proto_array::{ - Block as ProtoBlock, CountUnrealizedFull, ExecutionStatus, InvalidationOperation, -}; +pub use proto_array::{Block as ProtoBlock, ExecutionStatus, InvalidationOperation}; diff --git a/consensus/fork_choice/tests/tests.rs b/consensus/fork_choice/tests/tests.rs index 00bd1f763..82bf642f1 100644 --- a/consensus/fork_choice/tests/tests.rs +++ b/consensus/fork_choice/tests/tests.rs @@ -104,16 +104,6 @@ impl ForkChoiceTest { self } - /// Assert the epochs match. - pub fn assert_best_justified_epoch(self, epoch: u64) -> Self { - assert_eq!( - self.get(|fc_store| fc_store.best_justified_checkpoint().epoch), - Epoch::new(epoch), - "best_justified_epoch" - ); - self - } - /// Assert the given slot is greater than the head slot. pub fn assert_finalized_epoch_is_less_than(self, epoch: Epoch) -> Self { assert!(self.harness.finalized_checkpoint().epoch < epoch); @@ -151,7 +141,7 @@ impl ForkChoiceTest { .chain .canonical_head .fork_choice_write_lock() - .update_time(self.harness.chain.slot().unwrap(), &self.harness.spec) + .update_time(self.harness.chain.slot().unwrap()) .unwrap(); func( self.harness @@ -241,6 +231,11 @@ impl ForkChoiceTest { /// /// If the chain is presently in an unsafe period, transition through it and the following safe /// period. + /// + /// Note: the `SAFE_SLOTS_TO_UPDATE_JUSTIFIED` variable has been removed + /// from the fork choice spec in Q1 2023. We're still leaving references to + /// it in our tests because (a) it's easier and (b) it allows us to easily + /// test for the absence of that parameter. pub fn move_to_next_unsafe_period(self) -> Self { self.move_inside_safe_to_update() .move_outside_safe_to_update() @@ -534,7 +529,6 @@ async fn justified_checkpoint_updates_with_descendent_outside_safe_slots() { .unwrap() .move_outside_safe_to_update() .assert_justified_epoch(2) - .assert_best_justified_epoch(2) .apply_blocks(1) .await .assert_justified_epoch(3); @@ -551,11 +545,9 @@ async fn justified_checkpoint_updates_first_justification_outside_safe_to_update .unwrap() .move_to_next_unsafe_period() .assert_justified_epoch(0) - .assert_best_justified_epoch(0) .apply_blocks(1) .await - .assert_justified_epoch(2) - .assert_best_justified_epoch(2); + .assert_justified_epoch(2); } /// - The new justified checkpoint **does not** descend from the current. @@ -583,8 +575,7 @@ async fn justified_checkpoint_updates_with_non_descendent_inside_safe_slots_with .unwrap(); }) .await - .assert_justified_epoch(3) - .assert_best_justified_epoch(3); + .assert_justified_epoch(3); } /// - The new justified checkpoint **does not** descend from the current. @@ -612,8 +603,9 @@ async fn justified_checkpoint_updates_with_non_descendent_outside_safe_slots_wit .unwrap(); }) .await - .assert_justified_epoch(2) - .assert_best_justified_epoch(3); + // Now that `SAFE_SLOTS_TO_UPDATE_JUSTIFIED` has been removed, the new + // block should have updated the justified checkpoint. + .assert_justified_epoch(3); } /// - The new justified checkpoint **does not** descend from the current. @@ -641,8 +633,7 @@ async fn justified_checkpoint_updates_with_non_descendent_outside_safe_slots_wit .unwrap(); }) .await - .assert_justified_epoch(3) - .assert_best_justified_epoch(3); + .assert_justified_epoch(3); } /// Check that the balances are obtained correctly. diff --git a/consensus/proto_array/src/fork_choice_test_definition.rs b/consensus/proto_array/src/fork_choice_test_definition.rs index 68b3fb719..157f072ad 100644 --- a/consensus/proto_array/src/fork_choice_test_definition.rs +++ b/consensus/proto_array/src/fork_choice_test_definition.rs @@ -3,7 +3,6 @@ mod ffg_updates; mod no_votes; mod votes; -use crate::proto_array::CountUnrealizedFull; use crate::proto_array_fork_choice::{Block, ExecutionStatus, ProtoArrayForkChoice}; use crate::{InvalidationOperation, JustifiedBalances}; use serde_derive::{Deserialize, Serialize}; @@ -88,7 +87,6 @@ impl ForkChoiceTestDefinition { junk_shuffling_id.clone(), junk_shuffling_id, ExecutionStatus::Optimistic(ExecutionBlockHash::zero()), - CountUnrealizedFull::default(), ) .expect("should create fork choice struct"); let equivocating_indices = BTreeSet::new(); @@ -307,8 +305,8 @@ fn get_checkpoint(i: u64) -> Checkpoint { fn check_bytes_round_trip(original: &ProtoArrayForkChoice) { let bytes = original.as_bytes(); - let decoded = ProtoArrayForkChoice::from_bytes(&bytes, CountUnrealizedFull::default()) - .expect("fork choice should decode from bytes"); + let decoded = + ProtoArrayForkChoice::from_bytes(&bytes).expect("fork choice should decode from bytes"); assert!( *original == decoded, "fork choice should encode and decode without change" diff --git a/consensus/proto_array/src/justified_balances.rs b/consensus/proto_array/src/justified_balances.rs index 75f6c2f7c..c8787817f 100644 --- a/consensus/proto_array/src/justified_balances.rs +++ b/consensus/proto_array/src/justified_balances.rs @@ -24,7 +24,7 @@ impl JustifiedBalances { .validators() .iter() .map(|validator| { - if validator.is_active_at(current_epoch) { + if !validator.slashed && validator.is_active_at(current_epoch) { total_effective_balance.safe_add_assign(validator.effective_balance)?; num_active_validators.safe_add_assign(1)?; diff --git a/consensus/proto_array/src/lib.rs b/consensus/proto_array/src/lib.rs index f2b29e1c7..e84139345 100644 --- a/consensus/proto_array/src/lib.rs +++ b/consensus/proto_array/src/lib.rs @@ -6,9 +6,7 @@ mod proto_array_fork_choice; mod ssz_container; pub use crate::justified_balances::JustifiedBalances; -pub use crate::proto_array::{ - calculate_committee_fraction, CountUnrealizedFull, InvalidationOperation, -}; +pub use crate::proto_array::{calculate_committee_fraction, InvalidationOperation}; pub use crate::proto_array_fork_choice::{ Block, DoNotReOrg, ExecutionStatus, ProposerHeadError, ProposerHeadInfo, ProtoArrayForkChoice, ReOrgThreshold, diff --git a/consensus/proto_array/src/proto_array.rs b/consensus/proto_array/src/proto_array.rs index bf50c0802..2c2514b20 100644 --- a/consensus/proto_array/src/proto_array.rs +++ b/consensus/proto_array/src/proto_array.rs @@ -118,24 +118,6 @@ impl Default for ProposerBoost { } } -/// Indicate whether we should strictly count unrealized justification/finalization votes. -#[derive(Default, PartialEq, Eq, Debug, Serialize, Deserialize, Copy, Clone)] -pub enum CountUnrealizedFull { - True, - #[default] - False, -} - -impl From for CountUnrealizedFull { - fn from(b: bool) -> Self { - if b { - CountUnrealizedFull::True - } else { - CountUnrealizedFull::False - } - } -} - #[derive(PartialEq, Debug, Serialize, Deserialize, Clone)] pub struct ProtoArray { /// Do not attempt to prune the tree unless it has at least this many nodes. Small prunes @@ -146,7 +128,6 @@ pub struct ProtoArray { pub nodes: Vec, pub indices: HashMap, pub previous_proposer_boost: ProposerBoost, - pub count_unrealized_full: CountUnrealizedFull, } impl ProtoArray { @@ -900,55 +881,44 @@ impl ProtoArray { } let genesis_epoch = Epoch::new(0); - - let checkpoint_match_predicate = - |node_justified_checkpoint: Checkpoint, node_finalized_checkpoint: Checkpoint| { - let correct_justified = node_justified_checkpoint == self.justified_checkpoint - || self.justified_checkpoint.epoch == genesis_epoch; - let correct_finalized = node_finalized_checkpoint == self.finalized_checkpoint - || self.finalized_checkpoint.epoch == genesis_epoch; - correct_justified && correct_finalized + let current_epoch = current_slot.epoch(E::slots_per_epoch()); + let node_epoch = node.slot.epoch(E::slots_per_epoch()); + let node_justified_checkpoint = + if let Some(justified_checkpoint) = node.justified_checkpoint { + justified_checkpoint + } else { + // The node does not have any information about the justified + // checkpoint. This indicates an inconsistent proto-array. + return false; }; - if let ( - Some(unrealized_justified_checkpoint), - Some(unrealized_finalized_checkpoint), - Some(justified_checkpoint), - Some(finalized_checkpoint), - ) = ( - node.unrealized_justified_checkpoint, - node.unrealized_finalized_checkpoint, - node.justified_checkpoint, - node.finalized_checkpoint, - ) { - let current_epoch = current_slot.epoch(E::slots_per_epoch()); - - // If previous epoch is justified, pull up all tips to at least the previous epoch - if CountUnrealizedFull::True == self.count_unrealized_full - && (current_epoch > genesis_epoch - && self.justified_checkpoint.epoch + 1 == current_epoch) - { - unrealized_justified_checkpoint.epoch + 1 >= current_epoch - // If previous epoch is not justified, pull up only tips from past epochs up to the current epoch - } else { - // If block is from a previous epoch, filter using unrealized justification & finalization information - if node.slot.epoch(E::slots_per_epoch()) < current_epoch { - checkpoint_match_predicate( - unrealized_justified_checkpoint, - unrealized_finalized_checkpoint, - ) - // If block is from the current epoch, filter using the head state's justification & finalization information - } else { - checkpoint_match_predicate(justified_checkpoint, finalized_checkpoint) - } - } - } else if let (Some(justified_checkpoint), Some(finalized_checkpoint)) = - (node.justified_checkpoint, node.finalized_checkpoint) - { - checkpoint_match_predicate(justified_checkpoint, finalized_checkpoint) + let voting_source = if current_epoch > node_epoch { + // The block is from a prior epoch, the voting source will be pulled-up. + node.unrealized_justified_checkpoint + // Sometimes we don't track the unrealized justification. In + // that case, just use the fully-realized justified checkpoint. + .unwrap_or(node_justified_checkpoint) } else { - false + // The block is not from a prior epoch, therefore the voting source + // is not pulled up. + node_justified_checkpoint + }; + + let mut correct_justified = self.justified_checkpoint.epoch == genesis_epoch + || voting_source.epoch == self.justified_checkpoint.epoch; + + if let Some(node_unrealized_justified_checkpoint) = node.unrealized_justified_checkpoint { + if !correct_justified && self.justified_checkpoint.epoch + 1 == current_epoch { + correct_justified = node_unrealized_justified_checkpoint.epoch + >= self.justified_checkpoint.epoch + && voting_source.epoch + 2 >= current_epoch; + } } + + let correct_finalized = self.finalized_checkpoint.epoch == genesis_epoch + || self.is_finalized_checkpoint_or_descendant::(node.root); + + correct_justified && correct_finalized } /// Return a reverse iterator over the nodes which comprise the chain ending at `block_root`. diff --git a/consensus/proto_array/src/proto_array_fork_choice.rs b/consensus/proto_array/src/proto_array_fork_choice.rs index 0e0d806e7..eae54e734 100644 --- a/consensus/proto_array/src/proto_array_fork_choice.rs +++ b/consensus/proto_array/src/proto_array_fork_choice.rs @@ -1,8 +1,8 @@ use crate::{ error::Error, proto_array::{ - calculate_committee_fraction, CountUnrealizedFull, InvalidationOperation, Iter, - ProposerBoost, ProtoArray, ProtoNode, + calculate_committee_fraction, InvalidationOperation, Iter, ProposerBoost, ProtoArray, + ProtoNode, }, ssz_container::SszContainer, JustifiedBalances, @@ -307,7 +307,6 @@ impl ProtoArrayForkChoice { current_epoch_shuffling_id: AttestationShufflingId, next_epoch_shuffling_id: AttestationShufflingId, execution_status: ExecutionStatus, - count_unrealized_full: CountUnrealizedFull, ) -> Result { let mut proto_array = ProtoArray { prune_threshold: DEFAULT_PRUNE_THRESHOLD, @@ -316,7 +315,6 @@ impl ProtoArrayForkChoice { nodes: Vec::with_capacity(1), indices: HashMap::with_capacity(1), previous_proposer_boost: ProposerBoost::default(), - count_unrealized_full, }; let block = Block { @@ -780,13 +778,10 @@ impl ProtoArrayForkChoice { SszContainer::from(self).as_ssz_bytes() } - pub fn from_bytes( - bytes: &[u8], - count_unrealized_full: CountUnrealizedFull, - ) -> Result { + pub fn from_bytes(bytes: &[u8]) -> Result { let container = SszContainer::from_ssz_bytes(bytes) .map_err(|e| format!("Failed to decode ProtoArrayForkChoice: {:?}", e))?; - (container, count_unrealized_full) + container .try_into() .map_err(|e| format!("Failed to initialize ProtoArrayForkChoice: {e:?}")) } @@ -950,7 +945,6 @@ mod test_compute_deltas { junk_shuffling_id.clone(), junk_shuffling_id.clone(), execution_status, - CountUnrealizedFull::default(), ) .unwrap(); @@ -1076,7 +1070,6 @@ mod test_compute_deltas { junk_shuffling_id.clone(), junk_shuffling_id.clone(), execution_status, - CountUnrealizedFull::default(), ) .unwrap(); diff --git a/consensus/proto_array/src/ssz_container.rs b/consensus/proto_array/src/ssz_container.rs index 1a20ef967..ed1efaae1 100644 --- a/consensus/proto_array/src/ssz_container.rs +++ b/consensus/proto_array/src/ssz_container.rs @@ -1,6 +1,6 @@ use crate::proto_array::ProposerBoost; use crate::{ - proto_array::{CountUnrealizedFull, ProtoArray, ProtoNode}, + proto_array::{ProtoArray, ProtoNode}, proto_array_fork_choice::{ElasticList, ProtoArrayForkChoice, VoteTracker}, Error, JustifiedBalances, }; @@ -43,12 +43,10 @@ impl From<&ProtoArrayForkChoice> for SszContainer { } } -impl TryFrom<(SszContainer, CountUnrealizedFull)> for ProtoArrayForkChoice { +impl TryFrom for ProtoArrayForkChoice { type Error = Error; - fn try_from( - (from, count_unrealized_full): (SszContainer, CountUnrealizedFull), - ) -> Result { + fn try_from(from: SszContainer) -> Result { let proto_array = ProtoArray { prune_threshold: from.prune_threshold, justified_checkpoint: from.justified_checkpoint, @@ -56,7 +54,6 @@ impl TryFrom<(SszContainer, CountUnrealizedFull)> for ProtoArrayForkChoice { nodes: from.nodes, indices: from.indices.into_iter().collect::>(), previous_proposer_boost: from.previous_proposer_boost, - count_unrealized_full, }; Ok(Self { diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index d1af1e4ed..078bca95e 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -1,4 +1,4 @@ -use beacon_node::{beacon_chain::CountUnrealizedFull, ClientConfig as Config}; +use beacon_node::ClientConfig as Config; use crate::exec::{CommandLineTestExec, CompletedTest}; use beacon_node::beacon_chain::chain_config::{ @@ -232,74 +232,58 @@ fn paranoid_block_proposal_on() { .with_config(|config| assert!(config.chain.paranoid_block_proposal)); } -#[test] -fn count_unrealized_default() { - CommandLineTest::new() - .run_with_zero_port() - .with_config(|config| assert!(config.chain.count_unrealized)); -} - #[test] fn count_unrealized_no_arg() { CommandLineTest::new() .flag("count-unrealized", None) - .run_with_zero_port() - .with_config(|config| assert!(config.chain.count_unrealized)); + // This flag should be ignored, so there's nothing to test but that the + // client starts with the flag present. + .run_with_zero_port(); } #[test] fn count_unrealized_false() { CommandLineTest::new() .flag("count-unrealized", Some("false")) - .run_with_zero_port() - .with_config(|config| assert!(!config.chain.count_unrealized)); + // This flag should be ignored, so there's nothing to test but that the + // client starts with the flag present. + .run_with_zero_port(); } #[test] fn count_unrealized_true() { CommandLineTest::new() .flag("count-unrealized", Some("true")) - .run_with_zero_port() - .with_config(|config| assert!(config.chain.count_unrealized)); + // This flag should be ignored, so there's nothing to test but that the + // client starts with the flag present. + .run_with_zero_port(); } #[test] fn count_unrealized_full_no_arg() { CommandLineTest::new() .flag("count-unrealized-full", None) - .run_with_zero_port() - .with_config(|config| { - assert_eq!( - config.chain.count_unrealized_full, - CountUnrealizedFull::False - ) - }); + // This flag should be ignored, so there's nothing to test but that the + // client starts with the flag present. + .run_with_zero_port(); } #[test] fn count_unrealized_full_false() { CommandLineTest::new() .flag("count-unrealized-full", Some("false")) - .run_with_zero_port() - .with_config(|config| { - assert_eq!( - config.chain.count_unrealized_full, - CountUnrealizedFull::False - ) - }); + // This flag should be ignored, so there's nothing to test but that the + // client starts with the flag present. + .run_with_zero_port(); } #[test] fn count_unrealized_full_true() { CommandLineTest::new() .flag("count-unrealized-full", Some("true")) - .run_with_zero_port() - .with_config(|config| { - assert_eq!( - config.chain.count_unrealized_full, - CountUnrealizedFull::True - ) - }); + // This flag should be ignored, so there's nothing to test but that the + // client starts with the flag present. + .run_with_zero_port(); } #[test] diff --git a/testing/ef_tests/Makefile b/testing/ef_tests/Makefile index fc3dea6e2..f7562f477 100644 --- a/testing/ef_tests/Makefile +++ b/testing/ef_tests/Makefile @@ -1,4 +1,4 @@ -TESTS_TAG := v1.3.0-rc.3 +TESTS_TAG := v1.3.0-rc.4 TESTS = general minimal mainnet TARBALLS = $(patsubst %,%-$(TESTS_TAG).tar.gz,$(TESTS)) diff --git a/testing/ef_tests/src/cases/fork_choice.rs b/testing/ef_tests/src/cases/fork_choice.rs index 31165d632..7c3154a32 100644 --- a/testing/ef_tests/src/cases/fork_choice.rs +++ b/testing/ef_tests/src/cases/fork_choice.rs @@ -45,7 +45,6 @@ pub struct Checks { justified_checkpoint: Option, justified_checkpoint_root: Option, finalized_checkpoint: Option, - best_justified_checkpoint: Option, u_justified_checkpoint: Option, u_finalized_checkpoint: Option, proposer_boost_root: Option, @@ -229,7 +228,6 @@ impl Case for ForkChoiceTest { justified_checkpoint, justified_checkpoint_root, finalized_checkpoint, - best_justified_checkpoint, u_justified_checkpoint, u_finalized_checkpoint, proposer_boost_root, @@ -260,11 +258,6 @@ impl Case for ForkChoiceTest { tester.check_finalized_checkpoint(*expected_finalized_checkpoint)?; } - if let Some(expected_best_justified_checkpoint) = best_justified_checkpoint { - tester - .check_best_justified_checkpoint(*expected_best_justified_checkpoint)?; - } - if let Some(expected_u_justified_checkpoint) = u_justified_checkpoint { tester.check_u_justified_checkpoint(*expected_u_justified_checkpoint)?; } @@ -378,7 +371,7 @@ impl Tester { .chain .canonical_head .fork_choice_write_lock() - .update_time(slot, &self.spec) + .update_time(slot) .unwrap(); } @@ -388,7 +381,7 @@ impl Tester { let result = self.block_on_dangerous(self.harness.chain.process_block( block_root, block.clone(), - CountUnrealized::False, + CountUnrealized::True, NotifyExecutionLayer::Yes, ))?; if result.is_ok() != valid { @@ -448,7 +441,7 @@ impl Tester { &state, PayloadVerificationStatus::Irrelevant, &self.harness.chain.spec, - self.harness.chain.config.count_unrealized.into(), + CountUnrealized::True, ); if result.is_ok() { @@ -576,23 +569,6 @@ impl Tester { check_equal("finalized_checkpoint", fc_checkpoint, expected_checkpoint) } - pub fn check_best_justified_checkpoint( - &self, - expected_checkpoint: Checkpoint, - ) -> Result<(), Error> { - let best_justified_checkpoint = self - .harness - .chain - .canonical_head - .fork_choice_read_lock() - .best_justified_checkpoint(); - check_equal( - "best_justified_checkpoint", - best_justified_checkpoint, - expected_checkpoint, - ) - } - pub fn check_u_justified_checkpoint( &self, expected_checkpoint: Checkpoint, diff --git a/testing/ef_tests/src/handler.rs b/testing/ef_tests/src/handler.rs index abf18b350..2ed596e25 100644 --- a/testing/ef_tests/src/handler.rs +++ b/testing/ef_tests/src/handler.rs @@ -547,6 +547,11 @@ impl Handler for ForkChoiceHandler { return false; } + // Tests are no longer generated for the base/phase0 specification. + if fork_name == ForkName::Base { + return false; + } + // These tests check block validity (which may include signatures) and there is no need to // run them with fake crypto. cfg!(not(feature = "fake_crypto")) diff --git a/testing/ef_tests/tests/tests.rs b/testing/ef_tests/tests/tests.rs index 8a7209b89..33f8d67ec 100644 --- a/testing/ef_tests/tests/tests.rs +++ b/testing/ef_tests/tests/tests.rs @@ -503,6 +503,18 @@ fn fork_choice_ex_ante() { ForkChoiceHandler::::new("ex_ante").run(); } +#[test] +fn fork_choice_reorg() { + ForkChoiceHandler::::new("reorg").run(); + // There is no mainnet variant for this test. +} + +#[test] +fn fork_choice_withholding() { + ForkChoiceHandler::::new("withholding").run(); + // There is no mainnet variant for this test. +} + #[test] fn optimistic_sync() { OptimisticSyncHandler::::default().run(); From 0616e01202bb7b90bceb8b8d57f4ebbcbc29088b Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Wed, 22 Mar 2023 04:06:02 +0000 Subject: [PATCH 08/11] Release v4.0.0 (#4112) ## Issue Addressed NA ## Proposed Changes Bump versions to `v4.0.0` ## Additional Info NA --- Cargo.lock | 8 ++++---- beacon_node/Cargo.toml | 2 +- boot_node/Cargo.toml | 2 +- common/lighthouse_version/src/lib.rs | 4 ++-- lcli/Cargo.toml | 2 +- lighthouse/Cargo.toml | 2 +- 6 files changed, 10 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 021a14e0c..1823b3f27 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -618,7 +618,7 @@ dependencies = [ [[package]] name = "beacon_node" -version = "3.5.1" +version = "4.0.0" dependencies = [ "beacon_chain", "clap", @@ -786,7 +786,7 @@ dependencies = [ [[package]] name = "boot_node" -version = "3.5.1" +version = "4.0.0" dependencies = [ "beacon_node", "clap", @@ -3770,7 +3770,7 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "lcli" -version = "3.5.1" +version = "4.0.0" dependencies = [ "account_utils", "beacon_chain", @@ -4374,7 +4374,7 @@ dependencies = [ [[package]] name = "lighthouse" -version = "3.5.1" +version = "4.0.0" dependencies = [ "account_manager", "account_utils", diff --git a/beacon_node/Cargo.toml b/beacon_node/Cargo.toml index 521e2b89c..a281599a0 100644 --- a/beacon_node/Cargo.toml +++ b/beacon_node/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "beacon_node" -version = "3.5.1" +version = "4.0.0" authors = ["Paul Hauner ", "Age Manning "] edition = "2021" diff --git a/common/lighthouse_version/src/lib.rs b/common/lighthouse_version/src/lib.rs index 10d1a8c32..c81b632e8 100644 --- a/common/lighthouse_version/src/lib.rs +++ b/common/lighthouse_version/src/lib.rs @@ -17,8 +17,8 @@ pub const VERSION: &str = git_version!( // NOTE: using --match instead of --exclude for compatibility with old Git "--match=thiswillnevermatchlol" ], - prefix = "Lighthouse/v3.5.1-", - fallback = "Lighthouse/v3.5.1" + prefix = "Lighthouse/v4.0.0-", + fallback = "Lighthouse/v4.0.0" ); /// Returns `VERSION`, but with platform information appended to the end. diff --git a/lcli/Cargo.toml b/lcli/Cargo.toml index caceb9977..4a8f20e74 100644 --- a/lcli/Cargo.toml +++ b/lcli/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "lcli" description = "Lighthouse CLI (modeled after zcli)" -version = "3.5.1" +version = "4.0.0" authors = ["Paul Hauner "] edition = "2021" diff --git a/lighthouse/Cargo.toml b/lighthouse/Cargo.toml index 331e9fe59..ec40d5dbc 100644 --- a/lighthouse/Cargo.toml +++ b/lighthouse/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lighthouse" -version = "3.5.1" +version = "4.0.0" authors = ["Sigma Prime "] edition = "2021" autotests = false From 23ea1481e067b2ce7688e5032bead15b87092d45 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Thu, 23 Mar 2023 07:16:49 +0000 Subject: [PATCH 09/11] Fix fork choice error message (#4122) ## Issue Addressed NA ## Proposed Changes Ensures that we log the values of the *head* block rather than the *justified* block. ## Additional Info NA --- consensus/fork_choice/src/fork_choice.rs | 21 +++++++++++++++------ consensus/proto_array/src/proto_array.rs | 6 +++--- 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/consensus/fork_choice/src/fork_choice.rs b/consensus/fork_choice/src/fork_choice.rs index 8a4e35f45..b9d204676 100644 --- a/consensus/fork_choice/src/fork_choice.rs +++ b/consensus/fork_choice/src/fork_choice.rs @@ -799,7 +799,10 @@ where // If block is from past epochs, try to update store's justified & finalized checkpoints right away if block.slot().epoch(E::slots_per_epoch()) < current_slot.epoch(E::slots_per_epoch()) { - self.pull_up_store_checkpoints()?; + self.pull_up_store_checkpoints( + unrealized_justified_checkpoint, + unrealized_finalized_checkpoint, + )?; } ( @@ -1159,15 +1162,21 @@ where // Update the justified/finalized checkpoints based upon the // best-observed unrealized justification/finality. - self.pull_up_store_checkpoints()?; + let unrealized_justified_checkpoint = *self.fc_store.unrealized_justified_checkpoint(); + let unrealized_finalized_checkpoint = *self.fc_store.unrealized_finalized_checkpoint(); + self.pull_up_store_checkpoints( + unrealized_justified_checkpoint, + unrealized_finalized_checkpoint, + )?; Ok(()) } - fn pull_up_store_checkpoints(&mut self) -> Result<(), Error> { - // Update store.justified_checkpoint if a better unrealized justified checkpoint is known - let unrealized_justified_checkpoint = *self.fc_store.unrealized_justified_checkpoint(); - let unrealized_finalized_checkpoint = *self.fc_store.unrealized_finalized_checkpoint(); + fn pull_up_store_checkpoints( + &mut self, + unrealized_justified_checkpoint: Checkpoint, + unrealized_finalized_checkpoint: Checkpoint, + ) -> Result<(), Error> { self.update_checkpoints( unrealized_justified_checkpoint, unrealized_finalized_checkpoint, diff --git a/consensus/proto_array/src/proto_array.rs b/consensus/proto_array/src/proto_array.rs index 2c2514b20..2c19206cb 100644 --- a/consensus/proto_array/src/proto_array.rs +++ b/consensus/proto_array/src/proto_array.rs @@ -665,9 +665,9 @@ impl ProtoArray { start_root: *justified_root, justified_checkpoint: self.justified_checkpoint, finalized_checkpoint: self.finalized_checkpoint, - head_root: justified_node.root, - head_justified_checkpoint: justified_node.justified_checkpoint, - head_finalized_checkpoint: justified_node.finalized_checkpoint, + head_root: best_node.root, + head_justified_checkpoint: best_node.justified_checkpoint, + head_finalized_checkpoint: best_node.finalized_checkpoint, }))); } From b2525d6ebde369c9010080ae2c585927449a262a Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Thu, 23 Mar 2023 21:16:14 +1100 Subject: [PATCH 10/11] Release Candidate v4.0.1-rc.0 (#4123) --- Cargo.lock | 8 ++++---- beacon_node/Cargo.toml | 2 +- boot_node/Cargo.toml | 2 +- common/lighthouse_version/src/lib.rs | 4 ++-- lcli/Cargo.toml | 2 +- lighthouse/Cargo.toml | 2 +- 6 files changed, 10 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1823b3f27..eb26414d0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -618,7 +618,7 @@ dependencies = [ [[package]] name = "beacon_node" -version = "4.0.0" +version = "4.0.1-rc.0" dependencies = [ "beacon_chain", "clap", @@ -786,7 +786,7 @@ dependencies = [ [[package]] name = "boot_node" -version = "4.0.0" +version = "4.0.1-rc.0" dependencies = [ "beacon_node", "clap", @@ -3770,7 +3770,7 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "lcli" -version = "4.0.0" +version = "4.0.1-rc.0" dependencies = [ "account_utils", "beacon_chain", @@ -4374,7 +4374,7 @@ dependencies = [ [[package]] name = "lighthouse" -version = "4.0.0" +version = "4.0.1-rc.0" dependencies = [ "account_manager", "account_utils", diff --git a/beacon_node/Cargo.toml b/beacon_node/Cargo.toml index a281599a0..fed3b96ca 100644 --- a/beacon_node/Cargo.toml +++ b/beacon_node/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "beacon_node" -version = "4.0.0" +version = "4.0.1-rc.0" authors = ["Paul Hauner ", "Age Manning "] edition = "2021" diff --git a/common/lighthouse_version/src/lib.rs b/common/lighthouse_version/src/lib.rs index c81b632e8..f4e19e796 100644 --- a/common/lighthouse_version/src/lib.rs +++ b/common/lighthouse_version/src/lib.rs @@ -17,8 +17,8 @@ pub const VERSION: &str = git_version!( // NOTE: using --match instead of --exclude for compatibility with old Git "--match=thiswillnevermatchlol" ], - prefix = "Lighthouse/v4.0.0-", - fallback = "Lighthouse/v4.0.0" + prefix = "Lighthouse/v4.0.1-rc.0-", + fallback = "Lighthouse/v4.0.1-rc.0" ); /// Returns `VERSION`, but with platform information appended to the end. diff --git a/lcli/Cargo.toml b/lcli/Cargo.toml index 4a8f20e74..84b66c37d 100644 --- a/lcli/Cargo.toml +++ b/lcli/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "lcli" description = "Lighthouse CLI (modeled after zcli)" -version = "4.0.0" +version = "4.0.1-rc.0" authors = ["Paul Hauner "] edition = "2021" diff --git a/lighthouse/Cargo.toml b/lighthouse/Cargo.toml index ec40d5dbc..143ca86c3 100644 --- a/lighthouse/Cargo.toml +++ b/lighthouse/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lighthouse" -version = "4.0.0" +version = "4.0.1-rc.0" authors = ["Sigma Prime "] edition = "2021" autotests = false From 1093ba1a2717c21496de171db01669ae0e6d40dd Mon Sep 17 00:00:00 2001 From: Diva M Date: Fri, 24 Mar 2023 14:32:58 -0500 Subject: [PATCH 11/11] revert change to ef_tests --- testing/ef_tests/Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testing/ef_tests/Makefile b/testing/ef_tests/Makefile index f7562f477..eb2aa1030 100644 --- a/testing/ef_tests/Makefile +++ b/testing/ef_tests/Makefile @@ -1,4 +1,4 @@ -TESTS_TAG := v1.3.0-rc.4 +TESTS_TAG := v1.3.0-rc.1 # FIXME: move to latest TESTS = general minimal mainnet TARBALLS = $(patsubst %,%-$(TESTS_TAG).tar.gz,$(TESTS))