Miscellaneous fixes on syncing, RPC, and responding to peers' sync-related requests (#3827)
- There was a bug in responding to range blob requests where we would incorrectly label the first slot of an epoch as a non-skipped slot if it were skipped. This bug did not exist in the code for responding to block range requests because the logic error was mitigated by defensive coding elsewhere. - There was a bug where a block received during range sync without a corresponding blob (and vice versa) was incorrectly interpreted as a stream termination. - RPC size limit fixes. - Our blob cache was deadlocking, so I removed use of it for now. - Because of our change in finalized sync batch size from 2 to 1, and our transition to using exact epoch boundaries for batches (rather than one slot past the epoch boundary), we need to extend finalized sync to 2 epochs + 1 slot past our peer's finalized slot in order to finalize the chain locally. - Use fork context bytes in RPC methods on both the server and client side.
This commit is contained in:
parent
cc420caaa5
commit
33d01a7911
@ -298,8 +298,8 @@ impl<TSpec: EthSpec> Decoder for SSZSnappyOutboundCodec<TSpec> {
|
|||||||
.rpc_response_limits::<TSpec>(&self.fork_context);
|
.rpc_response_limits::<TSpec>(&self.fork_context);
|
||||||
if ssz_limits.is_out_of_bounds(length, self.max_packet_size) {
|
if ssz_limits.is_out_of_bounds(length, self.max_packet_size) {
|
||||||
return Err(RPCError::InvalidData(format!(
|
return Err(RPCError::InvalidData(format!(
|
||||||
"RPC response length is out of bounds, length {}",
|
"RPC response length is out of bounds, length {}, max {}, min {}",
|
||||||
length
|
length, ssz_limits.max, ssz_limits.min
|
||||||
)));
|
)));
|
||||||
}
|
}
|
||||||
// Calculate worst case compression length for given uncompressed length
|
// Calculate worst case compression length for given uncompressed length
|
||||||
@ -439,6 +439,9 @@ fn context_bytes<T: EthSpec>(
|
|||||||
SignedBeaconBlock::Base { .. } => Some(fork_context.genesis_context_bytes()),
|
SignedBeaconBlock::Base { .. } => Some(fork_context.genesis_context_bytes()),
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
if let RPCResponse::BlobsByRange(_) | RPCResponse::BlobsByRoot(_) = rpc_variant {
|
||||||
|
return fork_context.to_context_bytes(ForkName::Eip4844);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
None
|
None
|
||||||
|
@ -107,6 +107,12 @@ lazy_static! {
|
|||||||
.as_ssz_bytes()
|
.as_ssz_bytes()
|
||||||
.len();
|
.len();
|
||||||
|
|
||||||
|
pub static ref BLOBS_SIDECAR_MIN: usize = BlobsSidecar::<MainnetEthSpec>::empty().as_ssz_bytes().len();
|
||||||
|
pub static ref BLOBS_SIDECAR_MAX: usize = BlobsSidecar::<MainnetEthSpec>::max_size();
|
||||||
|
|
||||||
|
//FIXME(sean) these are underestimates
|
||||||
|
pub static ref SIGNED_BLOCK_AND_BLOBS_MIN: usize = *BLOBS_SIDECAR_MIN + *SIGNED_BEACON_BLOCK_BASE_MIN;
|
||||||
|
pub static ref SIGNED_BLOCK_AND_BLOBS_MAX: usize =*BLOBS_SIDECAR_MAX + *SIGNED_BEACON_BLOCK_EIP4844_MAX;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The maximum bytes that can be sent across the RPC pre-merge.
|
/// The maximum bytes that can be sent across the RPC pre-merge.
|
||||||
@ -358,11 +364,10 @@ impl ProtocolId {
|
|||||||
Protocol::Goodbye => RpcLimits::new(0, 0), // Goodbye request has no response
|
Protocol::Goodbye => RpcLimits::new(0, 0), // Goodbye request has no response
|
||||||
Protocol::BlocksByRange => rpc_block_limits_by_fork(fork_context.current_fork()),
|
Protocol::BlocksByRange => rpc_block_limits_by_fork(fork_context.current_fork()),
|
||||||
Protocol::BlocksByRoot => rpc_block_limits_by_fork(fork_context.current_fork()),
|
Protocol::BlocksByRoot => rpc_block_limits_by_fork(fork_context.current_fork()),
|
||||||
|
Protocol::BlobsByRange => RpcLimits::new(*BLOBS_SIDECAR_MIN, *BLOBS_SIDECAR_MAX),
|
||||||
//FIXME(sean) add blob sizes
|
Protocol::BlobsByRoot => {
|
||||||
Protocol::BlobsByRange => rpc_block_limits_by_fork(fork_context.current_fork()),
|
RpcLimits::new(*SIGNED_BLOCK_AND_BLOBS_MIN, *SIGNED_BLOCK_AND_BLOBS_MAX)
|
||||||
Protocol::BlobsByRoot => rpc_block_limits_by_fork(fork_context.current_fork()),
|
}
|
||||||
|
|
||||||
Protocol::Ping => RpcLimits::new(
|
Protocol::Ping => RpcLimits::new(
|
||||||
<Ping as Encode>::ssz_fixed_len(),
|
<Ping as Encode>::ssz_fixed_len(),
|
||||||
<Ping as Encode>::ssz_fixed_len(),
|
<Ping as Encode>::ssz_fixed_len(),
|
||||||
@ -381,14 +386,17 @@ impl ProtocolId {
|
|||||||
/// Returns `true` if the given `ProtocolId` should expect `context_bytes` in the
|
/// Returns `true` if the given `ProtocolId` should expect `context_bytes` in the
|
||||||
/// beginning of the stream, else returns `false`.
|
/// beginning of the stream, else returns `false`.
|
||||||
pub fn has_context_bytes(&self) -> bool {
|
pub fn has_context_bytes(&self) -> bool {
|
||||||
if self.version == Version::V2 {
|
match self.version {
|
||||||
match self.message_name {
|
Version::V2 => match self.message_name {
|
||||||
Protocol::BlocksByRange | Protocol::BlocksByRoot => return true,
|
Protocol::BlocksByRange | Protocol::BlocksByRoot => return true,
|
||||||
_ => return false,
|
_ => return false,
|
||||||
|
},
|
||||||
|
Version::V1 => match self.message_name {
|
||||||
|
Protocol::BlobsByRange | Protocol::BlobsByRoot => return true,
|
||||||
|
_ => return false,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
false
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// An RPC protocol ID.
|
/// An RPC protocol ID.
|
||||||
|
@ -433,7 +433,17 @@ impl<T: BeaconChainTypes> Worker<T> {
|
|||||||
};
|
};
|
||||||
|
|
||||||
// Pick out the required blocks, ignoring skip-slots.
|
// Pick out the required blocks, ignoring skip-slots.
|
||||||
let mut last_block_root = None;
|
let mut last_block_root = req
|
||||||
|
.start_slot
|
||||||
|
.checked_sub(1)
|
||||||
|
.map(|prev_slot| {
|
||||||
|
self.chain
|
||||||
|
.block_root_at_slot(Slot::new(prev_slot), WhenSlotSkipped::Prev)
|
||||||
|
})
|
||||||
|
.transpose()
|
||||||
|
.ok()
|
||||||
|
.flatten()
|
||||||
|
.flatten();
|
||||||
let maybe_block_roots = process_results(forwards_block_root_iter, |iter| {
|
let maybe_block_roots = process_results(forwards_block_root_iter, |iter| {
|
||||||
iter.take_while(|(_, slot)| slot.as_u64() < req.start_slot.saturating_add(req.count))
|
iter.take_while(|(_, slot)| slot.as_u64() < req.start_slot.saturating_add(req.count))
|
||||||
// map skip slots to None
|
// map skip slots to None
|
||||||
@ -602,7 +612,17 @@ impl<T: BeaconChainTypes> Worker<T> {
|
|||||||
};
|
};
|
||||||
|
|
||||||
// Pick out the required blocks, ignoring skip-slots.
|
// Pick out the required blocks, ignoring skip-slots.
|
||||||
let mut last_block_root = None;
|
let mut last_block_root = req
|
||||||
|
.start_slot
|
||||||
|
.checked_sub(1)
|
||||||
|
.map(|prev_slot| {
|
||||||
|
self.chain
|
||||||
|
.block_root_at_slot(Slot::new(prev_slot), WhenSlotSkipped::Prev)
|
||||||
|
})
|
||||||
|
.transpose()
|
||||||
|
.ok()
|
||||||
|
.flatten()
|
||||||
|
.flatten();
|
||||||
let maybe_block_roots = process_results(forwards_block_root_iter, |iter| {
|
let maybe_block_roots = process_results(forwards_block_root_iter, |iter| {
|
||||||
iter.take_while(|(_, slot)| slot.as_u64() < req.start_slot.saturating_add(req.count))
|
iter.take_while(|(_, slot)| slot.as_u64() < req.start_slot.saturating_add(req.count))
|
||||||
// map skip slots to None
|
// map skip slots to None
|
||||||
@ -669,7 +689,7 @@ impl<T: BeaconChainTypes> Worker<T> {
|
|||||||
self.log,
|
self.log,
|
||||||
"BlobsByRange Response processed";
|
"BlobsByRange Response processed";
|
||||||
"peer" => %peer_id,
|
"peer" => %peer_id,
|
||||||
"msg" => "Failed to return all requested blocks",
|
"msg" => "Failed to return all requested blobs",
|
||||||
"start_slot" => req.start_slot,
|
"start_slot" => req.start_slot,
|
||||||
"current_slot" => current_slot,
|
"current_slot" => current_slot,
|
||||||
"requested" => req.count,
|
"requested" => req.count,
|
||||||
|
@ -314,14 +314,24 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
|||||||
let (chain_id, batch_id, info) = entry.get_mut();
|
let (chain_id, batch_id, info) = entry.get_mut();
|
||||||
let chain_id = chain_id.clone();
|
let chain_id = chain_id.clone();
|
||||||
let batch_id = batch_id.clone();
|
let batch_id = batch_id.clone();
|
||||||
|
let stream_terminator = maybe_block.is_none();
|
||||||
info.add_block_response(maybe_block);
|
info.add_block_response(maybe_block);
|
||||||
let maybe_block = info.pop_response().map(|block_sidecar_pair| {
|
let maybe_block_wrapped = info.pop_response().map(|block_sidecar_pair| {
|
||||||
BlockWrapper::BlockAndBlob { block_sidecar_pair }
|
BlockWrapper::BlockAndBlob { block_sidecar_pair }
|
||||||
});
|
});
|
||||||
|
|
||||||
|
if stream_terminator && !info.is_finished() {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
if !stream_terminator && maybe_block_wrapped.is_none() {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
if info.is_finished() {
|
if info.is_finished() {
|
||||||
entry.remove();
|
entry.remove();
|
||||||
}
|
}
|
||||||
Some((chain_id, batch_id, maybe_block))
|
|
||||||
|
Some((chain_id, batch_id, maybe_block_wrapped))
|
||||||
}
|
}
|
||||||
Entry::Vacant(_) => None,
|
Entry::Vacant(_) => None,
|
||||||
}
|
}
|
||||||
@ -356,13 +366,24 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
|||||||
let (chain_id, batch_id, info) = entry.get_mut();
|
let (chain_id, batch_id, info) = entry.get_mut();
|
||||||
let chain_id = chain_id.clone();
|
let chain_id = chain_id.clone();
|
||||||
let batch_id = batch_id.clone();
|
let batch_id = batch_id.clone();
|
||||||
|
let stream_terminator = maybe_sidecar.is_none();
|
||||||
info.add_sidecar_response(maybe_sidecar);
|
info.add_sidecar_response(maybe_sidecar);
|
||||||
let maybe_block = info
|
let maybe_block = info
|
||||||
.pop_response()
|
.pop_response()
|
||||||
.map(|block_sidecar_pair| BlockWrapper::BlockAndBlob { block_sidecar_pair });
|
.map(|block_sidecar_pair| BlockWrapper::BlockAndBlob { block_sidecar_pair });
|
||||||
|
|
||||||
|
if stream_terminator && !info.is_finished() {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
if !stream_terminator && maybe_block.is_none() {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
if info.is_finished() {
|
if info.is_finished() {
|
||||||
entry.remove();
|
entry.remove();
|
||||||
}
|
}
|
||||||
|
|
||||||
Some((chain_id, batch_id, maybe_block))
|
Some((chain_id, batch_id, maybe_block))
|
||||||
}
|
}
|
||||||
Entry::Vacant(_) => None,
|
Entry::Vacant(_) => None,
|
||||||
|
@ -137,10 +137,16 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
|||||||
|
|
||||||
let id = SyncingChain::<T>::id(&target_head_root, &target_head_slot);
|
let id = SyncingChain::<T>::id(&target_head_root, &target_head_slot);
|
||||||
|
|
||||||
|
let target_slot = if is_finalized_segment {
|
||||||
|
target_head_slot + (2 * T::EthSpec::slots_per_epoch()) + 1
|
||||||
|
} else {
|
||||||
|
target_head_slot
|
||||||
|
};
|
||||||
|
|
||||||
SyncingChain {
|
SyncingChain {
|
||||||
id,
|
id,
|
||||||
start_epoch,
|
start_epoch,
|
||||||
target_head_slot,
|
target_head_slot: target_slot,
|
||||||
target_head_root,
|
target_head_root,
|
||||||
batches: BTreeMap::new(),
|
batches: BTreeMap::new(),
|
||||||
peers,
|
peers,
|
||||||
|
@ -503,9 +503,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_blobs(&self, block_root: &Hash256) -> Result<Option<BlobsSidecar<E>>, Error> {
|
pub fn get_blobs(&self, block_root: &Hash256) -> Result<Option<BlobsSidecar<E>>, Error> {
|
||||||
if let Some(blobs) = self.blob_cache.lock().get(block_root) {
|
// FIXME(sean) I was attempting to use a blob cache here but was getting deadlocks,
|
||||||
Ok(Some(blobs.clone()))
|
// may want to attempt to use one again
|
||||||
} else if let Some(bytes) = self
|
if let Some(bytes) = self
|
||||||
.hot_db
|
.hot_db
|
||||||
.get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())?
|
.get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())?
|
||||||
{
|
{
|
||||||
|
Loading…
Reference in New Issue
Block a user