diff --git a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs index ce6e30ebf..fb07e6831 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs @@ -298,8 +298,8 @@ impl Decoder for SSZSnappyOutboundCodec { .rpc_response_limits::(&self.fork_context); if ssz_limits.is_out_of_bounds(length, self.max_packet_size) { return Err(RPCError::InvalidData(format!( - "RPC response length is out of bounds, length {}", - length + "RPC response length is out of bounds, length {}, max {}, min {}", + length, ssz_limits.max, ssz_limits.min ))); } // Calculate worst case compression length for given uncompressed length @@ -439,6 +439,9 @@ fn context_bytes( SignedBeaconBlock::Base { .. } => Some(fork_context.genesis_context_bytes()), }; } + if let RPCResponse::BlobsByRange(_) | RPCResponse::BlobsByRoot(_) = rpc_variant { + return fork_context.to_context_bytes(ForkName::Eip4844); + } } } None diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index 0773197e8..8a3149fc5 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -107,6 +107,12 @@ lazy_static! { .as_ssz_bytes() .len(); + pub static ref BLOBS_SIDECAR_MIN: usize = BlobsSidecar::::empty().as_ssz_bytes().len(); + pub static ref BLOBS_SIDECAR_MAX: usize = BlobsSidecar::::max_size(); + + //FIXME(sean) these are underestimates + pub static ref SIGNED_BLOCK_AND_BLOBS_MIN: usize = *BLOBS_SIDECAR_MIN + *SIGNED_BEACON_BLOCK_BASE_MIN; + pub static ref SIGNED_BLOCK_AND_BLOBS_MAX: usize =*BLOBS_SIDECAR_MAX + *SIGNED_BEACON_BLOCK_EIP4844_MAX; } /// The maximum bytes that can be sent across the RPC pre-merge. @@ -358,11 +364,10 @@ impl ProtocolId { Protocol::Goodbye => RpcLimits::new(0, 0), // Goodbye request has no response Protocol::BlocksByRange => rpc_block_limits_by_fork(fork_context.current_fork()), Protocol::BlocksByRoot => rpc_block_limits_by_fork(fork_context.current_fork()), - - //FIXME(sean) add blob sizes - Protocol::BlobsByRange => rpc_block_limits_by_fork(fork_context.current_fork()), - Protocol::BlobsByRoot => rpc_block_limits_by_fork(fork_context.current_fork()), - + Protocol::BlobsByRange => RpcLimits::new(*BLOBS_SIDECAR_MIN, *BLOBS_SIDECAR_MAX), + Protocol::BlobsByRoot => { + RpcLimits::new(*SIGNED_BLOCK_AND_BLOBS_MIN, *SIGNED_BLOCK_AND_BLOBS_MAX) + } Protocol::Ping => RpcLimits::new( ::ssz_fixed_len(), ::ssz_fixed_len(), @@ -381,13 +386,16 @@ impl ProtocolId { /// Returns `true` if the given `ProtocolId` should expect `context_bytes` in the /// beginning of the stream, else returns `false`. pub fn has_context_bytes(&self) -> bool { - if self.version == Version::V2 { - match self.message_name { + match self.version { + Version::V2 => match self.message_name { Protocol::BlocksByRange | Protocol::BlocksByRoot => return true, _ => return false, - } + }, + Version::V1 => match self.message_name { + Protocol::BlobsByRange | Protocol::BlobsByRoot => return true, + _ => return false, + }, } - false } } diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index 6eae7eed5..d85bb1f20 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -433,7 +433,17 @@ impl Worker { }; // Pick out the required blocks, ignoring skip-slots. - let mut last_block_root = None; + let mut last_block_root = req + .start_slot + .checked_sub(1) + .map(|prev_slot| { + self.chain + .block_root_at_slot(Slot::new(prev_slot), WhenSlotSkipped::Prev) + }) + .transpose() + .ok() + .flatten() + .flatten(); let maybe_block_roots = process_results(forwards_block_root_iter, |iter| { iter.take_while(|(_, slot)| slot.as_u64() < req.start_slot.saturating_add(req.count)) // map skip slots to None @@ -602,7 +612,17 @@ impl Worker { }; // Pick out the required blocks, ignoring skip-slots. - let mut last_block_root = None; + let mut last_block_root = req + .start_slot + .checked_sub(1) + .map(|prev_slot| { + self.chain + .block_root_at_slot(Slot::new(prev_slot), WhenSlotSkipped::Prev) + }) + .transpose() + .ok() + .flatten() + .flatten(); let maybe_block_roots = process_results(forwards_block_root_iter, |iter| { iter.take_while(|(_, slot)| slot.as_u64() < req.start_slot.saturating_add(req.count)) // map skip slots to None @@ -669,7 +689,7 @@ impl Worker { self.log, "BlobsByRange Response processed"; "peer" => %peer_id, - "msg" => "Failed to return all requested blocks", + "msg" => "Failed to return all requested blobs", "start_slot" => req.start_slot, "current_slot" => current_slot, "requested" => req.count, diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 5917c7ecc..978bd69d0 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -314,14 +314,24 @@ impl SyncNetworkContext { let (chain_id, batch_id, info) = entry.get_mut(); let chain_id = chain_id.clone(); let batch_id = batch_id.clone(); + let stream_terminator = maybe_block.is_none(); info.add_block_response(maybe_block); - let maybe_block = info.pop_response().map(|block_sidecar_pair| { + let maybe_block_wrapped = info.pop_response().map(|block_sidecar_pair| { BlockWrapper::BlockAndBlob { block_sidecar_pair } }); + + if stream_terminator && !info.is_finished() { + return None; + } + if !stream_terminator && maybe_block_wrapped.is_none() { + return None; + } + if info.is_finished() { entry.remove(); } - Some((chain_id, batch_id, maybe_block)) + + Some((chain_id, batch_id, maybe_block_wrapped)) } Entry::Vacant(_) => None, } @@ -356,13 +366,24 @@ impl SyncNetworkContext { let (chain_id, batch_id, info) = entry.get_mut(); let chain_id = chain_id.clone(); let batch_id = batch_id.clone(); + let stream_terminator = maybe_sidecar.is_none(); info.add_sidecar_response(maybe_sidecar); let maybe_block = info .pop_response() .map(|block_sidecar_pair| BlockWrapper::BlockAndBlob { block_sidecar_pair }); + + if stream_terminator && !info.is_finished() { + return None; + } + + if !stream_terminator && maybe_block.is_none() { + return None; + } + if info.is_finished() { entry.remove(); } + Some((chain_id, batch_id, maybe_block)) } Entry::Vacant(_) => None, diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 199be788e..46b6d05d7 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -137,10 +137,16 @@ impl SyncingChain { let id = SyncingChain::::id(&target_head_root, &target_head_slot); + let target_slot = if is_finalized_segment { + target_head_slot + (2 * T::EthSpec::slots_per_epoch()) + 1 + } else { + target_head_slot + }; + SyncingChain { id, start_epoch, - target_head_slot, + target_head_slot: target_slot, target_head_root, batches: BTreeMap::new(), peers, diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 732795fce..00aa0b2af 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -503,9 +503,9 @@ impl, Cold: ItemStore> HotColdDB } pub fn get_blobs(&self, block_root: &Hash256) -> Result>, Error> { - if let Some(blobs) = self.blob_cache.lock().get(block_root) { - Ok(Some(blobs.clone())) - } else if let Some(bytes) = self + // FIXME(sean) I was attempting to use a blob cache here but was getting deadlocks, + // may want to attempt to use one again + if let Some(bytes) = self .hot_db .get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())? {