diff --git a/Cargo.lock b/Cargo.lock index fa004c4a9..87c20ee0c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -614,6 +614,7 @@ dependencies = [ "task_executor", "tempfile", "tokio", + "tokio-stream", "tree_hash", "types", "unused_port", diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml index b1aca6463..1d091d679 100644 --- a/beacon_node/beacon_chain/Cargo.toml +++ b/beacon_node/beacon_chain/Cargo.toml @@ -41,6 +41,7 @@ state_processing = { path = "../../consensus/state_processing" } tree_hash = "0.4.1" types = { path = "../../consensus/types" } tokio = "1.14.0" +tokio-stream = "0.1.3" eth1 = { path = "../eth1" } futures = "0.3.7" genesis = { path = "../genesis" } diff --git a/beacon_node/beacon_chain/src/beacon_block_streamer.rs b/beacon_node/beacon_chain/src/beacon_block_streamer.rs new file mode 100644 index 000000000..e56c0c3e3 --- /dev/null +++ b/beacon_node/beacon_chain/src/beacon_block_streamer.rs @@ -0,0 +1,974 @@ +use crate::{BeaconChain, BeaconChainError, BeaconChainTypes}; +use execution_layer::{ExecutionLayer, ExecutionPayloadBodyV1}; +use slog::{crit, debug, Logger}; +use std::collections::HashMap; +use std::sync::Arc; +use store::{DatabaseBlock, ExecutionPayloadEip4844}; +use task_executor::TaskExecutor; +use tokio::sync::{ + mpsc::{self, UnboundedSender}, + RwLock, +}; +use tokio_stream::{wrappers::UnboundedReceiverStream, Stream}; +use types::{ + ChainSpec, EthSpec, ExecPayload, ExecutionBlockHash, ForkName, Hash256, SignedBeaconBlock, + SignedBlindedBeaconBlock, Slot, +}; +use types::{ + ExecutionPayload, ExecutionPayloadCapella, ExecutionPayloadHeader, ExecutionPayloadMerge, +}; + +#[derive(PartialEq)] +pub enum CheckEarlyAttesterCache { + Yes, + No, +} + +#[derive(Debug)] +pub enum Error { + PayloadReconstruction(String), + BlocksByRangeFailure(Box), + RequestNotFound, + BlockResultNotFound, +} + +const BLOCKS_PER_RANGE_REQUEST: u64 = 32; + +// This is the same as a DatabaseBlock but the Arc allows us to avoid an unnecessary clone. +enum LoadedBeaconBlock { + Full(Arc>), + Blinded(Box>), +} +type LoadResult = Result>, BeaconChainError>; +type BlockResult = Result>>, BeaconChainError>; + +enum RequestState { + UnSent(Vec>), + Sent(HashMap>>), +} + +struct BodiesByRange { + start: u64, + count: u64, + state: RequestState, +} + +// stores the components of a block for future re-construction in a small form +struct BlockParts { + blinded_block: Box>, + header: Box>, + body: Option>>, +} + +impl BlockParts { + pub fn new( + blinded: Box>, + header: ExecutionPayloadHeader, + ) -> Self { + Self { + blinded_block: blinded, + header: Box::new(header), + body: None, + } + } + + pub fn root(&self) -> Hash256 { + self.blinded_block.canonical_root() + } + + pub fn slot(&self) -> Slot { + self.blinded_block.message().slot() + } + + pub fn block_hash(&self) -> ExecutionBlockHash { + self.header.block_hash() + } +} + +fn reconstruct_default_header_block( + blinded_block: Box>, + header_from_block: ExecutionPayloadHeader, + spec: &ChainSpec, +) -> BlockResult { + let fork = blinded_block + .fork_name(spec) + .map_err(BeaconChainError::InconsistentFork)?; + + let payload: ExecutionPayload = match fork { + ForkName::Merge => ExecutionPayloadMerge::default().into(), + ForkName::Capella => ExecutionPayloadCapella::default().into(), + ForkName::Eip4844 => ExecutionPayloadEip4844::default().into(), + ForkName::Base | ForkName::Altair => { + return Err(Error::PayloadReconstruction(format!( + "Block with fork variant {} has execution payload", + fork + )) + .into()) + } + }; + + let header_from_payload = ExecutionPayloadHeader::from(payload.to_ref()); + if header_from_payload == header_from_block { + blinded_block + .try_into_full_block(Some(payload)) + .ok_or(BeaconChainError::AddPayloadLogicError) + .map(Arc::new) + .map(Some) + } else { + Err(BeaconChainError::InconsistentPayloadReconstructed { + slot: blinded_block.slot(), + exec_block_hash: header_from_block.block_hash(), + canonical_transactions_root: header_from_block.transactions_root(), + reconstructed_transactions_root: header_from_payload.transactions_root(), + }) + } +} + +fn reconstruct_blocks( + block_map: &mut HashMap>>, + block_parts_with_bodies: HashMap>, + log: &Logger, +) { + for (root, block_parts) in block_parts_with_bodies { + if let Some(payload_body) = block_parts.body { + match payload_body.to_payload(block_parts.header.as_ref().clone()) { + Ok(payload) => { + let header_from_payload = ExecutionPayloadHeader::from(payload.to_ref()); + if header_from_payload == *block_parts.header { + block_map.insert( + root, + Arc::new( + block_parts + .blinded_block + .try_into_full_block(Some(payload)) + .ok_or(BeaconChainError::AddPayloadLogicError) + .map(Arc::new) + .map(Some), + ), + ); + } else { + let error = BeaconChainError::InconsistentPayloadReconstructed { + slot: block_parts.blinded_block.slot(), + exec_block_hash: block_parts.header.block_hash(), + canonical_transactions_root: block_parts.header.transactions_root(), + reconstructed_transactions_root: header_from_payload + .transactions_root(), + }; + debug!(log, "Failed to reconstruct block"; "root" => ?root, "error" => ?error); + block_map.insert(root, Arc::new(Err(error))); + } + } + Err(string) => { + block_map.insert( + root, + Arc::new(Err(Error::PayloadReconstruction(string).into())), + ); + } + } + } else { + block_map.insert( + root, + Arc::new(Err(BeaconChainError::BlockHashMissingFromExecutionLayer( + block_parts.block_hash(), + ))), + ); + } + } +} + +impl BodiesByRange { + pub fn new(maybe_block_parts: Option>) -> Self { + if let Some(block_parts) = maybe_block_parts { + Self { + start: block_parts.header.block_number(), + count: 1, + state: RequestState::UnSent(vec![block_parts]), + } + } else { + Self { + start: 0, + count: 0, + state: RequestState::UnSent(vec![]), + } + } + } + + pub fn is_unsent(&self) -> bool { + matches!(self.state, RequestState::UnSent(_)) + } + + pub fn push_block_parts(&mut self, block_parts: BlockParts) -> Result<(), BlockParts> { + if self.count == BLOCKS_PER_RANGE_REQUEST { + return Err(block_parts); + } + + match &mut self.state { + RequestState::Sent(_) => Err(block_parts), + RequestState::UnSent(blocks_parts_vec) => { + let block_number = block_parts.header.block_number(); + if self.count == 0 { + self.start = block_number; + self.count = 1; + blocks_parts_vec.push(block_parts); + Ok(()) + } else { + // need to figure out if this block fits in the request + if block_number < self.start + || self.start + BLOCKS_PER_RANGE_REQUEST <= block_number + { + return Err(block_parts); + } + + blocks_parts_vec.push(block_parts); + if self.start + self.count <= block_number { + self.count = block_number - self.start + 1; + } + + Ok(()) + } + } + } + } + + async fn execute(&mut self, execution_layer: &ExecutionLayer, log: &Logger) { + if let RequestState::UnSent(blocks_parts_ref) = &mut self.state { + let block_parts_vec = std::mem::take(blocks_parts_ref); + + let mut block_map = HashMap::new(); + match execution_layer + .get_payload_bodies_by_range(self.start, self.count) + .await + { + Ok(bodies) => { + let mut range_map = (self.start..(self.start + self.count)) + .zip(bodies.into_iter().chain(std::iter::repeat(None))) + .collect::>(); + + let mut with_bodies = HashMap::new(); + for mut block_parts in block_parts_vec { + with_bodies + // it's possible the same block is requested twice, using + // or_insert_with() skips duplicates + .entry(block_parts.root()) + .or_insert_with(|| { + let block_number = block_parts.header.block_number(); + block_parts.body = + range_map.remove(&block_number).flatten().map(Box::new); + + block_parts + }); + } + + reconstruct_blocks(&mut block_map, with_bodies, log); + } + Err(e) => { + let block_result = + Arc::new(Err(Error::BlocksByRangeFailure(Box::new(e)).into())); + debug!(log, "Payload bodies by range failure"; "error" => ?block_result); + for block_parts in block_parts_vec { + block_map.insert(block_parts.root(), block_result.clone()); + } + } + } + self.state = RequestState::Sent(block_map); + } + } + + pub async fn get_block_result( + &mut self, + root: &Hash256, + execution_layer: &ExecutionLayer, + log: &Logger, + ) -> Option>> { + self.execute(execution_layer, log).await; + if let RequestState::Sent(map) = &self.state { + return map.get(root).cloned(); + } + // Shouldn't reach this point + None + } +} + +#[derive(Clone)] +enum EngineRequest { + ByRange(Arc>>), + // When we already have the data or there's an error + NoRequest(Arc>>>>), +} + +impl EngineRequest { + pub fn new_by_range() -> Self { + Self::ByRange(Arc::new(RwLock::new(BodiesByRange::new(None)))) + } + pub fn new_no_request() -> Self { + Self::NoRequest(Arc::new(RwLock::new(HashMap::new()))) + } + + pub async fn is_unsent(&self) -> bool { + match self { + Self::ByRange(bodies_by_range) => bodies_by_range.read().await.is_unsent(), + Self::NoRequest(_) => false, + } + } + + pub async fn push_block_parts(&mut self, block_parts: BlockParts, log: &Logger) { + match self { + Self::ByRange(bodies_by_range) => { + let mut request = bodies_by_range.write().await; + + if let Err(block_parts) = request.push_block_parts(block_parts) { + drop(request); + let new_by_range = BodiesByRange::new(Some(block_parts)); + *self = Self::ByRange(Arc::new(RwLock::new(new_by_range))); + } + } + Self::NoRequest(_) => { + // this should _never_ happen + crit!( + log, + "Please notify the devs"; + "beacon_block_streamer" => "push_block_parts called on NoRequest Variant", + ); + } + } + } + + pub async fn push_block_result( + &mut self, + root: Hash256, + block_result: BlockResult, + log: &Logger, + ) { + // this function will only fail if something is seriously wrong + match self { + Self::ByRange(_) => { + // this should _never_ happen + crit!( + log, + "Please notify the devs"; + "beacon_block_streamer" => "push_block_result called on ByRange", + ); + } + Self::NoRequest(results) => { + results.write().await.insert(root, Arc::new(block_result)); + } + } + } + + pub async fn get_block_result( + &self, + root: &Hash256, + execution_layer: &ExecutionLayer, + log: &Logger, + ) -> Arc> { + match self { + Self::ByRange(by_range) => { + by_range + .write() + .await + .get_block_result(root, execution_layer, log) + .await + } + Self::NoRequest(map) => map.read().await.get(root).cloned(), + } + .unwrap_or_else(|| { + crit!( + log, + "Please notify the devs"; + "beacon_block_streamer" => "block_result not found in request", + "root" => ?root, + ); + Arc::new(Err(Error::BlockResultNotFound.into())) + }) + } +} + +pub struct BeaconBlockStreamer { + execution_layer: ExecutionLayer, + check_early_attester_cache: CheckEarlyAttesterCache, + beacon_chain: Arc>, +} + +impl BeaconBlockStreamer { + pub fn new( + beacon_chain: &Arc>, + check_early_attester_cache: CheckEarlyAttesterCache, + ) -> Result { + let execution_layer = beacon_chain + .execution_layer + .as_ref() + .ok_or(BeaconChainError::ExecutionLayerMissing)? + .clone(); + + Ok(Self { + execution_layer, + check_early_attester_cache, + beacon_chain: beacon_chain.clone(), + }) + } + + fn check_early_attester_cache( + &self, + root: Hash256, + ) -> Option>> { + if self.check_early_attester_cache == CheckEarlyAttesterCache::Yes { + self.beacon_chain.early_attester_cache.get_block(root) + } else { + None + } + } + + fn load_payloads(&self, block_roots: Vec) -> Vec<(Hash256, LoadResult)> { + let mut db_blocks = Vec::new(); + + for root in block_roots { + if let Some(cached_block) = self + .check_early_attester_cache(root) + .map(LoadedBeaconBlock::Full) + { + db_blocks.push((root, Ok(Some(cached_block)))); + continue; + } + + match self.beacon_chain.store.try_get_full_block(&root) { + Err(e) => db_blocks.push((root, Err(e.into()))), + Ok(opt_block) => db_blocks.push(( + root, + Ok(opt_block.map(|db_block| match db_block { + DatabaseBlock::Full(block) => LoadedBeaconBlock::Full(Arc::new(block)), + DatabaseBlock::Blinded(block) => { + LoadedBeaconBlock::Blinded(Box::new(block)) + } + })), + )), + } + } + + db_blocks + } + + /// Pre-process the loaded blocks into execution engine requests. + /// + /// The purpose of this function is to separate the blocks into 2 categories: + /// 1) no_request - when we already have the full block or there's an error + /// 2) blocks_by_range - used for blinded blocks + /// + /// The function returns a vector of block roots in the same order as requested + /// along with the engine request that each root corresponds to. + async fn get_requests( + &self, + payloads: Vec<(Hash256, LoadResult)>, + ) -> Vec<(Hash256, EngineRequest)> { + let mut ordered_block_roots = Vec::new(); + let mut requests = HashMap::new(); + + // we sort the by range blocks by slot before adding them to the + // request as it should *better* optimize the number of blocks that + // can fit in the same request + let mut by_range_blocks: Vec> = vec![]; + let mut no_request = EngineRequest::new_no_request(); + + for (root, load_result) in payloads { + // preserve the order of the requested blocks + ordered_block_roots.push(root); + + let block_result = match load_result { + Err(e) => Err(e), + Ok(None) => Ok(None), + Ok(Some(LoadedBeaconBlock::Full(full_block))) => Ok(Some(full_block)), + Ok(Some(LoadedBeaconBlock::Blinded(blinded_block))) => { + match blinded_block + .message() + .execution_payload() + .map(|payload| payload.to_execution_payload_header()) + { + Ok(header) => { + if header.block_hash() == ExecutionBlockHash::zero() { + reconstruct_default_header_block( + blinded_block, + header, + &self.beacon_chain.spec, + ) + } else { + // Add the block to the set requiring a by-range request. + let block_parts = BlockParts::new(blinded_block, header); + by_range_blocks.push(block_parts); + continue; + } + } + Err(e) => Err(BeaconChainError::BeaconStateError(e)), + } + } + }; + + no_request + .push_block_result(root, block_result, &self.beacon_chain.log) + .await; + requests.insert(root, no_request.clone()); + } + + // Now deal with the by_range requests. Sort them in order of increasing slot + let mut by_range = EngineRequest::::new_by_range(); + by_range_blocks.sort_by_key(|block_parts| block_parts.slot()); + for block_parts in by_range_blocks { + let root = block_parts.root(); + by_range + .push_block_parts(block_parts, &self.beacon_chain.log) + .await; + requests.insert(root, by_range.clone()); + } + + let mut result = vec![]; + for root in ordered_block_roots { + if let Some(request) = requests.get(&root) { + result.push((root, request.clone())) + } else { + crit!( + self.beacon_chain.log, + "Please notify the devs"; + "beacon_block_streamer" => "request not found", + "root" => ?root, + ); + no_request + .push_block_result( + root, + Err(Error::RequestNotFound.into()), + &self.beacon_chain.log, + ) + .await; + result.push((root, no_request.clone())); + } + } + + result + } + + // used when the execution engine doesn't support the payload bodies methods + async fn stream_blocks_fallback( + &self, + block_roots: Vec, + sender: UnboundedSender<(Hash256, Arc>)>, + ) { + debug!( + self.beacon_chain.log, + "Using slower fallback method of eth_getBlockByHash()" + ); + for root in block_roots { + let cached_block = self.check_early_attester_cache(root); + let block_result = if cached_block.is_some() { + Ok(cached_block) + } else { + self.beacon_chain + .get_block(&root) + .await + .map(|opt_block| opt_block.map(Arc::new)) + }; + + if sender.send((root, Arc::new(block_result))).is_err() { + break; + } + } + } + + async fn stream_blocks( + &self, + block_roots: Vec, + sender: UnboundedSender<(Hash256, Arc>)>, + ) { + let n_roots = block_roots.len(); + let mut n_success = 0usize; + let mut n_sent = 0usize; + let mut engine_requests = 0usize; + + let payloads = self.load_payloads(block_roots); + let requests = self.get_requests(payloads).await; + + for (root, request) in requests { + if request.is_unsent().await { + engine_requests += 1; + } + + let result = request + .get_block_result(&root, &self.execution_layer, &self.beacon_chain.log) + .await; + + let successful = result + .as_ref() + .as_ref() + .map(|opt| opt.is_some()) + .unwrap_or(false); + + if sender.send((root, result)).is_err() { + break; + } else { + n_sent += 1; + if successful { + n_success += 1; + } + } + } + + debug!( + self.beacon_chain.log, + "BeaconBlockStreamer finished"; + "requested blocks" => n_roots, + "sent" => n_sent, + "succeeded" => n_success, + "failed" => (n_sent - n_success), + "engine requests" => engine_requests, + ); + } + + pub async fn stream( + self, + block_roots: Vec, + sender: UnboundedSender<(Hash256, Arc>)>, + ) { + match self + .execution_layer + .get_engine_capabilities(None) + .await + .map_err(Box::new) + .map_err(BeaconChainError::EngineGetCapabilititesFailed) + { + Ok(engine_capabilities) => { + if engine_capabilities.get_payload_bodies_by_range_v1 { + self.stream_blocks(block_roots, sender).await; + } else { + // use the fallback method + self.stream_blocks_fallback(block_roots, sender).await; + } + } + Err(e) => { + send_errors(block_roots, sender, e).await; + } + } + } + + pub fn launch_stream( + self, + block_roots: Vec, + executor: &TaskExecutor, + ) -> impl Stream>)> { + let (block_tx, block_rx) = mpsc::unbounded_channel(); + debug!( + self.beacon_chain.log, + "Launching a BeaconBlockStreamer"; + "blocks" => block_roots.len(), + ); + executor.spawn(self.stream(block_roots, block_tx), "get_blocks_sender"); + UnboundedReceiverStream::new(block_rx) + } +} + +async fn send_errors( + block_roots: Vec, + sender: UnboundedSender<(Hash256, Arc>)>, + beacon_chain_error: BeaconChainError, +) { + let result = Arc::new(Err(beacon_chain_error)); + for root in block_roots { + if sender.send((root, result.clone())).is_err() { + break; + } + } +} + +impl From for BeaconChainError { + fn from(value: Error) -> Self { + BeaconChainError::BlockStreamerError(value) + } +} + +#[cfg(test)] +mod tests { + use crate::beacon_block_streamer::{BeaconBlockStreamer, CheckEarlyAttesterCache}; + use crate::test_utils::{test_spec, BeaconChainHarness, EphemeralHarnessType}; + use execution_layer::test_utils::{Block, DEFAULT_ENGINE_CAPABILITIES}; + use execution_layer::EngineCapabilities; + use lazy_static::lazy_static; + use std::time::Duration; + use tokio::sync::mpsc; + use types::{ChainSpec, Epoch, EthSpec, Hash256, Keypair, MinimalEthSpec, Slot}; + + const VALIDATOR_COUNT: usize = 48; + lazy_static! { + /// A cached set of keys. + static ref KEYPAIRS: Vec = types::test_utils::generate_deterministic_keypairs(VALIDATOR_COUNT); + } + + fn get_harness( + validator_count: usize, + spec: ChainSpec, + ) -> BeaconChainHarness> { + let harness = BeaconChainHarness::builder(MinimalEthSpec) + .spec(spec) + .keypairs(KEYPAIRS[0..validator_count].to_vec()) + .logger(logging::test_logger()) + .fresh_ephemeral_store() + .mock_execution_layer() + .build(); + + harness.advance_slot(); + + harness + } + + #[tokio::test] + async fn check_all_blocks_from_altair_to_capella() { + let slots_per_epoch = MinimalEthSpec::slots_per_epoch() as usize; + let num_epochs = 8; + let bellatrix_fork_epoch = 2usize; + let capella_fork_epoch = 4usize; + let num_blocks_produced = num_epochs * slots_per_epoch; + + let mut spec = test_spec::(); + spec.altair_fork_epoch = Some(Epoch::new(0)); + spec.bellatrix_fork_epoch = Some(Epoch::new(bellatrix_fork_epoch as u64)); + spec.capella_fork_epoch = Some(Epoch::new(capella_fork_epoch as u64)); + + let harness = get_harness(VALIDATOR_COUNT, spec); + // go to bellatrix fork + harness + .extend_slots(bellatrix_fork_epoch * slots_per_epoch) + .await; + // extend half an epoch + harness.extend_slots(slots_per_epoch / 2).await; + // trigger merge + harness + .execution_block_generator() + .move_to_terminal_block() + .expect("should move to terminal block"); + let timestamp = harness.get_timestamp_at_slot() + harness.spec.seconds_per_slot; + harness + .execution_block_generator() + .modify_last_block(|block| { + if let Block::PoW(terminal_block) = block { + terminal_block.timestamp = timestamp; + } + }); + // finish out merge epoch + harness.extend_slots(slots_per_epoch / 2).await; + // finish rest of epochs + harness + .extend_slots((num_epochs - 1 - bellatrix_fork_epoch) * slots_per_epoch) + .await; + + let head = harness.chain.head_snapshot(); + let state = &head.beacon_state; + + assert_eq!( + state.slot(), + Slot::new(num_blocks_produced as u64), + "head should be at the current slot" + ); + assert_eq!( + state.current_epoch(), + num_blocks_produced as u64 / MinimalEthSpec::slots_per_epoch(), + "head should be at the expected epoch" + ); + assert_eq!( + state.current_justified_checkpoint().epoch, + state.current_epoch() - 1, + "the head should be justified one behind the current epoch" + ); + assert_eq!( + state.finalized_checkpoint().epoch, + state.current_epoch() - 2, + "the head should be finalized two behind the current epoch" + ); + + let block_roots: Vec = harness + .chain + .forwards_iter_block_roots(Slot::new(0)) + .expect("should get iter") + .map(Result::unwrap) + .map(|(root, _)| root) + .collect(); + + let mut expected_blocks = vec![]; + // get all blocks the old fashioned way + for root in &block_roots { + let block = harness + .chain + .get_block(root) + .await + .expect("should get block") + .expect("block should exist"); + expected_blocks.push(block); + } + + for epoch in 0..num_epochs { + let start = epoch * slots_per_epoch; + let mut epoch_roots = vec![Hash256::zero(); slots_per_epoch]; + epoch_roots[..].clone_from_slice(&block_roots[start..(start + slots_per_epoch)]); + let streamer = BeaconBlockStreamer::new(&harness.chain, CheckEarlyAttesterCache::No) + .expect("should create streamer"); + let (block_tx, mut block_rx) = mpsc::unbounded_channel(); + streamer.stream(epoch_roots.clone(), block_tx).await; + + for (i, expected_root) in epoch_roots.into_iter().enumerate() { + let (found_root, found_block_result) = + block_rx.recv().await.expect("should get block"); + + assert_eq!( + found_root, expected_root, + "expected block root should match" + ); + match found_block_result.as_ref() { + Ok(maybe_block) => { + let found_block = maybe_block.clone().expect("should have a block"); + let expected_block = expected_blocks + .get(start + i) + .expect("should get expected block"); + assert_eq!( + found_block.as_ref(), + expected_block, + "expected block should match found block" + ); + } + Err(e) => panic!("Error retrieving block {}: {:?}", expected_root, e), + } + } + } + } + + #[tokio::test] + async fn check_fallback_altair_to_capella() { + let slots_per_epoch = MinimalEthSpec::slots_per_epoch() as usize; + let num_epochs = 8; + let bellatrix_fork_epoch = 2usize; + let capella_fork_epoch = 4usize; + let num_blocks_produced = num_epochs * slots_per_epoch; + + let mut spec = test_spec::(); + spec.altair_fork_epoch = Some(Epoch::new(0)); + spec.bellatrix_fork_epoch = Some(Epoch::new(bellatrix_fork_epoch as u64)); + spec.capella_fork_epoch = Some(Epoch::new(capella_fork_epoch as u64)); + + let harness = get_harness(VALIDATOR_COUNT, spec); + + // modify execution engine so it doesn't support engine_payloadBodiesBy* methods + let mock_execution_layer = harness.mock_execution_layer.as_ref().unwrap(); + mock_execution_layer + .server + .set_engine_capabilities(EngineCapabilities { + get_payload_bodies_by_hash_v1: false, + get_payload_bodies_by_range_v1: false, + ..DEFAULT_ENGINE_CAPABILITIES + }); + // refresh capabilities cache + harness + .chain + .execution_layer + .as_ref() + .unwrap() + .get_engine_capabilities(Some(Duration::ZERO)) + .await + .unwrap(); + + // go to bellatrix fork + harness + .extend_slots(bellatrix_fork_epoch * slots_per_epoch) + .await; + // extend half an epoch + harness.extend_slots(slots_per_epoch / 2).await; + // trigger merge + harness + .execution_block_generator() + .move_to_terminal_block() + .expect("should move to terminal block"); + let timestamp = harness.get_timestamp_at_slot() + harness.spec.seconds_per_slot; + harness + .execution_block_generator() + .modify_last_block(|block| { + if let Block::PoW(terminal_block) = block { + terminal_block.timestamp = timestamp; + } + }); + // finish out merge epoch + harness.extend_slots(slots_per_epoch / 2).await; + // finish rest of epochs + harness + .extend_slots((num_epochs - 1 - bellatrix_fork_epoch) * slots_per_epoch) + .await; + + let head = harness.chain.head_snapshot(); + let state = &head.beacon_state; + + assert_eq!( + state.slot(), + Slot::new(num_blocks_produced as u64), + "head should be at the current slot" + ); + assert_eq!( + state.current_epoch(), + num_blocks_produced as u64 / MinimalEthSpec::slots_per_epoch(), + "head should be at the expected epoch" + ); + assert_eq!( + state.current_justified_checkpoint().epoch, + state.current_epoch() - 1, + "the head should be justified one behind the current epoch" + ); + assert_eq!( + state.finalized_checkpoint().epoch, + state.current_epoch() - 2, + "the head should be finalized two behind the current epoch" + ); + + let block_roots: Vec = harness + .chain + .forwards_iter_block_roots(Slot::new(0)) + .expect("should get iter") + .map(Result::unwrap) + .map(|(root, _)| root) + .collect(); + + let mut expected_blocks = vec![]; + // get all blocks the old fashioned way + for root in &block_roots { + let block = harness + .chain + .get_block(root) + .await + .expect("should get block") + .expect("block should exist"); + expected_blocks.push(block); + } + + for epoch in 0..num_epochs { + let start = epoch * slots_per_epoch; + let mut epoch_roots = vec![Hash256::zero(); slots_per_epoch]; + epoch_roots[..].clone_from_slice(&block_roots[start..(start + slots_per_epoch)]); + let streamer = BeaconBlockStreamer::new(&harness.chain, CheckEarlyAttesterCache::No) + .expect("should create streamer"); + let (block_tx, mut block_rx) = mpsc::unbounded_channel(); + streamer.stream(epoch_roots.clone(), block_tx).await; + + for (i, expected_root) in epoch_roots.into_iter().enumerate() { + let (found_root, found_block_result) = + block_rx.recv().await.expect("should get block"); + + assert_eq!( + found_root, expected_root, + "expected block root should match" + ); + match found_block_result.as_ref() { + Ok(maybe_block) => { + let found_block = maybe_block.clone().expect("should have a block"); + let expected_block = expected_blocks + .get(start + i) + .expect("should get expected block"); + assert_eq!( + found_block.as_ref(), + expected_block, + "expected block should match found block" + ); + } + Err(e) => panic!("Error retrieving block {}: {:?}", expected_root, e), + } + } + } + } +} diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index cd6815572..5fb654d3f 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -4,6 +4,7 @@ use crate::attestation_verification::{ VerifiedUnaggregatedAttestation, }; use crate::attester_cache::{AttesterCache, AttesterCacheKey}; +use crate::beacon_block_streamer::{BeaconBlockStreamer, CheckEarlyAttesterCache}; use crate::beacon_proposer_cache::compute_proposer_duties_from_head; use crate::beacon_proposer_cache::BeaconProposerCache; use crate::blob_cache::BlobCache; @@ -105,6 +106,7 @@ use store::{ DatabaseBlock, Error as DBError, HotColdDB, KeyValueStore, KeyValueStoreOp, StoreItem, StoreOp, }; use task_executor::{ShutdownReason, TaskExecutor}; +use tokio_stream::Stream; use tree_hash::TreeHash; use types::beacon_state::CloneConfig; use types::consts::eip4844::MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS; @@ -947,14 +949,42 @@ impl BeaconChain { /// ## Errors /// /// May return a database error. - pub async fn get_block_checking_early_attester_cache( - &self, - block_root: &Hash256, - ) -> Result>>, Error> { - if let Some(block) = self.early_attester_cache.get_block(*block_root) { - return Ok(Some(block)); - } - Ok(self.get_block(block_root).await?.map(Arc::new)) + pub fn get_blocks_checking_early_attester_cache( + self: &Arc, + block_roots: Vec, + executor: &TaskExecutor, + ) -> Result< + impl Stream< + Item = ( + Hash256, + Arc>>, Error>>, + ), + >, + Error, + > { + Ok( + BeaconBlockStreamer::::new(self, CheckEarlyAttesterCache::Yes)? + .launch_stream(block_roots, executor), + ) + } + + pub fn get_blocks( + self: &Arc, + block_roots: Vec, + executor: &TaskExecutor, + ) -> Result< + impl Stream< + Item = ( + Hash256, + Arc>>, Error>>, + ), + >, + Error, + > { + Ok( + BeaconBlockStreamer::::new(self, CheckEarlyAttesterCache::No)? + .launch_stream(block_roots, executor), + ) } pub async fn get_block_and_blobs_checking_early_attester_cache( diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 3b4c46249..c2a4cce4e 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -1,4 +1,5 @@ use crate::attester_cache::Error as AttesterCacheError; +use crate::beacon_block_streamer::Error as BlockStreamerError; use crate::beacon_chain::ForkChoiceError; use crate::beacon_fork_choice_store::Error as ForkChoiceStoreError; use crate::eth1_chain::Error as Eth1ChainError; @@ -143,6 +144,7 @@ pub enum BeaconChainError { ExecutionLayerMissing, BlockVariantLacksExecutionPayload(Hash256), ExecutionLayerErrorPayloadReconstruction(ExecutionBlockHash, Box), + EngineGetCapabilititesFailed(Box), BlockHashMissingFromExecutionLayer(ExecutionBlockHash), InconsistentPayloadReconstructed { slot: Slot, @@ -150,6 +152,7 @@ pub enum BeaconChainError { canonical_transactions_root: Hash256, reconstructed_transactions_root: Hash256, }, + BlockStreamerError(BlockStreamerError), AddPayloadLogicError, ExecutionForkChoiceUpdateFailed(execution_layer::Error), PrepareProposerFailed(BlockProcessingError), diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index fc86686bf..9519eebe0 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -2,6 +2,7 @@ pub mod attestation_rewards; pub mod attestation_verification; mod attester_cache; pub mod beacon_block_reward; +mod beacon_block_streamer; mod beacon_chain; mod beacon_fork_choice_store; pub mod beacon_proposer_cache; diff --git a/beacon_node/execution_layer/src/engine_api.rs b/beacon_node/execution_layer/src/engine_api.rs index 4abd6d787..009183d7a 100644 --- a/beacon_node/execution_layer/src/engine_api.rs +++ b/beacon_node/execution_layer/src/engine_api.rs @@ -1,7 +1,8 @@ use crate::engines::ForkchoiceState; use crate::http::{ ENGINE_EXCHANGE_TRANSITION_CONFIGURATION_V1, ENGINE_FORKCHOICE_UPDATED_V1, - ENGINE_FORKCHOICE_UPDATED_V2, ENGINE_GET_PAYLOAD_V1, ENGINE_GET_PAYLOAD_V2, + ENGINE_FORKCHOICE_UPDATED_V2, ENGINE_GET_PAYLOAD_BODIES_BY_HASH_V1, + ENGINE_GET_PAYLOAD_BODIES_BY_RANGE_V1, ENGINE_GET_PAYLOAD_V1, ENGINE_GET_PAYLOAD_V2, ENGINE_GET_PAYLOAD_V3, ENGINE_NEW_PAYLOAD_V1, ENGINE_NEW_PAYLOAD_V2, ENGINE_NEW_PAYLOAD_V3, }; use crate::BlobTxConversionError; @@ -17,7 +18,8 @@ use strum::IntoStaticStr; use superstruct::superstruct; pub use types::{ Address, EthSpec, ExecutionBlockHash, ExecutionPayload, ExecutionPayloadHeader, - ExecutionPayloadRef, FixedVector, ForkName, Hash256, Uint256, VariableList, Withdrawal, + ExecutionPayloadRef, FixedVector, ForkName, Hash256, Transactions, Uint256, VariableList, + Withdrawal, Withdrawals, }; use types::{ExecutionPayloadCapella, ExecutionPayloadEip4844, ExecutionPayloadMerge}; @@ -420,6 +422,99 @@ impl GetPayloadResponse { } } +#[derive(Clone, Debug)] +pub struct ExecutionPayloadBodyV1 { + pub transactions: Transactions, + pub withdrawals: Option>, +} + +impl ExecutionPayloadBodyV1 { + pub fn to_payload( + self, + header: ExecutionPayloadHeader, + ) -> Result, String> { + match header { + ExecutionPayloadHeader::Merge(header) => { + if self.withdrawals.is_some() { + return Err(format!( + "block {} is merge but payload body has withdrawals", + header.block_hash + )); + } + Ok(ExecutionPayload::Merge(ExecutionPayloadMerge { + parent_hash: header.parent_hash, + fee_recipient: header.fee_recipient, + state_root: header.state_root, + receipts_root: header.receipts_root, + logs_bloom: header.logs_bloom, + prev_randao: header.prev_randao, + block_number: header.block_number, + gas_limit: header.gas_limit, + gas_used: header.gas_used, + timestamp: header.timestamp, + extra_data: header.extra_data, + base_fee_per_gas: header.base_fee_per_gas, + block_hash: header.block_hash, + transactions: self.transactions, + })) + } + ExecutionPayloadHeader::Capella(header) => { + if let Some(withdrawals) = self.withdrawals { + Ok(ExecutionPayload::Capella(ExecutionPayloadCapella { + parent_hash: header.parent_hash, + fee_recipient: header.fee_recipient, + state_root: header.state_root, + receipts_root: header.receipts_root, + logs_bloom: header.logs_bloom, + prev_randao: header.prev_randao, + block_number: header.block_number, + gas_limit: header.gas_limit, + gas_used: header.gas_used, + timestamp: header.timestamp, + extra_data: header.extra_data, + base_fee_per_gas: header.base_fee_per_gas, + block_hash: header.block_hash, + transactions: self.transactions, + withdrawals, + })) + } else { + Err(format!( + "block {} is capella but payload body doesn't have withdrawals", + header.block_hash + )) + } + } + ExecutionPayloadHeader::Eip4844(header) => { + if let Some(withdrawals) = self.withdrawals { + Ok(ExecutionPayload::Eip4844(ExecutionPayloadEip4844 { + parent_hash: header.parent_hash, + fee_recipient: header.fee_recipient, + state_root: header.state_root, + receipts_root: header.receipts_root, + logs_bloom: header.logs_bloom, + prev_randao: header.prev_randao, + block_number: header.block_number, + gas_limit: header.gas_limit, + gas_used: header.gas_used, + timestamp: header.timestamp, + extra_data: header.extra_data, + base_fee_per_gas: header.base_fee_per_gas, + excess_data_gas: header.excess_data_gas, + block_hash: header.block_hash, + transactions: self.transactions, + withdrawals, + })) + } else { + Err(format!( + "block {} is post capella but payload body doesn't have withdrawals", + header.block_hash + )) + } + } + } + } +} + #[derive(Clone, Copy, Debug)] pub struct EngineCapabilities { pub new_payload_v1: bool, @@ -427,6 +522,8 @@ pub struct EngineCapabilities { pub new_payload_v3: bool, pub forkchoice_updated_v1: bool, pub forkchoice_updated_v2: bool, + pub get_payload_bodies_by_hash_v1: bool, + pub get_payload_bodies_by_range_v1: bool, pub get_payload_v1: bool, pub get_payload_v2: bool, pub get_payload_v3: bool, @@ -451,6 +548,12 @@ impl EngineCapabilities { if self.forkchoice_updated_v2 { response.push(ENGINE_FORKCHOICE_UPDATED_V2); } + if self.get_payload_bodies_by_hash_v1 { + response.push(ENGINE_GET_PAYLOAD_BODIES_BY_HASH_V1); + } + if self.get_payload_bodies_by_range_v1 { + response.push(ENGINE_GET_PAYLOAD_BODIES_BY_RANGE_V1); + } if self.get_payload_v1 { response.push(ENGINE_GET_PAYLOAD_V1); } diff --git a/beacon_node/execution_layer/src/engine_api/http.rs b/beacon_node/execution_layer/src/engine_api/http.rs index 1939007b6..df7d5d25e 100644 --- a/beacon_node/execution_layer/src/engine_api/http.rs +++ b/beacon_node/execution_layer/src/engine_api/http.rs @@ -47,6 +47,10 @@ pub const ENGINE_FORKCHOICE_UPDATED_V1: &str = "engine_forkchoiceUpdatedV1"; pub const ENGINE_FORKCHOICE_UPDATED_V2: &str = "engine_forkchoiceUpdatedV2"; pub const ENGINE_FORKCHOICE_UPDATED_TIMEOUT: Duration = Duration::from_secs(8); +pub const ENGINE_GET_PAYLOAD_BODIES_BY_HASH_V1: &str = "engine_getPayloadBodiesByHashV1"; +pub const ENGINE_GET_PAYLOAD_BODIES_BY_RANGE_V1: &str = "engine_getPayloadBodiesByRangeV1"; +pub const ENGINE_GET_PAYLOAD_BODIES_TIMEOUT: Duration = Duration::from_secs(10); + pub const ENGINE_EXCHANGE_TRANSITION_CONFIGURATION_V1: &str = "engine_exchangeTransitionConfigurationV1"; pub const ENGINE_EXCHANGE_TRANSITION_CONFIGURATION_V1_TIMEOUT: Duration = Duration::from_secs(1); @@ -69,6 +73,8 @@ pub static LIGHTHOUSE_CAPABILITIES: &[&str] = &[ ENGINE_GET_PAYLOAD_V3, ENGINE_FORKCHOICE_UPDATED_V1, ENGINE_FORKCHOICE_UPDATED_V2, + ENGINE_GET_PAYLOAD_BODIES_BY_HASH_V1, + ENGINE_GET_PAYLOAD_BODIES_BY_RANGE_V1, ENGINE_EXCHANGE_TRANSITION_CONFIGURATION_V1, ]; @@ -82,6 +88,8 @@ pub static PRE_CAPELLA_ENGINE_CAPABILITIES: EngineCapabilities = EngineCapabilit new_payload_v3: false, forkchoice_updated_v1: true, forkchoice_updated_v2: false, + get_payload_bodies_by_hash_v1: false, + get_payload_bodies_by_range_v1: false, get_payload_v1: true, get_payload_v2: false, get_payload_v3: false, @@ -978,6 +986,50 @@ impl HttpJsonRpc { Ok(response.into()) } + pub async fn get_payload_bodies_by_hash_v1( + &self, + block_hashes: Vec, + ) -> Result>>, Error> { + let params = json!([block_hashes]); + + let response: Vec>> = self + .rpc_request( + ENGINE_GET_PAYLOAD_BODIES_BY_HASH_V1, + params, + ENGINE_GET_PAYLOAD_BODIES_TIMEOUT * self.execution_timeout_multiplier, + ) + .await?; + + Ok(response + .into_iter() + .map(|opt_json| opt_json.map(From::from)) + .collect()) + } + + pub async fn get_payload_bodies_by_range_v1( + &self, + start: u64, + count: u64, + ) -> Result>>, Error> { + #[derive(Serialize)] + #[serde(transparent)] + struct Quantity(#[serde(with = "eth2_serde_utils::u64_hex_be")] u64); + + let params = json!([Quantity(start), Quantity(count)]); + let response: Vec>> = self + .rpc_request( + ENGINE_GET_PAYLOAD_BODIES_BY_RANGE_V1, + params, + ENGINE_GET_PAYLOAD_BODIES_TIMEOUT * self.execution_timeout_multiplier, + ) + .await?; + + Ok(response + .into_iter() + .map(|opt_json| opt_json.map(From::from)) + .collect()) + } + pub async fn exchange_transition_configuration_v1( &self, transition_configuration: TransitionConfigurationV1, @@ -1021,6 +1073,10 @@ impl HttpJsonRpc { new_payload_v3: capabilities.contains(ENGINE_NEW_PAYLOAD_V3), forkchoice_updated_v1: capabilities.contains(ENGINE_FORKCHOICE_UPDATED_V1), forkchoice_updated_v2: capabilities.contains(ENGINE_FORKCHOICE_UPDATED_V2), + get_payload_bodies_by_hash_v1: capabilities + .contains(ENGINE_GET_PAYLOAD_BODIES_BY_HASH_V1), + get_payload_bodies_by_range_v1: capabilities + .contains(ENGINE_GET_PAYLOAD_BODIES_BY_RANGE_V1), get_payload_v1: capabilities.contains(ENGINE_GET_PAYLOAD_V1), get_payload_v2: capabilities.contains(ENGINE_GET_PAYLOAD_V2), get_payload_v3: capabilities.contains(ENGINE_GET_PAYLOAD_V3), diff --git a/beacon_node/execution_layer/src/engine_api/json_structures.rs b/beacon_node/execution_layer/src/engine_api/json_structures.rs index ed9a58f26..e84e8dd81 100644 --- a/beacon_node/execution_layer/src/engine_api/json_structures.rs +++ b/beacon_node/execution_layer/src/engine_api/json_structures.rs @@ -5,7 +5,7 @@ use superstruct::superstruct; use types::blobs_sidecar::KzgCommitments; use types::{ Blobs, EthSpec, ExecutionBlockHash, ExecutionPayload, ExecutionPayloadCapella, - ExecutionPayloadEip4844, ExecutionPayloadMerge, FixedVector, Transaction, Unsigned, + ExecutionPayloadEip4844, ExecutionPayloadMerge, FixedVector, Transactions, Unsigned, VariableList, Withdrawal, }; @@ -98,8 +98,7 @@ pub struct JsonExecutionPayload { pub excess_data_gas: Uint256, pub block_hash: ExecutionBlockHash, #[serde(with = "ssz_types::serde_utils::list_of_hex_var_list")] - pub transactions: - VariableList, T::MaxTransactionsPerPayload>, + pub transactions: Transactions, #[superstruct(only(V2, V3))] pub withdrawals: VariableList, } @@ -572,6 +571,30 @@ impl From for JsonForkchoiceUpdatedV1Response { } } +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(bound = "E: EthSpec")] +pub struct JsonExecutionPayloadBodyV1 { + #[serde(with = "ssz_types::serde_utils::list_of_hex_var_list")] + pub transactions: Transactions, + pub withdrawals: Option>, +} + +impl From> for ExecutionPayloadBodyV1 { + fn from(value: JsonExecutionPayloadBodyV1) -> Self { + Self { + transactions: value.transactions, + withdrawals: value.withdrawals.map(|json_withdrawals| { + Withdrawals::::from( + json_withdrawals + .into_iter() + .map(Into::into) + .collect::>(), + ) + }), + } + } +} + #[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct TransitionConfigurationV1 { diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index 7244a6b92..2160c06e4 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -1644,6 +1644,37 @@ impl ExecutionLayer { } } + pub async fn get_payload_bodies_by_hash( + &self, + hashes: Vec, + ) -> Result>>, Error> { + self.engine() + .request(|engine: &Engine| async move { + engine.api.get_payload_bodies_by_hash_v1(hashes).await + }) + .await + .map_err(Box::new) + .map_err(Error::EngineError) + } + + pub async fn get_payload_bodies_by_range( + &self, + start: u64, + count: u64, + ) -> Result>>, Error> { + let _timer = metrics::start_timer(&metrics::EXECUTION_LAYER_GET_PAYLOAD_BODIES_BY_RANGE); + self.engine() + .request(|engine: &Engine| async move { + engine + .api + .get_payload_bodies_by_range_v1(start, count) + .await + }) + .await + .map_err(Box::new) + .map_err(Error::EngineError) + } + pub async fn get_payload_by_block_hash( &self, hash: ExecutionBlockHash, diff --git a/beacon_node/execution_layer/src/metrics.rs b/beacon_node/execution_layer/src/metrics.rs index 287050f66..3ed99ca60 100644 --- a/beacon_node/execution_layer/src/metrics.rs +++ b/beacon_node/execution_layer/src/metrics.rs @@ -45,6 +45,10 @@ lazy_static::lazy_static! { "execution_layer_get_payload_by_block_hash_time", "Time to reconstruct a payload from the EE using eth_getBlockByHash" ); + pub static ref EXECUTION_LAYER_GET_PAYLOAD_BODIES_BY_RANGE: Result = try_create_histogram( + "execution_layer_get_payload_bodies_by_range_time", + "Time to fetch a range of payload bodies from the EE" + ); pub static ref EXECUTION_LAYER_VERIFY_BLOCK_HASH: Result = try_create_histogram_with_buckets( "execution_layer_verify_block_hash_time", "Time to verify the execution block hash in Lighthouse, without the EL", diff --git a/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs b/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs index 63893375d..fe3c6f274 100644 --- a/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs +++ b/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs @@ -205,6 +205,14 @@ impl ExecutionBlockGenerator { .and_then(|block| block.as_execution_block_with_tx()) } + pub fn execution_block_with_txs_by_number( + &self, + number: u64, + ) -> Option> { + self.block_by_number(number) + .and_then(|block| block.as_execution_block_with_tx()) + } + pub fn move_to_block_prior_to_terminal_block(&mut self) -> Result<(), String> { let target_block = self .terminal_block_number diff --git a/beacon_node/execution_layer/src/test_utils/handle_rpc.rs b/beacon_node/execution_layer/src/test_utils/handle_rpc.rs index d8aeda8bc..a1488e2dc 100644 --- a/beacon_node/execution_layer/src/test_utils/handle_rpc.rs +++ b/beacon_node/execution_layer/src/test_utils/handle_rpc.rs @@ -2,7 +2,7 @@ use super::Context; use crate::engine_api::{http::*, *}; use crate::json_structures::*; use crate::test_utils::DEFAULT_MOCK_EL_PAYLOAD_VALUE_WEI; -use serde::de::DeserializeOwned; +use serde::{de::DeserializeOwned, Deserialize}; use serde_json::Value as JsonValue; use std::sync::Arc; use types::{EthSpec, ForkName}; @@ -429,6 +429,61 @@ pub async fn handle_rpc( let engine_capabilities = ctx.engine_capabilities.read(); Ok(serde_json::to_value(engine_capabilities.to_response()).unwrap()) } + ENGINE_GET_PAYLOAD_BODIES_BY_RANGE_V1 => { + #[derive(Deserialize)] + #[serde(transparent)] + struct Quantity(#[serde(with = "eth2_serde_utils::u64_hex_be")] pub u64); + + let start = get_param::(params, 0) + .map_err(|s| (s, BAD_PARAMS_ERROR_CODE))? + .0; + let count = get_param::(params, 1) + .map_err(|s| (s, BAD_PARAMS_ERROR_CODE))? + .0; + + let mut response = vec![]; + for block_num in start..(start + count) { + let maybe_block = ctx + .execution_block_generator + .read() + .execution_block_with_txs_by_number(block_num); + + match maybe_block { + Some(block) => { + let transactions = Transactions::::new( + block + .transactions() + .iter() + .map(|transaction| VariableList::new(transaction.rlp().to_vec())) + .collect::>() + .map_err(|e| { + ( + format!("failed to deserialize transaction: {:?}", e), + GENERIC_ERROR_CODE, + ) + })?, + ) + .map_err(|e| { + ( + format!("failed to deserialize transactions: {:?}", e), + GENERIC_ERROR_CODE, + ) + })?; + + response.push(Some(JsonExecutionPayloadBodyV1:: { + transactions, + withdrawals: block + .withdrawals() + .ok() + .map(|withdrawals| VariableList::from(withdrawals.clone())), + })); + } + None => response.push(None), + } + } + + Ok(serde_json::to_value(response).unwrap()) + } other => Err(( format!("The method {} does not exist/is not available", other), METHOD_NOT_FOUND_CODE, diff --git a/beacon_node/execution_layer/src/test_utils/mod.rs b/beacon_node/execution_layer/src/test_utils/mod.rs index 52a794ba1..3c0763a8f 100644 --- a/beacon_node/execution_layer/src/test_utils/mod.rs +++ b/beacon_node/execution_layer/src/test_utils/mod.rs @@ -40,6 +40,8 @@ pub const DEFAULT_ENGINE_CAPABILITIES: EngineCapabilities = EngineCapabilities { new_payload_v3: true, forkchoice_updated_v1: true, forkchoice_updated_v2: true, + get_payload_bodies_by_hash_v1: true, + get_payload_bodies_by_range_v1: true, get_payload_v1: true, get_payload_v2: true, get_payload_v3: true, diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index cefb1c77e..d91e42e7e 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use crate::beacon_processor::{worker::FUTURE_SLOT_TOLERANCE, SendOnDrop}; use crate::service::NetworkMessage; use crate::status::ToStatusMessage; @@ -12,10 +14,9 @@ use lighthouse_network::rpc::*; use lighthouse_network::{PeerId, PeerRequestId, ReportSource, Response, SyncInfo}; use slog::{debug, error, warn}; use slot_clock::SlotClock; -use std::sync::Arc; use task_executor::TaskExecutor; -use types::light_client_bootstrap::LightClientBootstrap; -use types::{Epoch, EthSpec, Hash256, Slot}; +use tokio_stream::StreamExt; +use types::{light_client_bootstrap::LightClientBootstrap, Epoch, EthSpec, Hash256, Slot}; use super::Worker; @@ -138,21 +139,25 @@ impl Worker { request_id: PeerRequestId, request: BlocksByRootRequest, ) { + let requested_blocks = request.block_roots.len(); + let mut block_stream = match self + .chain + .get_blocks_checking_early_attester_cache(request.block_roots.into(), &executor) + { + Ok(block_stream) => block_stream, + Err(e) => return error!(self.log, "Error getting block stream"; "error" => ?e), + }; // Fetching blocks is async because it may have to hit the execution layer for payloads. executor.spawn( async move { let mut send_block_count = 0; let mut send_response = true; - for root in request.block_roots.iter() { - match self - .chain - .get_block_checking_early_attester_cache(root) - .await - { + while let Some((root, result)) = block_stream.next().await { + match result.as_ref() { Ok(Some(block)) => { self.send_response( peer_id, - Response::BlocksByRoot(Some(block)), + Response::BlocksByRoot(Some(block.clone())), request_id, ); send_block_count += 1; @@ -197,8 +202,8 @@ impl Worker { self.log, "Received BlocksByRoot Request"; "peer" => %peer_id, - "requested" => request.block_roots.len(), - "returned" => send_block_count + "requested" => requested_blocks, + "returned" => %send_block_count ); // send stream termination @@ -519,14 +524,19 @@ impl Worker { // remove all skip slots let block_roots = block_roots.into_iter().flatten().collect::>(); + let mut block_stream = match self.chain.get_blocks(block_roots, &executor) { + Ok(block_stream) => block_stream, + Err(e) => return error!(self.log, "Error getting block stream"; "error" => ?e), + }; + // Fetching blocks is async because it may have to hit the execution layer for payloads. executor.spawn( async move { let mut blocks_sent = 0; let mut send_response = true; - for root in block_roots { - match self.chain.get_block(&root).await { + while let Some((root, result)) = block_stream.next().await { + match result.as_ref() { Ok(Some(block)) => { // Due to skip slots, blocks could be out of the range, we ensure they // are in the range before sending @@ -536,7 +546,7 @@ impl Worker { blocks_sent += 1; self.send_network_message(NetworkMessage::SendResponse { peer_id, - response: Response::BlocksByRange(Some(Arc::new(block))), + response: Response::BlocksByRange(Some(block.clone())), id: request_id, }); } diff --git a/consensus/types/src/execution_payload.rs b/consensus/types/src/execution_payload.rs index 6e055d0a7..7762afe91 100644 --- a/consensus/types/src/execution_payload.rs +++ b/consensus/types/src/execution_payload.rs @@ -171,3 +171,13 @@ impl ForkVersionDeserialize for ExecutionPayload { }) } } + +impl ExecutionPayload { + pub fn fork_name(&self) -> ForkName { + match self { + ExecutionPayload::Merge(_) => ForkName::Merge, + ExecutionPayload::Capella(_) => ForkName::Capella, + ExecutionPayload::Eip4844(_) => ForkName::Eip4844, + } + } +} diff --git a/testing/execution_engine_integration/src/nethermind.rs b/testing/execution_engine_integration/src/nethermind.rs index 720a4a73b..485485c6f 100644 --- a/testing/execution_engine_integration/src/nethermind.rs +++ b/testing/execution_engine_integration/src/nethermind.rs @@ -11,7 +11,7 @@ use unused_port::unused_tcp4_port; /// We've pinned the Nethermind version since our method of using the `master` branch to /// find the latest tag isn't working. It appears Nethermind don't always tag on `master`. /// We should fix this so we always pull the latest version of Nethermind. -const NETHERMIND_BRANCH: &str = "release/1.14.6"; +const NETHERMIND_BRANCH: &str = "release/1.17.1"; const NETHERMIND_REPO_URL: &str = "https://github.com/NethermindEth/nethermind"; fn build_result(repo_dir: &Path) -> Output { @@ -67,7 +67,7 @@ impl NethermindEngine { .join("Nethermind.Runner") .join("bin") .join("Release") - .join("net6.0") + .join("net7.0") .join("Nethermind.Runner") } } @@ -95,7 +95,7 @@ impl GenericExecutionEngine for NethermindEngine { .arg("--datadir") .arg(datadir.path().to_str().unwrap()) .arg("--config") - .arg("kiln") + .arg("hive") .arg("--Init.ChainSpecPath") .arg(genesis_json_path.to_str().unwrap()) .arg("--Merge.TerminalTotalDifficulty") diff --git a/testing/execution_engine_integration/src/test_rig.rs b/testing/execution_engine_integration/src/test_rig.rs index 15e9f2601..ff333332b 100644 --- a/testing/execution_engine_integration/src/test_rig.rs +++ b/testing/execution_engine_integration/src/test_rig.rs @@ -15,8 +15,8 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use task_executor::TaskExecutor; use tokio::time::sleep; use types::{ - Address, ChainSpec, EthSpec, ExecutionBlockHash, ExecutionPayload, ForkName, FullPayload, - Hash256, MainnetEthSpec, PublicKeyBytes, Slot, Uint256, + Address, ChainSpec, EthSpec, ExecutionBlockHash, ExecutionPayload, ExecutionPayloadHeader, + ForkName, FullPayload, Hash256, MainnetEthSpec, PublicKeyBytes, Slot, Uint256, }; const EXECUTION_ENGINE_START_TIMEOUT: Duration = Duration::from_secs(30); @@ -628,12 +628,32 @@ async fn check_payload_reconstruction( ) { let reconstructed = ee .execution_layer - // FIXME: handle other forks here? - .get_payload_by_block_hash(payload.block_hash(), ForkName::Merge) + .get_payload_by_block_hash(payload.block_hash(), payload.fork_name()) .await .unwrap() .unwrap(); assert_eq!(reconstructed, *payload); + // also check via payload bodies method + let capabilities = ee + .execution_layer + .get_engine_capabilities(None) + .await + .unwrap(); + assert!( + // if the engine doesn't have these capabilities, we need to update the client in our tests + capabilities.get_payload_bodies_by_hash_v1 && capabilities.get_payload_bodies_by_range_v1, + "Testing engine does not support payload bodies methods" + ); + let mut bodies = ee + .execution_layer + .get_payload_bodies_by_hash(vec![payload.block_hash()]) + .await + .unwrap(); + assert_eq!(bodies.len(), 1); + let body = bodies.pop().unwrap().unwrap(); + let header = ExecutionPayloadHeader::from(payload.to_ref()); + let reconstructed_from_body = body.to_payload(header).unwrap(); + assert_eq!(reconstructed_from_body, *payload); } /// Returns the duration since the unix epoch.