Cache deposit signature verification (#1298)

* Bake in Altona testnet (without genesis state)

* Add sig verification, without optimization

* Start integration with genesis service

* Update config.yml

* Fix eth2_testnet_config test

* Stop using default spec in genesis

* Fix lcli compile error

* Update min genesis time

* Fix typo
This commit is contained in:
Paul Hauner 2020-06-26 11:43:06 +10:00 committed by GitHub
parent e3d9832fee
commit e0e41fc8e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 195 additions and 71 deletions

View File

@ -573,7 +573,8 @@ where
.as_ref()
.ok_or_else(|| "dummy_eth1_backend requires a log".to_string())?;
let backend = CachingEth1Backend::new(Eth1Config::default(), log.clone());
let backend =
CachingEth1Backend::new(Eth1Config::default(), log.clone(), self.spec.clone());
let mut eth1_chain = Eth1Chain::new(backend);
eth1_chain.use_dummy_backend = true;

View File

@ -143,9 +143,10 @@ where
ssz_container: &SszEth1,
config: Eth1Config,
log: &Logger,
spec: ChainSpec,
) -> Result<Self, String> {
let backend =
Eth1ChainBackend::from_bytes(&ssz_container.backend_bytes, config, log.clone())?;
Eth1ChainBackend::from_bytes(&ssz_container.backend_bytes, config, log.clone(), spec)?;
Ok(Self {
use_dummy_backend: ssz_container.use_dummy_backend,
backend,
@ -191,7 +192,12 @@ pub trait Eth1ChainBackend<T: EthSpec>: Sized + Send + Sync {
fn as_bytes(&self) -> Vec<u8>;
/// Create a `Eth1ChainBackend` instance given encoded bytes.
fn from_bytes(bytes: &[u8], config: Eth1Config, log: Logger) -> Result<Self, String>;
fn from_bytes(
bytes: &[u8],
config: Eth1Config,
log: Logger,
spec: ChainSpec,
) -> Result<Self, String>;
}
/// Provides a simple, testing-only backend that generates deterministic, meaningless eth1 data.
@ -234,7 +240,12 @@ impl<T: EthSpec> Eth1ChainBackend<T> for DummyEth1ChainBackend<T> {
}
/// Create dummy eth1 backend.
fn from_bytes(_bytes: &[u8], _config: Eth1Config, _log: Logger) -> Result<Self, String> {
fn from_bytes(
_bytes: &[u8],
_config: Eth1Config,
_log: Logger,
_spec: ChainSpec,
) -> Result<Self, String> {
Ok(Self(PhantomData))
}
}
@ -261,9 +272,9 @@ impl<T: EthSpec> CachingEth1Backend<T> {
/// Instantiates `self` with empty caches.
///
/// Does not connect to the eth1 node or start any tasks to keep the cache updated.
pub fn new(config: Eth1Config, log: Logger) -> Self {
pub fn new(config: Eth1Config, log: Logger, spec: ChainSpec) -> Self {
Self {
core: HttpService::new(config, log.clone()),
core: HttpService::new(config, log.clone(), spec),
log,
_phantom: PhantomData,
}
@ -389,8 +400,13 @@ impl<T: EthSpec> Eth1ChainBackend<T> for CachingEth1Backend<T> {
}
/// Recover the cached backend from encoded bytes.
fn from_bytes(bytes: &[u8], config: Eth1Config, log: Logger) -> Result<Self, String> {
let inner = HttpService::from_bytes(bytes, config, log.clone())?;
fn from_bytes(
bytes: &[u8],
config: Eth1Config,
log: Logger,
spec: ChainSpec,
) -> Result<Self, String> {
let inner = HttpService::from_bytes(bytes, config, log.clone(), spec)?;
Ok(Self {
core: inner,
log,
@ -549,7 +565,10 @@ mod test {
mod eth1_chain_json_backend {
use super::*;
use eth1::DepositLog;
use types::test_utils::{generate_deterministic_keypair, TestingDepositBuilder};
use types::{
test_utils::{generate_deterministic_keypair, TestingDepositBuilder},
EthSpec, MainnetEthSpec,
};
fn get_eth1_chain() -> Eth1Chain<CachingEth1Backend<E>, E> {
let eth1_config = Eth1Config {
@ -557,7 +576,11 @@ mod test {
};
let log = null_logger().unwrap();
Eth1Chain::new(CachingEth1Backend::new(eth1_config, log))
Eth1Chain::new(CachingEth1Backend::new(
eth1_config,
log,
MainnetEthSpec::default_spec(),
))
}
fn get_deposit_log(i: u64, spec: &ChainSpec) -> DepositLog {
@ -571,6 +594,7 @@ mod test {
deposit_data,
block_number: i,
index: i,
signature_is_valid: true,
}
}

View File

@ -204,7 +204,11 @@ where
"deposit_contract" => &config.eth1.deposit_contract_address
);
let genesis_service = Eth1GenesisService::new(config.eth1, context.log().clone());
let genesis_service = Eth1GenesisService::new(
config.eth1,
context.log().clone(),
context.eth2_config().spec.clone(),
);
let genesis_state = genesis_service
.wait_for_genesis_state(
@ -625,6 +629,10 @@ where
let beacon_chain_builder = self
.beacon_chain_builder
.ok_or_else(|| "caching_eth1_backend requires a beacon_chain_builder")?;
let spec = self
.chain_spec
.clone()
.ok_or_else(|| "caching_eth1_backend requires a chain spec".to_string())?;
let backend = if let Some(eth1_service_from_genesis) = self.eth1_service {
eth1_service_from_genesis.update_config(config)?;
@ -648,10 +656,17 @@ where
&persisted,
config.clone(),
&context.log().clone(),
spec.clone(),
)
.map(|chain| chain.into_backend())
})
.unwrap_or_else(|| Ok(CachingEth1Backend::new(config, context.log().clone())))?
.unwrap_or_else(|| {
Ok(CachingEth1Backend::new(
config,
context.log().clone(),
spec.clone(),
))
})?
};
self.eth1_service = None;

View File

@ -243,36 +243,38 @@ impl DepositCache {
}
}
/// Gets the deposit count at block height = block_number.
/// Returns the number of deposits with valid signatures that have been observed up to and
/// including the block at `block_number`.
///
/// Fetches the `DepositLog` that was emitted at or just before `block_number`
/// and returns the deposit count as `index + 1`.
///
/// Returns `None` if block number queried is 0 or less than deposit_contract_deployed block.
pub fn get_deposit_count_from_cache(&self, block_number: u64) -> Option<u64> {
// Contract cannot be deployed in 0'th block
if block_number == 0 {
return None;
}
if block_number < self.deposit_contract_deploy_block {
return None;
}
// Return 0 if block_num queried is before first deposit
if let Some(first_deposit) = self.logs.first() {
if first_deposit.block_number > block_number {
return Some(0);
}
}
let index = self
.logs
.binary_search_by(|deposit| deposit.block_number.cmp(&block_number));
match index {
Ok(index) => self.logs.get(index).map(|x| x.index + 1),
Err(next) => Some(
/// Returns `None` if the `block_number` is zero or prior to contract deployment.
pub fn get_valid_signature_count(&self, block_number: u64) -> Option<usize> {
if block_number == 0 || block_number < self.deposit_contract_deploy_block {
None
} else {
Some(
self.logs
.get(next.saturating_sub(1))
.map_or(0, |x| x.index + 1),
),
.iter()
.take_while(|deposit| deposit.block_number <= block_number)
.filter(|deposit| deposit.signature_is_valid)
.count(),
)
}
}
/// Returns the number of deposits that have been observed up to and
/// including the block at `block_number`.
///
/// Returns `None` if the `block_number` is zero or prior to contract deployment.
pub fn get_deposit_count_from_cache(&self, block_number: u64) -> Option<u64> {
if block_number == 0 || block_number < self.deposit_contract_deploy_block {
None
} else {
Some(
self.logs
.iter()
.take_while(|deposit| deposit.block_number <= block_number)
.count() as u64,
)
}
}
@ -291,15 +293,18 @@ pub mod tests {
use super::*;
use crate::deposit_log::tests::EXAMPLE_LOG;
use crate::http::Log;
use types::{EthSpec, MainnetEthSpec};
pub const TREE_DEPTH: usize = 32;
fn example_log() -> DepositLog {
let spec = MainnetEthSpec::default_spec();
let log = Log {
block_number: 42,
data: EXAMPLE_LOG.to_vec(),
};
DepositLog::from_log(&log).expect("should decode log")
DepositLog::from_log(&log, &spec).expect("should decode log")
}
#[test]

View File

@ -1,7 +1,10 @@
use super::http::Log;
use ssz::Decode;
use ssz_derive::{Decode, Encode};
use types::{DepositData, Hash256, PublicKeyBytes, SignatureBytes};
use state_processing::per_block_processing::signature_sets::{
deposit_pubkey_signature_message, deposit_signature_set,
};
use types::{ChainSpec, DepositData, Hash256, PublicKeyBytes, SignatureBytes};
/// The following constants define the layout of bytes in the deposit contract `DepositEvent`. The
/// event bytes are formatted according to the Ethereum ABI.
@ -24,11 +27,13 @@ pub struct DepositLog {
pub block_number: u64,
/// The index included with the deposit log.
pub index: u64,
/// True if the signature is valid.
pub signature_is_valid: bool,
}
impl DepositLog {
/// Attempts to parse a raw `Log` from the deposit contract into a `DepositLog`.
pub fn from_log(log: &Log) -> Result<Self, String> {
pub fn from_log(log: &Log, spec: &ChainSpec) -> Result<Self, String> {
let bytes = &log.data;
let pubkey = bytes
@ -58,10 +63,15 @@ impl DepositLog {
.map_err(|e| format!("Invalid signature ssz: {:?}", e))?,
};
let deposit_signature_message = deposit_pubkey_signature_message(&deposit_data, spec)
.ok_or_else(|| "Unable to prepare deposit signature verification".to_string())?;
let signature_is_valid = deposit_signature_set(&deposit_signature_message).is_valid();
Ok(DepositLog {
deposit_data,
block_number: log.block_number,
index: u64::from_ssz_bytes(index).map_err(|e| format!("Invalid index ssz: {:?}", e))?,
signature_is_valid,
})
}
}
@ -70,6 +80,7 @@ impl DepositLog {
pub mod tests {
use super::*;
use crate::http::Log;
use types::{EthSpec, MainnetEthSpec};
/// The data from a deposit event, using the v0.8.3 version of the deposit contract.
pub const EXAMPLE_LOG: &[u8] = &[
@ -103,6 +114,6 @@ pub mod tests {
block_number: 42,
data: EXAMPLE_LOG.to_vec(),
};
DepositLog::from_log(&log).expect("should decode log");
DepositLog::from_log(&log, &MainnetEthSpec::default_spec()).expect("should decode log");
}
}

View File

@ -6,6 +6,7 @@ use crate::{
use parking_lot::RwLock;
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use types::ChainSpec;
#[derive(Default)]
pub struct DepositUpdater {
@ -28,6 +29,7 @@ pub struct Inner {
pub block_cache: RwLock<BlockCache>,
pub deposit_cache: RwLock<DepositUpdater>,
pub config: RwLock<Config>,
pub spec: ChainSpec,
}
impl Inner {
@ -47,10 +49,15 @@ impl Inner {
}
/// Recover `Inner` given byte representation of eth1 deposit and block caches.
pub fn from_bytes(bytes: &[u8], config: Config) -> Result<Self, String> {
pub fn from_bytes(bytes: &[u8], config: Config, spec: ChainSpec) -> Result<Self, String> {
let ssz_cache = SszEth1Cache::from_ssz_bytes(bytes)
.map_err(|e| format!("Ssz decoding error: {:?}", e))?;
Ok(ssz_cache.to_inner(config)?)
Ok(ssz_cache.to_inner(config, spec)?)
}
/// Returns a reference to the specification.
pub fn spec(&self) -> &ChainSpec {
&self.spec
}
}
@ -72,7 +79,7 @@ impl SszEth1Cache {
}
}
pub fn to_inner(&self, config: Config) -> Result<Inner, String> {
pub fn to_inner(&self, config: Config, spec: ChainSpec) -> Result<Inner, String> {
Ok(Inner {
block_cache: RwLock::new(self.block_cache.clone()),
deposit_cache: RwLock::new(DepositUpdater {
@ -80,6 +87,7 @@ impl SszEth1Cache {
last_processed_block: self.last_processed_block,
}),
config: RwLock::new(config),
spec,
})
}
}

View File

@ -14,6 +14,7 @@ use std::ops::{Range, RangeInclusive};
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::time::{interval_at, Duration, Instant};
use types::ChainSpec;
const STANDARD_TIMEOUT_MILLIS: u64 = 15_000;
@ -130,14 +131,15 @@ pub struct Service {
impl Service {
/// Creates a new service. Does not attempt to connect to the eth1 node.
pub fn new(config: Config, log: Logger) -> Self {
pub fn new(config: Config, log: Logger, spec: ChainSpec) -> Self {
Self {
inner: Arc::new(Inner {
block_cache: <_>::default(),
deposit_cache: RwLock::new(DepositUpdater::new(
config.deposit_contract_deploy_block,
)),
config: RwLock::new(config),
..Inner::default()
spec,
}),
log,
}
@ -149,8 +151,13 @@ impl Service {
}
/// Recover the deposit and block caches from encoded bytes.
pub fn from_bytes(bytes: &[u8], config: Config, log: Logger) -> Result<Self, String> {
let inner = Inner::from_bytes(bytes, config)?;
pub fn from_bytes(
bytes: &[u8],
config: Config,
log: Logger,
spec: ChainSpec,
) -> Result<Self, String> {
let inner = Inner::from_bytes(bytes, config, spec)?;
Ok(Self {
inner: Arc::new(inner),
log,
@ -194,6 +201,14 @@ impl Service {
self.inner.block_cache.read().lowest_block_number()
}
/// Returns the highest block that is present in both the deposit and block caches.
pub fn highest_safe_block(&self) -> Option<u64> {
let block_cache = self.blocks().read().highest_block_number()?;
let deposit_cache = self.deposits().read().last_processed_block?;
Some(std::cmp::min(block_cache, deposit_cache))
}
/// Returns the number of currently cached blocks.
pub fn block_cache_len(&self) -> usize {
self.blocks().read().len()
@ -204,6 +219,25 @@ impl Service {
self.deposits().read().cache.len()
}
/// Returns the number of deposits with valid signatures that have been observed.
pub fn get_valid_signature_count(&self) -> Option<usize> {
self.deposits()
.read()
.cache
.get_valid_signature_count(self.highest_safe_block()?)
}
/// Returns the number of deposits with valid signatures that have been observed up to and
/// including the block at `block_number`.
///
/// Returns `None` if the `block_number` is zero or prior to contract deployment.
pub fn get_valid_signature_count_at_block(&self, block_number: u64) -> Option<usize> {
self.deposits()
.read()
.cache
.get_valid_signature_count(block_number)
}
/// Read the service's configuration.
pub fn config(&self) -> RwLockReadGuard<Config> {
self.inner.config.read()
@ -402,9 +436,11 @@ impl Service {
log_chunk
.into_iter()
.map(|raw_log| {
DepositLog::from_log(&raw_log).map_err(|error| Error::FailedToParseDepositLog {
DepositLog::from_log(&raw_log, service.inner.spec()).map_err(|error| {
Error::FailedToParseDepositLog {
block_range: block_range.clone(),
error,
}
})
})
// Return early if any of the logs cannot be parsed.

View File

@ -99,6 +99,7 @@ async fn get_block_number(web3: &Web3<Http>) -> u64 {
mod eth1_cache {
use super::*;
use types::{EthSpec, MainnetEthSpec};
#[tokio::test]
async fn simple_scenario() {
@ -122,6 +123,7 @@ mod eth1_cache {
..Config::default()
},
log.clone(),
MainnetEthSpec::default_spec(),
);
// Create some blocks and then consume them, performing the test `rounds` times.
@ -194,6 +196,7 @@ mod eth1_cache {
..Config::default()
},
log,
MainnetEthSpec::default_spec(),
);
let blocks = cache_len * 2;
@ -240,6 +243,7 @@ mod eth1_cache {
..Config::default()
},
log,
MainnetEthSpec::default_spec(),
);
for _ in 0..4u8 {
@ -282,6 +286,7 @@ mod eth1_cache {
..Config::default()
},
log,
MainnetEthSpec::default_spec(),
);
for _ in 0..n {
@ -328,6 +333,7 @@ mod deposit_tree {
..Config::default()
},
log,
MainnetEthSpec::default_spec(),
);
for round in 0..3 {
@ -401,6 +407,7 @@ mod deposit_tree {
..Config::default()
},
log,
MainnetEthSpec::default_spec(),
);
let deposits: Vec<_> = (0..n).map(|_| random_deposit_data()).collect();
@ -425,6 +432,8 @@ mod deposit_tree {
async fn cache_consistency() {
let n = 8;
let spec = &MainnetEthSpec::default_spec();
let deposits: Vec<_> = (0..n).map(|_| random_deposit_data()).collect();
let eth1 = GanacheEth1Instance::new()
@ -462,7 +471,7 @@ mod deposit_tree {
let logs: Vec<_> = blocking_deposit_logs(&eth1, 0..block_number)
.await
.iter()
.map(|raw| DepositLog::from_log(raw).expect("should parse deposit log"))
.map(|raw| DepositLog::from_log(raw, spec).expect("should parse deposit log"))
.inspect(|log| {
tree.insert_log(log.clone())
.expect("should add consecutive logs")
@ -639,6 +648,7 @@ mod fast {
..Config::default()
},
log,
MainnetEthSpec::default_spec(),
);
let n = 10;
let deposits: Vec<_> = (0..n).map(|_| random_deposit_data()).collect();
@ -708,7 +718,7 @@ mod persist {
block_cache_truncation: None,
..Config::default()
};
let service = Service::new(config.clone(), log.clone());
let service = Service::new(config.clone(), log.clone(), MainnetEthSpec::default_spec());
let n = 10;
let deposits: Vec<_> = (0..n).map(|_| random_deposit_data()).collect();
for deposit in &deposits {
@ -745,7 +755,8 @@ mod persist {
// Drop service and recover from bytes
drop(service);
let recovered_service = Service::from_bytes(&eth1_bytes, config, log).unwrap();
let recovered_service =
Service::from_bytes(&eth1_bytes, config, log, MainnetEthSpec::default_spec()).unwrap();
assert_eq!(
recovered_service.block_cache_len(),
block_count,

View File

@ -43,7 +43,7 @@ impl Eth1GenesisService {
/// Creates a new service. Does not attempt to connect to the Eth1 node.
///
/// Modifies the given `config` to make it more suitable to the task of listening to genesis.
pub fn new(config: Eth1Config, log: Logger) -> Self {
pub fn new(config: Eth1Config, log: Logger, spec: ChainSpec) -> Self {
let config = Eth1Config {
// Truncating the block cache makes searching for genesis more
// complicated.
@ -65,7 +65,7 @@ impl Eth1GenesisService {
};
Self {
eth1_service: Eth1Service::new(config, log),
eth1_service: Eth1Service::new(config, log, spec),
stats: Arc::new(Statistics {
highest_processed_block: AtomicU64::new(0),
active_validator_count: AtomicUsize::new(0),
@ -161,7 +161,7 @@ impl Eth1GenesisService {
log,
"Imported eth1 blocks";
"latest_block_timestamp" => eth1_service.latest_block_timestamp(),
"cache_head" => self.highest_safe_block(),
"cache_head" => eth1_service.highest_safe_block(),
"count" => outcome.blocks_imported,
);
outcome.blocks_imported
@ -205,8 +205,9 @@ impl Eth1GenesisService {
log,
"Waiting for more validators";
"min_genesis_active_validators" => spec.min_genesis_active_validator_count,
"total_deposits" => total_deposit_count,
"active_validators" => active_validator_count,
"total_deposits" => total_deposit_count,
"valid_deposits" => eth1_service.get_valid_signature_count().unwrap_or(0),
);
}
} else {
@ -255,7 +256,10 @@ impl Eth1GenesisService {
//
// Don't update the highest processed block since we want to come back and process this
// again later.
if self.highest_safe_block().map_or(true, |n| block.number > n) {
if eth1_service
.highest_safe_block()
.map_or(true, |n| block.number > n)
{
continue;
}
@ -287,6 +291,21 @@ impl Eth1GenesisService {
continue;
}
let valid_signature_count = eth1_service
.get_valid_signature_count_at_block(block.number)
.unwrap_or(0);
if (valid_signature_count as u64) < spec.min_genesis_active_validator_count {
trace!(
log,
"Insufficient valid signatures";
"genesis_delay" => spec.genesis_delay,
"valid_signature_count" => valid_signature_count,
"min_validator_count" => spec.min_genesis_active_validator_count,
"eth1_block_number" => block.number,
);
continue;
}
// Generate a potential beacon state for this eth1 block.
//
// Note: this state is fully valid, some fields have been bypassed to make verification
@ -416,14 +435,6 @@ impl Eth1GenesisService {
Ok(state)
}
/// Returns the highest block that is present in both the deposit and block caches.
fn highest_safe_block(&self) -> Option<u64> {
let block_cache = self.eth1_service.blocks().read().highest_block_number()?;
let deposit_cache = self.eth1_service.deposits().read().last_processed_block?;
Some(std::cmp::min(block_cache, deposit_cache))
}
/// Returns all deposit logs included in `block_number` and all prior blocks.
fn deposit_logs_at_block(&self, block_number: u64) -> Vec<DepositLog> {
self.eth1_service

View File

@ -53,6 +53,7 @@ fn basic() {
..Eth1Config::default()
},
log,
spec.clone(),
);
// NOTE: this test is sensitive to the response speed of the external web3 server. If

View File

@ -46,7 +46,8 @@ pub fn run<T: EthSpec>(mut env: Environment<T>, matches: &ArgMatches<'_>) -> Res
config.lowest_cached_block_number = eth2_testnet_config.deposit_contract_deploy_block;
config.follow_distance = spec.eth1_follow_distance / 2;
let genesis_service = Eth1GenesisService::new(config, env.core_context().log().clone());
let genesis_service =
Eth1GenesisService::new(config, env.core_context().log().clone(), spec.clone());
env.runtime().block_on(async {
let _ = genesis_service