Download checkpoint blobs during checkpoint sync (#5252)
* MVP implementation (untested) * update store checkpoint sync test * update cli help * Merge pull request #5253 from realbigsean/checkpoint-blobs-sean Checkpoint blobs sean * Warn only if blobs are missing from server * Merge remote-tracking branch 'origin/unstable' into checkpoint-blobs * Verify checkpoint blobs * Move blob verification earlier
This commit is contained in:
parent
e22c9eed8f
commit
c9702cb0a1
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -1495,6 +1495,7 @@ dependencies = [
|
|||||||
"eth1",
|
"eth1",
|
||||||
"eth2",
|
"eth2",
|
||||||
"eth2_config",
|
"eth2_config",
|
||||||
|
"ethereum_ssz",
|
||||||
"execution_layer",
|
"execution_layer",
|
||||||
"futures",
|
"futures",
|
||||||
"genesis",
|
"genesis",
|
||||||
|
@ -39,8 +39,8 @@ use std::time::Duration;
|
|||||||
use store::{Error as StoreError, HotColdDB, ItemStore, KeyValueStoreOp};
|
use store::{Error as StoreError, HotColdDB, ItemStore, KeyValueStoreOp};
|
||||||
use task_executor::{ShutdownReason, TaskExecutor};
|
use task_executor::{ShutdownReason, TaskExecutor};
|
||||||
use types::{
|
use types::{
|
||||||
BeaconBlock, BeaconState, ChainSpec, Checkpoint, Epoch, EthSpec, Graffiti, Hash256, Signature,
|
BeaconBlock, BeaconState, BlobSidecarList, ChainSpec, Checkpoint, Epoch, EthSpec, Graffiti,
|
||||||
SignedBeaconBlock, Slot,
|
Hash256, Signature, SignedBeaconBlock, Slot,
|
||||||
};
|
};
|
||||||
|
|
||||||
/// An empty struct used to "witness" all the `BeaconChainTypes` traits. It has no user-facing
|
/// An empty struct used to "witness" all the `BeaconChainTypes` traits. It has no user-facing
|
||||||
@ -432,6 +432,7 @@ where
|
|||||||
mut self,
|
mut self,
|
||||||
mut weak_subj_state: BeaconState<TEthSpec>,
|
mut weak_subj_state: BeaconState<TEthSpec>,
|
||||||
weak_subj_block: SignedBeaconBlock<TEthSpec>,
|
weak_subj_block: SignedBeaconBlock<TEthSpec>,
|
||||||
|
weak_subj_blobs: Option<BlobSidecarList<TEthSpec>>,
|
||||||
genesis_state: BeaconState<TEthSpec>,
|
genesis_state: BeaconState<TEthSpec>,
|
||||||
) -> Result<Self, String> {
|
) -> Result<Self, String> {
|
||||||
let store = self
|
let store = self
|
||||||
@ -490,6 +491,29 @@ where
|
|||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Verify that blobs (if provided) match the block.
|
||||||
|
if let Some(blobs) = &weak_subj_blobs {
|
||||||
|
let commitments = weak_subj_block
|
||||||
|
.message()
|
||||||
|
.body()
|
||||||
|
.blob_kzg_commitments()
|
||||||
|
.map_err(|e| format!("Blobs provided but block does not reference them: {e:?}"))?;
|
||||||
|
if blobs.len() != commitments.len() {
|
||||||
|
return Err(format!(
|
||||||
|
"Wrong number of blobs, expected: {}, got: {}",
|
||||||
|
commitments.len(),
|
||||||
|
blobs.len()
|
||||||
|
));
|
||||||
|
}
|
||||||
|
if commitments
|
||||||
|
.iter()
|
||||||
|
.zip(blobs.iter())
|
||||||
|
.any(|(commitment, blob)| *commitment != blob.kzg_commitment)
|
||||||
|
{
|
||||||
|
return Err("Checkpoint blob does not match block commitment".into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Set the store's split point *before* storing genesis so that genesis is stored
|
// Set the store's split point *before* storing genesis so that genesis is stored
|
||||||
// immediately in the freezer DB.
|
// immediately in the freezer DB.
|
||||||
store.set_split(weak_subj_slot, weak_subj_state_root, weak_subj_block_root);
|
store.set_split(weak_subj_slot, weak_subj_state_root, weak_subj_block_root);
|
||||||
@ -511,14 +535,19 @@ where
|
|||||||
.do_atomically(block_root_batch)
|
.do_atomically(block_root_batch)
|
||||||
.map_err(|e| format!("Error writing frozen block roots: {e:?}"))?;
|
.map_err(|e| format!("Error writing frozen block roots: {e:?}"))?;
|
||||||
|
|
||||||
// Write the state and block non-atomically, it doesn't matter if they're forgotten
|
// Write the state, block and blobs non-atomically, it doesn't matter if they're forgotten
|
||||||
// about on a crash restart.
|
// about on a crash restart.
|
||||||
store
|
store
|
||||||
.put_state(&weak_subj_state_root, &weak_subj_state)
|
.put_state(&weak_subj_state_root, &weak_subj_state)
|
||||||
.map_err(|e| format!("Failed to store weak subjectivity state: {:?}", e))?;
|
.map_err(|e| format!("Failed to store weak subjectivity state: {e:?}"))?;
|
||||||
store
|
store
|
||||||
.put_block(&weak_subj_block_root, weak_subj_block.clone())
|
.put_block(&weak_subj_block_root, weak_subj_block.clone())
|
||||||
.map_err(|e| format!("Failed to store weak subjectivity block: {:?}", e))?;
|
.map_err(|e| format!("Failed to store weak subjectivity block: {e:?}"))?;
|
||||||
|
if let Some(blobs) = weak_subj_blobs {
|
||||||
|
store
|
||||||
|
.put_blobs(&weak_subj_block_root, blobs)
|
||||||
|
.map_err(|e| format!("Failed to store weak subjectivity blobs: {e:?}"))?;
|
||||||
|
}
|
||||||
|
|
||||||
// Stage the database's metadata fields for atomic storage when `build` is called.
|
// Stage the database's metadata fields for atomic storage when `build` is called.
|
||||||
// This prevents the database from restarting in an inconsistent state if the anchor
|
// This prevents the database from restarting in an inconsistent state if the anchor
|
||||||
|
@ -2396,6 +2396,7 @@ async fn weak_subjectivity_sync_test(slots: Vec<Slot>, checkpoint_slot: Slot) {
|
|||||||
.get_full_block(&wss_block_root)
|
.get_full_block(&wss_block_root)
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
let wss_blobs_opt = harness.chain.store.get_blobs(&wss_block_root).unwrap();
|
||||||
let wss_state = full_store
|
let wss_state = full_store
|
||||||
.get_state(&wss_state_root, Some(checkpoint_slot))
|
.get_state(&wss_state_root, Some(checkpoint_slot))
|
||||||
.unwrap()
|
.unwrap()
|
||||||
@ -2438,7 +2439,12 @@ async fn weak_subjectivity_sync_test(slots: Vec<Slot>, checkpoint_slot: Slot) {
|
|||||||
.custom_spec(test_spec::<E>())
|
.custom_spec(test_spec::<E>())
|
||||||
.task_executor(harness.chain.task_executor.clone())
|
.task_executor(harness.chain.task_executor.clone())
|
||||||
.logger(log.clone())
|
.logger(log.clone())
|
||||||
.weak_subjectivity_state(wss_state, wss_block.clone(), genesis_state)
|
.weak_subjectivity_state(
|
||||||
|
wss_state,
|
||||||
|
wss_block.clone(),
|
||||||
|
wss_blobs_opt.clone(),
|
||||||
|
genesis_state,
|
||||||
|
)
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.store_migrator_config(MigratorConfig::default().blocking())
|
.store_migrator_config(MigratorConfig::default().blocking())
|
||||||
.dummy_eth1_backend()
|
.dummy_eth1_backend()
|
||||||
@ -2456,6 +2462,17 @@ async fn weak_subjectivity_sync_test(slots: Vec<Slot>, checkpoint_slot: Slot) {
|
|||||||
.expect("should build");
|
.expect("should build");
|
||||||
|
|
||||||
let beacon_chain = Arc::new(beacon_chain);
|
let beacon_chain = Arc::new(beacon_chain);
|
||||||
|
let wss_block_root = wss_block.canonical_root();
|
||||||
|
let store_wss_block = harness
|
||||||
|
.chain
|
||||||
|
.get_block(&wss_block_root)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.unwrap();
|
||||||
|
let store_wss_blobs_opt = beacon_chain.store.get_blobs(&wss_block_root).unwrap();
|
||||||
|
|
||||||
|
assert_eq!(store_wss_block, wss_block);
|
||||||
|
assert_eq!(store_wss_blobs_opt, wss_blobs_opt);
|
||||||
|
|
||||||
// Apply blocks forward to reach head.
|
// Apply blocks forward to reach head.
|
||||||
let chain_dump = harness.chain.chain_dump().unwrap();
|
let chain_dump = harness.chain.chain_dump().unwrap();
|
||||||
|
@ -45,3 +45,4 @@ monitoring_api = { workspace = true }
|
|||||||
execution_layer = { workspace = true }
|
execution_layer = { workspace = true }
|
||||||
beacon_processor = { workspace = true }
|
beacon_processor = { workspace = true }
|
||||||
num_cpus = { workspace = true }
|
num_cpus = { workspace = true }
|
||||||
|
ethereum_ssz = { workspace = true }
|
||||||
|
@ -36,6 +36,7 @@ use network::{NetworkConfig, NetworkSenders, NetworkService};
|
|||||||
use slasher::Slasher;
|
use slasher::Slasher;
|
||||||
use slasher_service::SlasherService;
|
use slasher_service::SlasherService;
|
||||||
use slog::{debug, info, warn, Logger};
|
use slog::{debug, info, warn, Logger};
|
||||||
|
use ssz::Decode;
|
||||||
use std::net::TcpListener;
|
use std::net::TcpListener;
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
@ -44,7 +45,7 @@ use std::time::{SystemTime, UNIX_EPOCH};
|
|||||||
use timer::spawn_timer;
|
use timer::spawn_timer;
|
||||||
use tokio::sync::oneshot;
|
use tokio::sync::oneshot;
|
||||||
use types::{
|
use types::{
|
||||||
test_utils::generate_deterministic_keypairs, BeaconState, ChainSpec, EthSpec,
|
test_utils::generate_deterministic_keypairs, BeaconState, BlobSidecarList, ChainSpec, EthSpec,
|
||||||
ExecutionBlockHash, Hash256, SignedBeaconBlock,
|
ExecutionBlockHash, Hash256, SignedBeaconBlock,
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -319,6 +320,7 @@ where
|
|||||||
ClientGenesis::WeakSubjSszBytes {
|
ClientGenesis::WeakSubjSszBytes {
|
||||||
anchor_state_bytes,
|
anchor_state_bytes,
|
||||||
anchor_block_bytes,
|
anchor_block_bytes,
|
||||||
|
anchor_blobs_bytes,
|
||||||
} => {
|
} => {
|
||||||
info!(context.log(), "Starting checkpoint sync");
|
info!(context.log(), "Starting checkpoint sync");
|
||||||
if config.chain.genesis_backfill {
|
if config.chain.genesis_backfill {
|
||||||
@ -332,10 +334,25 @@ where
|
|||||||
.map_err(|e| format!("Unable to parse weak subj state SSZ: {:?}", e))?;
|
.map_err(|e| format!("Unable to parse weak subj state SSZ: {:?}", e))?;
|
||||||
let anchor_block = SignedBeaconBlock::from_ssz_bytes(&anchor_block_bytes, &spec)
|
let anchor_block = SignedBeaconBlock::from_ssz_bytes(&anchor_block_bytes, &spec)
|
||||||
.map_err(|e| format!("Unable to parse weak subj block SSZ: {:?}", e))?;
|
.map_err(|e| format!("Unable to parse weak subj block SSZ: {:?}", e))?;
|
||||||
|
let anchor_blobs = if anchor_block.message().body().has_blobs() {
|
||||||
|
let anchor_blobs_bytes = anchor_blobs_bytes
|
||||||
|
.ok_or("Blobs for checkpoint must be provided using --checkpoint-blobs")?;
|
||||||
|
Some(
|
||||||
|
BlobSidecarList::from_ssz_bytes(&anchor_blobs_bytes)
|
||||||
|
.map_err(|e| format!("Unable to parse weak subj blobs SSZ: {e:?}"))?,
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
let genesis_state = genesis_state(&runtime_context, &config, log).await?;
|
let genesis_state = genesis_state(&runtime_context, &config, log).await?;
|
||||||
|
|
||||||
builder
|
builder
|
||||||
.weak_subjectivity_state(anchor_state, anchor_block, genesis_state)
|
.weak_subjectivity_state(
|
||||||
|
anchor_state,
|
||||||
|
anchor_block,
|
||||||
|
anchor_blobs,
|
||||||
|
genesis_state,
|
||||||
|
)
|
||||||
.map(|v| (v, None))?
|
.map(|v| (v, None))?
|
||||||
}
|
}
|
||||||
ClientGenesis::CheckpointSyncUrl { url } => {
|
ClientGenesis::CheckpointSyncUrl { url } => {
|
||||||
@ -430,9 +447,33 @@ where
|
|||||||
e => format!("Error fetching finalized block from remote: {:?}", e),
|
e => format!("Error fetching finalized block from remote: {:?}", e),
|
||||||
})?
|
})?
|
||||||
.ok_or("Finalized block missing from remote, it returned 404")?;
|
.ok_or("Finalized block missing from remote, it returned 404")?;
|
||||||
|
let block_root = block.canonical_root();
|
||||||
|
|
||||||
debug!(context.log(), "Downloaded finalized block");
|
debug!(context.log(), "Downloaded finalized block");
|
||||||
|
|
||||||
|
let blobs = if block.message().body().has_blobs() {
|
||||||
|
debug!(context.log(), "Downloading finalized blobs");
|
||||||
|
if let Some(response) = remote
|
||||||
|
.get_blobs::<TEthSpec>(BlockId::Root(block_root), None)
|
||||||
|
.await
|
||||||
|
.map_err(|e| format!("Error fetching finalized blobs from remote: {e:?}"))?
|
||||||
|
{
|
||||||
|
debug!(context.log(), "Downloaded finalized blobs");
|
||||||
|
Some(response.data)
|
||||||
|
} else {
|
||||||
|
warn!(
|
||||||
|
context.log(),
|
||||||
|
"Checkpoint server is missing blobs";
|
||||||
|
"block_root" => %block_root,
|
||||||
|
"hint" => "use a different URL or ask the provider to update",
|
||||||
|
"impact" => "db will be slightly corrupt until these blobs are pruned",
|
||||||
|
);
|
||||||
|
None
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
let genesis_state = genesis_state(&runtime_context, &config, log).await?;
|
let genesis_state = genesis_state(&runtime_context, &config, log).await?;
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
@ -440,7 +481,7 @@ where
|
|||||||
"Loaded checkpoint block and state";
|
"Loaded checkpoint block and state";
|
||||||
"block_slot" => block.slot(),
|
"block_slot" => block.slot(),
|
||||||
"state_slot" => state.slot(),
|
"state_slot" => state.slot(),
|
||||||
"block_root" => ?block.canonical_root(),
|
"block_root" => ?block_root,
|
||||||
);
|
);
|
||||||
|
|
||||||
let service =
|
let service =
|
||||||
@ -468,7 +509,7 @@ where
|
|||||||
});
|
});
|
||||||
|
|
||||||
builder
|
builder
|
||||||
.weak_subjectivity_state(state, block, genesis_state)
|
.weak_subjectivity_state(state, block, blobs, genesis_state)
|
||||||
.map(|v| (v, service))?
|
.map(|v| (v, service))?
|
||||||
}
|
}
|
||||||
ClientGenesis::DepositContract => {
|
ClientGenesis::DepositContract => {
|
||||||
|
@ -35,6 +35,7 @@ pub enum ClientGenesis {
|
|||||||
WeakSubjSszBytes {
|
WeakSubjSszBytes {
|
||||||
anchor_state_bytes: Vec<u8>,
|
anchor_state_bytes: Vec<u8>,
|
||||||
anchor_block_bytes: Vec<u8>,
|
anchor_block_bytes: Vec<u8>,
|
||||||
|
anchor_blobs_bytes: Option<Vec<u8>>,
|
||||||
},
|
},
|
||||||
CheckpointSyncUrl {
|
CheckpointSyncUrl {
|
||||||
url: SensitiveUrl,
|
url: SensitiveUrl,
|
||||||
|
@ -939,6 +939,15 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
|
|||||||
.takes_value(true)
|
.takes_value(true)
|
||||||
.requires("checkpoint-state")
|
.requires("checkpoint-state")
|
||||||
)
|
)
|
||||||
|
.arg(
|
||||||
|
Arg::with_name("checkpoint-blobs")
|
||||||
|
.long("checkpoint-blobs")
|
||||||
|
.help("Set the checkpoint blobs to start syncing from. Must be aligned and match \
|
||||||
|
--checkpoint-block. Using --checkpoint-sync-url instead is recommended.")
|
||||||
|
.value_name("BLOBS_SSZ")
|
||||||
|
.takes_value(true)
|
||||||
|
.requires("checkpoint-block")
|
||||||
|
)
|
||||||
.arg(
|
.arg(
|
||||||
Arg::with_name("checkpoint-sync-url")
|
Arg::with_name("checkpoint-sync-url")
|
||||||
.long("checkpoint-sync-url")
|
.long("checkpoint-sync-url")
|
||||||
|
@ -512,9 +512,10 @@ pub fn get_config<E: EthSpec>(
|
|||||||
|
|
||||||
client_config.genesis = if eth2_network_config.genesis_state_is_known() {
|
client_config.genesis = if eth2_network_config.genesis_state_is_known() {
|
||||||
// Set up weak subjectivity sync, or start from the hardcoded genesis state.
|
// Set up weak subjectivity sync, or start from the hardcoded genesis state.
|
||||||
if let (Some(initial_state_path), Some(initial_block_path)) = (
|
if let (Some(initial_state_path), Some(initial_block_path), opt_initial_blobs_path) = (
|
||||||
cli_args.value_of("checkpoint-state"),
|
cli_args.value_of("checkpoint-state"),
|
||||||
cli_args.value_of("checkpoint-block"),
|
cli_args.value_of("checkpoint-block"),
|
||||||
|
cli_args.value_of("checkpoint-blobs"),
|
||||||
) {
|
) {
|
||||||
let read = |path: &str| {
|
let read = |path: &str| {
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
@ -530,10 +531,12 @@ pub fn get_config<E: EthSpec>(
|
|||||||
|
|
||||||
let anchor_state_bytes = read(initial_state_path)?;
|
let anchor_state_bytes = read(initial_state_path)?;
|
||||||
let anchor_block_bytes = read(initial_block_path)?;
|
let anchor_block_bytes = read(initial_block_path)?;
|
||||||
|
let anchor_blobs_bytes = opt_initial_blobs_path.map(read).transpose()?;
|
||||||
|
|
||||||
ClientGenesis::WeakSubjSszBytes {
|
ClientGenesis::WeakSubjSszBytes {
|
||||||
anchor_state_bytes,
|
anchor_state_bytes,
|
||||||
anchor_block_bytes,
|
anchor_block_bytes,
|
||||||
|
anchor_blobs_bytes,
|
||||||
}
|
}
|
||||||
} else if let Some(remote_bn_url) = cli_args.value_of("checkpoint-sync-url") {
|
} else if let Some(remote_bn_url) = cli_args.value_of("checkpoint-sync-url") {
|
||||||
let url = SensitiveUrl::parse(remote_bn_url)
|
let url = SensitiveUrl::parse(remote_bn_url)
|
||||||
|
@ -179,6 +179,9 @@ OPTIONS:
|
|||||||
--builder-user-agent <STRING>
|
--builder-user-agent <STRING>
|
||||||
The HTTP user agent to send alongside requests to the builder URL. The default is Lighthouse's version
|
The HTTP user agent to send alongside requests to the builder URL. The default is Lighthouse's version
|
||||||
string.
|
string.
|
||||||
|
--checkpoint-blobs <BLOBS_SSZ>
|
||||||
|
Set the checkpoint blobs to start syncing from. Must be aligned and match --checkpoint-block. Using
|
||||||
|
--checkpoint-sync-url instead is recommended.
|
||||||
--checkpoint-block <BLOCK_SSZ>
|
--checkpoint-block <BLOCK_SSZ>
|
||||||
Set a checkpoint block to start syncing from. Must be aligned and match --checkpoint-state. Using
|
Set a checkpoint block to start syncing from. Must be aligned and match --checkpoint-state. Using
|
||||||
--checkpoint-sync-url instead is recommended.
|
--checkpoint-sync-url instead is recommended.
|
||||||
|
@ -176,6 +176,12 @@ impl<'a, T: EthSpec, Payload: AbstractExecPayload<T>> BeaconBlockBodyRef<'a, T,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Return `true` if this block body has a non-zero number of blobs.
|
||||||
|
pub fn has_blobs(self) -> bool {
|
||||||
|
self.blob_kzg_commitments()
|
||||||
|
.map_or(false, |blobs| !blobs.is_empty())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, T: EthSpec, Payload: AbstractExecPayload<T>> BeaconBlockBodyRef<'a, T, Payload> {
|
impl<'a, T: EthSpec, Payload: AbstractExecPayload<T>> BeaconBlockBodyRef<'a, T, Payload> {
|
||||||
|
Loading…
Reference in New Issue
Block a user