diff --git a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs
index ce6e30ebf..e6654a308 100644
--- a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs
+++ b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs
@@ -298,8 +298,8 @@ impl<TSpec: EthSpec> Decoder for SSZSnappyOutboundCodec<TSpec> {
             .rpc_response_limits::<TSpec>(&self.fork_context);
         if ssz_limits.is_out_of_bounds(length, self.max_packet_size) {
             return Err(RPCError::InvalidData(format!(
-                "RPC response length is out of bounds, length {}",
-                length
+                "RPC response length is out of bounds, length {}, max {}, min {}",
+                length, ssz_limits.max, ssz_limits.min
             )));
         }
         // Calculate worst case compression length for given uncompressed length
diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs
index 0773197e8..2a91a4bb4 100644
--- a/beacon_node/lighthouse_network/src/rpc/protocol.rs
+++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs
@@ -23,7 +23,7 @@ use tokio_util::{
 use types::BlobsSidecar;
 use types::{
     BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockMerge, Blob, EmptyBlock, EthSpec,
-    ForkContext, ForkName, Hash256, MainnetEthSpec, Signature, SignedBeaconBlock,
+    ForkContext, ForkName, Hash256, MainnetEthSpec, Signature, SignedBeaconBlock
 };

 lazy_static! {
@@ -107,6 +107,12 @@ lazy_static! {
         .as_ssz_bytes()
         .len();

+    pub static ref BLOBS_SIDECAR_MIN: usize = BlobsSidecar::<MainnetEthSpec>::empty().as_ssz_bytes().len();
+    pub static ref BLOBS_SIDECAR_MAX: usize = BlobsSidecar::<MainnetEthSpec>::max_size();
+
+    //FIXME(sean) these are underestimates
+    pub static ref SIGNED_BLOCK_AND_BLOBS_MIN: usize = *BLOBS_SIDECAR_MIN + *SIGNED_BEACON_BLOCK_BASE_MIN;
+    pub static ref SIGNED_BLOCK_AND_BLOBS_MAX: usize = *BLOBS_SIDECAR_MAX + *SIGNED_BEACON_BLOCK_EIP4844_MAX;
 }

 /// The maximum bytes that can be sent across the RPC pre-merge.
@@ -359,9 +365,14 @@ impl ProtocolId {
             Protocol::BlocksByRange => rpc_block_limits_by_fork(fork_context.current_fork()),
             Protocol::BlocksByRoot => rpc_block_limits_by_fork(fork_context.current_fork()),
-            //FIXME(sean) add blob sizes
-            Protocol::BlobsByRange => rpc_block_limits_by_fork(fork_context.current_fork()),
-            Protocol::BlobsByRoot => rpc_block_limits_by_fork(fork_context.current_fork()),
+            Protocol::BlobsByRange => RpcLimits::new(
+                *BLOBS_SIDECAR_MIN,
+                *BLOBS_SIDECAR_MAX,
+            ),
+            Protocol::BlobsByRoot => RpcLimits::new(
+                *SIGNED_BLOCK_AND_BLOBS_MIN,
+                *SIGNED_BLOCK_AND_BLOBS_MAX,
+            ),
             Protocol::Ping => RpcLimits::new(
                 <Ping as Encode>::ssz_fixed_len(),
                 <Ping as Encode>::ssz_fixed_len(),
             ),
diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs
index 1938f389b..0824ca171 100644
--- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs
+++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs
@@ -435,7 +435,7 @@ impl<T: BeaconChainTypes> Worker<T> {
         // Pick out the required blocks, ignoring skip-slots.
         let mut last_block_root = req
             .start_slot
-            .checked_sub(0)
+            .checked_sub(1)
             .map(|prev_slot| {
                 self.chain
                     .block_root_at_slot(Slot::new(prev_slot), WhenSlotSkipped::Prev)
@@ -614,7 +614,7 @@ impl<T: BeaconChainTypes> Worker<T> {
         // Pick out the required blocks, ignoring skip-slots.
         let mut last_block_root = req
             .start_slot
-            .checked_sub(0)
+            .checked_sub(1)
             .map(|prev_slot| {
                 self.chain
                     .block_root_at_slot(Slot::new(prev_slot), WhenSlotSkipped::Prev)
diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs
index 5917c7ecc..45cbb1935 100644
--- a/beacon_node/network/src/sync/network_context.rs
+++ b/beacon_node/network/src/sync/network_context.rs
@@ -314,14 +314,18 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
                 let (chain_id, batch_id, info) = entry.get_mut();
                 let chain_id = chain_id.clone();
                 let batch_id = batch_id.clone();
+                let stream_terminator = maybe_block.is_none();
                 info.add_block_response(maybe_block);
-                let maybe_block = info.pop_response().map(|block_sidecar_pair| {
+                let maybe_block_wrapped = info.pop_response().map(|block_sidecar_pair| {
                     BlockWrapper::BlockAndBlob { block_sidecar_pair }
                 });
                 if info.is_finished() {
                     entry.remove();
                 }
-                Some((chain_id, batch_id, maybe_block))
+                if !stream_terminator && maybe_block_wrapped.is_none() {
+                    return None;
+                }
+                Some((chain_id, batch_id, maybe_block_wrapped))
             }
             Entry::Vacant(_) => None,
         }
@@ -356,6 +360,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
                 let (chain_id, batch_id, info) = entry.get_mut();
                 let chain_id = chain_id.clone();
                 let batch_id = batch_id.clone();
+                let stream_terminator = maybe_sidecar.is_none();
                 info.add_sidecar_response(maybe_sidecar);
                 let maybe_block = info
                     .pop_response()
@@ -363,6 +368,9 @@
                 if info.is_finished() {
                     entry.remove();
                 }
+                if !stream_terminator && maybe_block.is_none() {
+                    return None;
+                }
                 Some((chain_id, batch_id, maybe_block))
             }
             Entry::Vacant(_) => None,
diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs
index 732795fce..439c99c01 100644
--- a/beacon_node/store/src/hot_cold_store.rs
+++ b/beacon_node/store/src/hot_cold_store.rs
@@ -503,9 +503,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
     }

     pub fn get_blobs(&self, block_root: &Hash256) -> Result<Option<BlobsSidecar<E>>, Error> {
-        if let Some(blobs) = self.blob_cache.lock().get(block_root) {
-            Ok(Some(blobs.clone()))
-        } else if let Some(bytes) = self
+        if let Some(bytes) = self
             .hot_db
             .get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())?
         {
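
Note on the `ssz_snappy.rs` change: the richer error message only surfaces the two bounds that `is_out_of_bounds` already checks against, so a rejected response can be diagnosed without recomputing the limits locally. Below is a minimal, self-contained sketch of that check; the `RpcLimits` type here is a paraphrase of the helper constructed via `RpcLimits::new(min, max)` in `protocol.rs`, and the concrete numbers are made up purely for illustration.

```rust
use std::cmp;

/// Paraphrased stand-in for the `RpcLimits` helper used by the codec; the real
/// type lives in `rpc/protocol.rs` and is built with `RpcLimits::new(min, max)`.
struct RpcLimits {
    min: usize,
    max: usize,
}

impl RpcLimits {
    fn new(min: usize, max: usize) -> Self {
        Self { min, max }
    }

    /// A length is rejected if it is smaller than the smallest valid SSZ payload
    /// or larger than either the per-message maximum or the global packet cap.
    fn is_out_of_bounds(&self, length: usize, max_packet_size: usize) -> bool {
        length < self.min || length > cmp::min(self.max, max_packet_size)
    }
}

fn main() {
    // Hypothetical numbers for illustration only.
    let limits = RpcLimits::new(128, 10_485_760);
    let length = 20_000_000;
    if limits.is_out_of_bounds(length, 15_728_640) {
        // Mirrors the improved error message from the diff: the operator now sees
        // which bound was violated instead of only the offending length.
        eprintln!(
            "RPC response length is out of bounds, length {}, max {}, min {}",
            length, limits.max, limits.min
        );
    }
}
```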
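
Note on the two `rpc_methods.rs` fixes: `checked_sub(0)` is a no-op, so the "previous slot" seed for `last_block_root` was actually computed from `start_slot` itself. `checked_sub(1)` restores the intended parent-slot lookup and still returns `None` at slot 0 instead of underflowing, which is presumably why `checked_sub` is used at all. A tiny illustration, with a plain `u64` standing in for the `Slot` type:

```rust
fn main() {
    let start_slot: u64 = 100;
    // Before the fix: subtracting zero returns the same slot, so the "previous
    // slot" lookup really queried start_slot itself.
    assert_eq!(start_slot.checked_sub(0), Some(100));
    // After the fix: the parent slot is queried, and slot 0 yields None instead
    // of underflowing.
    assert_eq!(start_slot.checked_sub(1), Some(99));
    assert_eq!(0u64.checked_sub(1), None);
}
```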
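
Note on the `network_context.rs` change, the subtlest one: in the sync layer a `None` response means "the stream terminated", but `pop_response()` also yields `None` while a block is still waiting for its blob sidecar (or vice versa). Recording `stream_terminator` before adding the response lets the handler swallow the "still waiting" case rather than forwarding a spurious end-of-stream. The sketch below models that gating with a hypothetical accumulator; the names and types are stand-ins, not the real pairing struct used by `SyncNetworkContext`.

```rust
/// Hypothetical stand-in for the per-request accumulator that pairs blocks with
/// their blob sidecars before handing them to range sync.
#[derive(Default)]
struct PairAccumulator {
    blocks: Vec<u64>,
    blobs: Vec<u64>,
    stream_finished: bool,
}

impl PairAccumulator {
    fn add_block_response(&mut self, maybe_block: Option<u64>) {
        match maybe_block {
            Some(b) => self.blocks.push(b),
            None => self.stream_finished = true,
        }
    }

    /// Only yields once both halves of a pair are available.
    fn pop_response(&mut self) -> Option<(u64, u64)> {
        if self.blocks.is_empty() || self.blobs.is_empty() {
            return None;
        }
        Some((self.blocks.remove(0), self.blobs.remove(0)))
    }
}

/// Mirrors the control flow added in the diff: a `None` result is only forwarded
/// when the peer actually terminated the stream; otherwise we keep waiting.
fn on_block_response(
    acc: &mut PairAccumulator,
    maybe_block: Option<u64>,
) -> Option<Option<(u64, u64)>> {
    let stream_terminator = maybe_block.is_none();
    acc.add_block_response(maybe_block);
    let maybe_pair = acc.pop_response();
    if !stream_terminator && maybe_pair.is_none() {
        // A block arrived but its blob sidecar has not: report nothing yet.
        return None;
    }
    Some(maybe_pair)
}

fn main() {
    let mut acc = PairAccumulator::default();
    // Block 1 arrives before its blobs: nothing is forwarded upstream.
    assert_eq!(on_block_response(&mut acc, Some(1)), None);
    // Once the blob side catches up, the next pop yields the coupled pair.
    acc.blobs.push(1);
    assert_eq!(acc.pop_response(), Some((1, 1)));
    // A `None` block response is the stream terminator and is forwarded as such.
    assert_eq!(on_block_response(&mut acc, None), Some(None));
    assert!(acc.stream_finished);
}
```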