diff --git a/beacon_node/lighthouse_network/src/config.rs b/beacon_node/lighthouse_network/src/config.rs index 6bb64f83f..789242e8d 100644 --- a/beacon_node/lighthouse_network/src/config.rs +++ b/beacon_node/lighthouse_network/src/config.rs @@ -16,8 +16,10 @@ use std::sync::Arc; use std::time::Duration; use types::{ForkContext, ForkName}; -/// The maximum transmit size of gossip messages in bytes. -pub const GOSSIP_MAX_SIZE: usize = 10 * 1_048_576; // 10M +/// The maximum transmit size of gossip messages in bytes pre-merge. +const GOSSIP_MAX_SIZE: usize = 1_048_576; // 1M +/// The maximum transmit size of gossip messages in bytes post-merge. +const GOSSIP_MAX_SIZE_POST_MERGE: usize = 10 * 1_048_576; // 10M /// This is a constant to be used in discovery. The lower bound of the gossipsub mesh. pub const MESH_N_LOW: usize = 6; @@ -40,6 +42,15 @@ pub const DUPLICATE_CACHE_TIME: Duration = Duration::from_secs(33 * 12 + 1); // const MESSAGE_DOMAIN_INVALID_SNAPPY: [u8; 4] = [0, 0, 0, 0]; const MESSAGE_DOMAIN_VALID_SNAPPY: [u8; 4] = [1, 0, 0, 0]; +/// The maximum size of gossip messages. +pub fn gossip_max_size(is_merge_enabled: bool) -> usize { + if is_merge_enabled { + GOSSIP_MAX_SIZE_POST_MERGE + } else { + GOSSIP_MAX_SIZE + } +} + #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] /// Network configuration for lighthouse. @@ -231,6 +242,7 @@ pub fn gossipsub_config(fork_context: Arc) -> GossipsubConfig { } } + let is_merge_enabled = fork_context.fork_exists(ForkName::Merge); let gossip_message_id = move |message: &GossipsubMessage| { MessageId::from( &Sha256::digest( @@ -239,7 +251,7 @@ pub fn gossipsub_config(fork_context: Arc) -> GossipsubConfig { ) }; GossipsubConfigBuilder::default() - .max_transmit_size(GOSSIP_MAX_SIZE) + .max_transmit_size(gossip_max_size(is_merge_enabled)) .heartbeat_interval(Duration::from_millis(700)) .mesh_n(8) .mesh_n_low(MESH_N_LOW) diff --git a/beacon_node/lighthouse_network/src/lib.rs b/beacon_node/lighthouse_network/src/lib.rs index b37b69dcf..058b38ceb 100644 --- a/beacon_node/lighthouse_network/src/lib.rs +++ b/beacon_node/lighthouse_network/src/lib.rs @@ -16,7 +16,7 @@ pub mod rpc; mod service; pub mod types; -pub use config::GOSSIP_MAX_SIZE; +pub use config::gossip_max_size; use serde::{de, Deserialize, Deserializer, Serialize, Serializer}; use std::str::FromStr; diff --git a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs index f5d7232a6..0924dca0c 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs @@ -697,9 +697,9 @@ mod tests { version: Version, message: &mut BytesMut, ) -> Result>, RPCError> { - let max_packet_size = 1_048_576; let snappy_protocol_id = ProtocolId::new(protocol, version, Encoding::SSZSnappy); let fork_context = Arc::new(fork_context()); + let max_packet_size = max_rpc_size(&fork_context); let mut snappy_outbound_codec = SSZSnappyOutboundCodec::::new(snappy_protocol_id, max_packet_size, fork_context); // decode message just as snappy message @@ -1124,7 +1124,7 @@ mod tests { ); } - /// Test sending a message with encoded length prefix > MAX_RPC_SIZE. + /// Test sending a message with encoded length prefix > max_rpc_size. #[test] fn test_decode_invalid_length() { // 10 byte snappy stream identifier diff --git a/beacon_node/lighthouse_network/src/rpc/handler.rs b/beacon_node/lighthouse_network/src/rpc/handler.rs index 1a12c2600..37724e028 100644 --- a/beacon_node/lighthouse_network/src/rpc/handler.rs +++ b/beacon_node/lighthouse_network/src/rpc/handler.rs @@ -5,7 +5,7 @@ use super::methods::{ GoodbyeReason, RPCCodedResponse, RPCResponseErrorCode, RequestId, ResponseTermination, }; use super::outbound::OutboundRequestContainer; -use super::protocol::{InboundRequest, Protocol, RPCError, RPCProtocol}; +use super::protocol::{max_rpc_size, InboundRequest, Protocol, RPCError, RPCProtocol}; use super::{RPCReceived, RPCSend}; use crate::rpc::outbound::{OutboundFramed, OutboundRequest}; use crate::rpc::protocol::InboundFramed; @@ -951,6 +951,7 @@ where OutboundRequestContainer { req: req.clone(), fork_context: self.fork_context.clone(), + max_rpc_size: max_rpc_size(&self.fork_context), }, (), ) diff --git a/beacon_node/lighthouse_network/src/rpc/mod.rs b/beacon_node/lighthouse_network/src/rpc/mod.rs index c7bfd405d..ebd624061 100644 --- a/beacon_node/lighthouse_network/src/rpc/mod.rs +++ b/beacon_node/lighthouse_network/src/rpc/mod.rs @@ -30,7 +30,7 @@ pub use methods::{ RPCResponseErrorCode, RequestId, ResponseTermination, StatusMessage, MAX_REQUEST_BLOCKS, }; pub(crate) use outbound::OutboundRequest; -pub use protocol::{Protocol, RPCError, MAX_RPC_SIZE}; +pub use protocol::{max_rpc_size, Protocol, RPCError}; pub(crate) mod codec; mod handler; @@ -186,6 +186,7 @@ where SubstreamProtocol::new( RPCProtocol { fork_context: self.fork_context.clone(), + max_rpc_size: max_rpc_size(&self.fork_context), phantom: PhantomData, }, (), diff --git a/beacon_node/lighthouse_network/src/rpc/outbound.rs b/beacon_node/lighthouse_network/src/rpc/outbound.rs index 1c908887e..17201c6cf 100644 --- a/beacon_node/lighthouse_network/src/rpc/outbound.rs +++ b/beacon_node/lighthouse_network/src/rpc/outbound.rs @@ -2,7 +2,7 @@ use std::marker::PhantomData; use super::methods::*; use super::protocol::Protocol; -use super::protocol::{ProtocolId, MAX_RPC_SIZE}; +use super::protocol::ProtocolId; use super::RPCError; use crate::rpc::protocol::Encoding; use crate::rpc::protocol::Version; @@ -29,6 +29,7 @@ use types::{EthSpec, ForkContext}; pub struct OutboundRequestContainer { pub req: OutboundRequest, pub fork_context: Arc, + pub max_rpc_size: usize, } #[derive(Debug, Clone, PartialEq)] @@ -150,7 +151,7 @@ where Encoding::SSZSnappy => { let ssz_snappy_codec = BaseOutboundCodec::new(SSZSnappyOutboundCodec::new( protocol, - MAX_RPC_SIZE, + self.max_rpc_size, self.fork_context.clone(), )); OutboundCodec::SSZSnappy(ssz_snappy_codec) diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index a6a015878..1e6504199 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -22,7 +22,7 @@ use tokio_util::{ }; use types::{ BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockMerge, EthSpec, ForkContext, - Hash256, MainnetEthSpec, Signature, SignedBeaconBlock, + ForkName, Hash256, MainnetEthSpec, Signature, SignedBeaconBlock, }; lazy_static! { @@ -92,8 +92,10 @@ lazy_static! { } -/// The maximum bytes that can be sent across the RPC. -pub const MAX_RPC_SIZE: usize = 10 * 1_048_576; // 10M +/// The maximum bytes that can be sent across the RPC pre-merge. +pub(crate) const MAX_RPC_SIZE: usize = 1_048_576; // 1M +/// The maximum bytes that can be sent across the RPC post-merge. +pub(crate) const MAX_RPC_SIZE_POST_MERGE: usize = 10 * 1_048_576; // 10M /// The protocol prefix the RPC protocol id. const PROTOCOL_PREFIX: &str = "/eth2/beacon_chain/req"; /// Time allowed for the first byte of a request to arrive before we time out (Time To First Byte). @@ -102,6 +104,15 @@ const TTFB_TIMEOUT: u64 = 5; /// established before the stream is terminated. const REQUEST_TIMEOUT: u64 = 15; +/// Returns the maximum bytes that can be sent across the RPC. +pub fn max_rpc_size(fork_context: &ForkContext) -> usize { + if fork_context.fork_exists(ForkName::Merge) { + MAX_RPC_SIZE_POST_MERGE + } else { + MAX_RPC_SIZE + } +} + /// Protocol names to be used. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum Protocol { @@ -170,6 +181,7 @@ impl std::fmt::Display for Version { #[derive(Debug, Clone)] pub struct RPCProtocol { pub fork_context: Arc, + pub max_rpc_size: usize, pub phantom: PhantomData, } @@ -206,7 +218,7 @@ impl RpcLimits { Self { min, max } } - /// Returns true if the given length is greater than `MAX_RPC_SIZE` or out of + /// Returns true if the given length is greater than `max_rpc_size` or out of /// bounds for the given ssz type, returns false otherwise. pub fn is_out_of_bounds(&self, length: usize, max_rpc_size: usize) -> bool { length > std::cmp::min(self.max, max_rpc_size) || length < self.min @@ -365,7 +377,7 @@ where Encoding::SSZSnappy => { let ssz_snappy_codec = BaseInboundCodec::new(SSZSnappyInboundCodec::new( protocol, - MAX_RPC_SIZE, + self.max_rpc_size, self.fork_context.clone(), )); InboundCodec::SSZSnappy(ssz_snappy_codec) diff --git a/beacon_node/lighthouse_network/tests/common/mod.rs b/beacon_node/lighthouse_network/tests/common/mod.rs index 6daaeb335..865946a22 100644 --- a/beacon_node/lighthouse_network/tests/common/mod.rs +++ b/beacon_node/lighthouse_network/tests/common/mod.rs @@ -17,7 +17,7 @@ type E = MinimalEthSpec; use tempfile::Builder as TempBuilder; /// Returns a dummy fork context -fn fork_context() -> ForkContext { +pub fn fork_context() -> ForkContext { let mut chain_spec = E::default_spec(); // Set fork_epoch to `Some` to ensure that the `ForkContext` object // includes altair in the list of forks diff --git a/beacon_node/lighthouse_network/tests/rpc_tests.rs b/beacon_node/lighthouse_network/tests/rpc_tests.rs index 77d014e6a..b270765f8 100644 --- a/beacon_node/lighthouse_network/tests/rpc_tests.rs +++ b/beacon_node/lighthouse_network/tests/rpc_tests.rs @@ -1,7 +1,7 @@ #![cfg(test)] use lighthouse_network::rpc::methods::*; use lighthouse_network::{ - rpc::MAX_RPC_SIZE, BehaviourEvent, Libp2pEvent, ReportSource, Request, Response, + rpc::max_rpc_size, BehaviourEvent, Libp2pEvent, ReportSource, Request, Response, }; use slog::{debug, warn, Level}; use ssz::Encode; @@ -11,16 +11,16 @@ use std::time::Duration; use tokio::runtime::Runtime; use tokio::time::sleep; use types::{ - BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockMerge, Epoch, EthSpec, Hash256, - MinimalEthSpec, Signature, SignedBeaconBlock, Slot, + BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockMerge, Epoch, EthSpec, ForkContext, + Hash256, MinimalEthSpec, Signature, SignedBeaconBlock, Slot, }; mod common; type E = MinimalEthSpec; -/// Merge block with length < MAX_RPC_SIZE. -fn merge_block_small() -> BeaconBlock { +/// Merge block with length < max_rpc_size. +fn merge_block_small(fork_context: &ForkContext) -> BeaconBlock { let mut block = BeaconBlockMerge::empty(&E::default_spec()); let tx = VariableList::from(vec![0; 1024]); let txs = VariableList::from(std::iter::repeat(tx).take(100).collect::>()); @@ -28,14 +28,14 @@ fn merge_block_small() -> BeaconBlock { block.body.execution_payload.transactions = txs; let block = BeaconBlock::Merge(block); - assert!(block.ssz_bytes_len() <= MAX_RPC_SIZE); + assert!(block.ssz_bytes_len() <= max_rpc_size(fork_context)); block } /// Merge block with length > MAX_RPC_SIZE. /// The max limit for a merge block is in the order of ~16GiB which wouldn't fit in memory. /// Hence, we generate a merge block just greater than `MAX_RPC_SIZE` to test rejection on the rpc layer. -fn merge_block_large() -> BeaconBlock { +fn merge_block_large(fork_context: &ForkContext) -> BeaconBlock { let mut block = BeaconBlockMerge::empty(&E::default_spec()); let tx = VariableList::from(vec![0; 1024]); let txs = VariableList::from(std::iter::repeat(tx).take(100000).collect::>()); @@ -43,7 +43,7 @@ fn merge_block_large() -> BeaconBlock { block.body.execution_payload.transactions = txs; let block = BeaconBlock::Merge(block); - assert!(block.ssz_bytes_len() > MAX_RPC_SIZE); + assert!(block.ssz_bytes_len() > max_rpc_size(fork_context)); block } @@ -180,7 +180,7 @@ fn test_blocks_by_range_chunked_rpc() { let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty()); let rpc_response_altair = Response::BlocksByRange(Some(Box::new(signed_full_block))); - let full_block = merge_block_small(); + let full_block = merge_block_small(&common::fork_context()); let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty()); let rpc_response_merge_small = Response::BlocksByRange(Some(Box::new(signed_full_block))); @@ -309,7 +309,7 @@ fn test_blocks_by_range_over_limit() { }); // BlocksByRange Response - let full_block = merge_block_large(); + let full_block = merge_block_large(&common::fork_context()); let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty()); let rpc_response_merge_large = Response::BlocksByRange(Some(Box::new(signed_full_block))); @@ -666,7 +666,7 @@ fn test_blocks_by_root_chunked_rpc() { let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty()); let rpc_response_altair = Response::BlocksByRoot(Some(Box::new(signed_full_block))); - let full_block = merge_block_small(); + let full_block = merge_block_small(&common::fork_context()); let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty()); let rpc_response_merge_small = Response::BlocksByRoot(Some(Box::new(signed_full_block))); diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 37963cd58..ab51c218b 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -468,7 +468,8 @@ pub fn get_config( }; } - client_config.chain.max_network_size = lighthouse_network::GOSSIP_MAX_SIZE; + client_config.chain.max_network_size = + lighthouse_network::gossip_max_size(spec.merge_fork_epoch.is_some()); if cli_args.is_present("slasher") { let slasher_dir = if let Some(slasher_dir) = cli_args.value_of("slasher-dir") { diff --git a/consensus/types/src/execution_payload.rs b/consensus/types/src/execution_payload.rs index e1f7a045b..0080b092c 100644 --- a/consensus/types/src/execution_payload.rs +++ b/consensus/types/src/execution_payload.rs @@ -1,5 +1,4 @@ use crate::{test_utils::TestRandom, *}; -use safe_arith::{ArithError, SafeArith}; use serde_derive::{Deserialize, Serialize}; use ssz::Encode; use ssz_derive::{Decode, Encode}; @@ -44,19 +43,6 @@ impl ExecutionPayload { Self::default() } - /// Returns the ssz size of `self`. - pub fn payload_size(&self) -> Result { - let mut tx_size = ssz::BYTES_PER_LENGTH_OFFSET.safe_mul(self.transactions.len())?; - for tx in self.transactions.iter() { - tx_size.safe_add_assign(tx.len())?; - } - Self::empty() - .as_ssz_bytes() - .len() - .safe_add(::ssz_fixed_len().safe_mul(self.extra_data.len())?)? - .safe_add(tx_size) - } - #[allow(clippy::integer_arithmetic)] /// Returns the maximum size of an execution payload. pub fn max_execution_payload_size() -> usize { @@ -68,26 +54,3 @@ impl ExecutionPayload { + (T::max_transactions_per_payload() * (ssz::BYTES_PER_LENGTH_OFFSET + T::max_bytes_per_transaction())) } } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_payload_size() { - let mut payload = ExecutionPayload::::empty(); - - assert_eq!( - payload.as_ssz_bytes().len(), - payload.payload_size().unwrap() - ); - - payload.extra_data = VariableList::from(vec![42; 16]); - payload.transactions = VariableList::from(vec![VariableList::from(vec![42; 42])]); - - assert_eq!( - payload.as_ssz_bytes().len(), - payload.payload_size().unwrap() - ); - } -} diff --git a/lighthouse/src/main.rs b/lighthouse/src/main.rs index 75447d35a..693b3de82 100644 --- a/lighthouse/src/main.rs +++ b/lighthouse/src/main.rs @@ -367,7 +367,7 @@ fn run( let logfile_compress = matches.is_present("logfile-compress"); // Construct the path to the log file. - let mut log_path: Option = parse_optional(matches, "logfile")?; + let mut log_path: Option = clap_utils::parse_optional(matches, "logfile")?; if log_path.is_none() { log_path = match matches.subcommand_name() { Some("beacon_node") => Some(