fix rate limits, and a couple other bugs
This commit is contained in:
parent
7d5db8015d
commit
9c46a1cb21
@ -298,8 +298,8 @@ impl<TSpec: EthSpec> Decoder for SSZSnappyOutboundCodec<TSpec> {
|
|||||||
.rpc_response_limits::<TSpec>(&self.fork_context);
|
.rpc_response_limits::<TSpec>(&self.fork_context);
|
||||||
if ssz_limits.is_out_of_bounds(length, self.max_packet_size) {
|
if ssz_limits.is_out_of_bounds(length, self.max_packet_size) {
|
||||||
return Err(RPCError::InvalidData(format!(
|
return Err(RPCError::InvalidData(format!(
|
||||||
"RPC response length is out of bounds, length {}",
|
"RPC response length is out of bounds, length {}, max {}, min {}",
|
||||||
length
|
length, ssz_limits.max, ssz_limits.min
|
||||||
)));
|
)));
|
||||||
}
|
}
|
||||||
// Calculate worst case compression length for given uncompressed length
|
// Calculate worst case compression length for given uncompressed length
|
||||||
|
@ -23,7 +23,7 @@ use tokio_util::{
|
|||||||
use types::BlobsSidecar;
|
use types::BlobsSidecar;
|
||||||
use types::{
|
use types::{
|
||||||
BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockMerge, Blob, EmptyBlock, EthSpec,
|
BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockMerge, Blob, EmptyBlock, EthSpec,
|
||||||
ForkContext, ForkName, Hash256, MainnetEthSpec, Signature, SignedBeaconBlock,
|
ForkContext, ForkName, Hash256, MainnetEthSpec, Signature, SignedBeaconBlock
|
||||||
};
|
};
|
||||||
|
|
||||||
lazy_static! {
|
lazy_static! {
|
||||||
@ -107,6 +107,12 @@ lazy_static! {
|
|||||||
.as_ssz_bytes()
|
.as_ssz_bytes()
|
||||||
.len();
|
.len();
|
||||||
|
|
||||||
|
pub static ref BLOBS_SIDECAR_MIN: usize = BlobsSidecar::<MainnetEthSpec>::empty().as_ssz_bytes().len();
|
||||||
|
pub static ref BLOBS_SIDECAR_MAX: usize = BlobsSidecar::<MainnetEthSpec>::max_size();
|
||||||
|
|
||||||
|
//FIXME(sean) these are underestimates
|
||||||
|
pub static ref SIGNED_BLOCK_AND_BLOBS_MIN: usize = *BLOBS_SIDECAR_MIN + *SIGNED_BEACON_BLOCK_BASE_MIN;
|
||||||
|
pub static ref SIGNED_BLOCK_AND_BLOBS_MAX: usize =*BLOBS_SIDECAR_MAX + *SIGNED_BEACON_BLOCK_EIP4844_MAX;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The maximum bytes that can be sent across the RPC pre-merge.
|
/// The maximum bytes that can be sent across the RPC pre-merge.
|
||||||
@ -359,9 +365,14 @@ impl ProtocolId {
|
|||||||
Protocol::BlocksByRange => rpc_block_limits_by_fork(fork_context.current_fork()),
|
Protocol::BlocksByRange => rpc_block_limits_by_fork(fork_context.current_fork()),
|
||||||
Protocol::BlocksByRoot => rpc_block_limits_by_fork(fork_context.current_fork()),
|
Protocol::BlocksByRoot => rpc_block_limits_by_fork(fork_context.current_fork()),
|
||||||
|
|
||||||
//FIXME(sean) add blob sizes
|
Protocol::BlobsByRange => RpcLimits::new(
|
||||||
Protocol::BlobsByRange => rpc_block_limits_by_fork(fork_context.current_fork()),
|
*BLOBS_SIDECAR_MIN,
|
||||||
Protocol::BlobsByRoot => rpc_block_limits_by_fork(fork_context.current_fork()),
|
*BLOBS_SIDECAR_MAX,
|
||||||
|
),
|
||||||
|
Protocol::BlobsByRoot => RpcLimits::new(
|
||||||
|
*SIGNED_BLOCK_AND_BLOBS_MIN,
|
||||||
|
*SIGNED_BLOCK_AND_BLOBS_MAX,
|
||||||
|
),
|
||||||
|
|
||||||
Protocol::Ping => RpcLimits::new(
|
Protocol::Ping => RpcLimits::new(
|
||||||
<Ping as Encode>::ssz_fixed_len(),
|
<Ping as Encode>::ssz_fixed_len(),
|
||||||
|
@ -435,7 +435,7 @@ impl<T: BeaconChainTypes> Worker<T> {
|
|||||||
// Pick out the required blocks, ignoring skip-slots.
|
// Pick out the required blocks, ignoring skip-slots.
|
||||||
let mut last_block_root = req
|
let mut last_block_root = req
|
||||||
.start_slot
|
.start_slot
|
||||||
.checked_sub(0)
|
.checked_sub(1)
|
||||||
.map(|prev_slot| {
|
.map(|prev_slot| {
|
||||||
self.chain
|
self.chain
|
||||||
.block_root_at_slot(Slot::new(prev_slot), WhenSlotSkipped::Prev)
|
.block_root_at_slot(Slot::new(prev_slot), WhenSlotSkipped::Prev)
|
||||||
@ -614,7 +614,7 @@ impl<T: BeaconChainTypes> Worker<T> {
|
|||||||
// Pick out the required blocks, ignoring skip-slots.
|
// Pick out the required blocks, ignoring skip-slots.
|
||||||
let mut last_block_root = req
|
let mut last_block_root = req
|
||||||
.start_slot
|
.start_slot
|
||||||
.checked_sub(0)
|
.checked_sub(1)
|
||||||
.map(|prev_slot| {
|
.map(|prev_slot| {
|
||||||
self.chain
|
self.chain
|
||||||
.block_root_at_slot(Slot::new(prev_slot), WhenSlotSkipped::Prev)
|
.block_root_at_slot(Slot::new(prev_slot), WhenSlotSkipped::Prev)
|
||||||
|
@ -314,14 +314,18 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
|||||||
let (chain_id, batch_id, info) = entry.get_mut();
|
let (chain_id, batch_id, info) = entry.get_mut();
|
||||||
let chain_id = chain_id.clone();
|
let chain_id = chain_id.clone();
|
||||||
let batch_id = batch_id.clone();
|
let batch_id = batch_id.clone();
|
||||||
|
let stream_terminator = maybe_block.is_none();
|
||||||
info.add_block_response(maybe_block);
|
info.add_block_response(maybe_block);
|
||||||
let maybe_block = info.pop_response().map(|block_sidecar_pair| {
|
let maybe_block_wrapped = info.pop_response().map(|block_sidecar_pair| {
|
||||||
BlockWrapper::BlockAndBlob { block_sidecar_pair }
|
BlockWrapper::BlockAndBlob { block_sidecar_pair }
|
||||||
});
|
});
|
||||||
if info.is_finished() {
|
if info.is_finished() {
|
||||||
entry.remove();
|
entry.remove();
|
||||||
}
|
}
|
||||||
Some((chain_id, batch_id, maybe_block))
|
if !stream_terminator && maybe_block_wrapped.is_none() {
|
||||||
|
return None
|
||||||
|
}
|
||||||
|
Some((chain_id, batch_id, maybe_block_wrapped))
|
||||||
}
|
}
|
||||||
Entry::Vacant(_) => None,
|
Entry::Vacant(_) => None,
|
||||||
}
|
}
|
||||||
@ -356,6 +360,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
|||||||
let (chain_id, batch_id, info) = entry.get_mut();
|
let (chain_id, batch_id, info) = entry.get_mut();
|
||||||
let chain_id = chain_id.clone();
|
let chain_id = chain_id.clone();
|
||||||
let batch_id = batch_id.clone();
|
let batch_id = batch_id.clone();
|
||||||
|
let stream_terminator = maybe_sidecar.is_none();
|
||||||
info.add_sidecar_response(maybe_sidecar);
|
info.add_sidecar_response(maybe_sidecar);
|
||||||
let maybe_block = info
|
let maybe_block = info
|
||||||
.pop_response()
|
.pop_response()
|
||||||
@ -363,6 +368,9 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
|
|||||||
if info.is_finished() {
|
if info.is_finished() {
|
||||||
entry.remove();
|
entry.remove();
|
||||||
}
|
}
|
||||||
|
if !stream_terminator && maybe_block.is_none() {
|
||||||
|
return None
|
||||||
|
}
|
||||||
Some((chain_id, batch_id, maybe_block))
|
Some((chain_id, batch_id, maybe_block))
|
||||||
}
|
}
|
||||||
Entry::Vacant(_) => None,
|
Entry::Vacant(_) => None,
|
||||||
|
@ -503,9 +503,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_blobs(&self, block_root: &Hash256) -> Result<Option<BlobsSidecar<E>>, Error> {
|
pub fn get_blobs(&self, block_root: &Hash256) -> Result<Option<BlobsSidecar<E>>, Error> {
|
||||||
if let Some(blobs) = self.blob_cache.lock().get(block_root) {
|
if let Some(bytes) = self
|
||||||
Ok(Some(blobs.clone()))
|
|
||||||
} else if let Some(bytes) = self
|
|
||||||
.hot_db
|
.hot_db
|
||||||
.get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())?
|
.get_bytes(DBColumn::BeaconBlob.into(), block_root.as_bytes())?
|
||||||
{
|
{
|
||||||
|
Loading…
Reference in New Issue
Block a user