Faster BeaconState enc/dec (#671)

* Add state enc/dec benches

* Add example for flamegraph

* Use `PublicKeyBytes` for `Validator`

* Ripple PublicKeyBytes change through codebase

* Add benches, optimizations to store BeaconState

* Store BeaconState in StorageContainer too

* Optimize StorageContainer with std::mem magic

* Fix rest_api tests
This commit is contained in:
Paul Hauner 2019-12-06 16:44:03 +11:00 committed by GitHub
parent d0319320ce
commit 75efed305c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 508 additions and 124 deletions

View File

@ -454,7 +454,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// Returns the validator index (if any) for the given public key. /// Returns the validator index (if any) for the given public key.
/// ///
/// Information is retrieved from the present `beacon_state.validators`. /// Information is retrieved from the present `beacon_state.validators`.
pub fn validator_index(&self, pubkey: &PublicKey) -> Option<usize> { pub fn validator_index(&self, pubkey: &PublicKeyBytes) -> Option<usize> {
for (i, validator) in self.head().beacon_state.validators.iter().enumerate() { for (i, validator) in self.head().beacon_state.validators.iter().enumerate() {
if validator.pubkey == *pubkey { if validator.pubkey == *pubkey {
return Some(i); return Some(i);

View File

@ -1,6 +1,6 @@
use crate::{ApiError, ApiResult}; use crate::{ApiError, ApiResult};
use beacon_chain::{BeaconChain, BeaconChainTypes}; use beacon_chain::{BeaconChain, BeaconChainTypes};
use bls::PublicKey; use bls::PublicKeyBytes;
use eth2_libp2p::{PubsubMessage, Topic}; use eth2_libp2p::{PubsubMessage, Topic};
use eth2_libp2p::{ use eth2_libp2p::{
BEACON_ATTESTATION_TOPIC, BEACON_BLOCK_TOPIC, TOPIC_ENCODING_POSTFIX, TOPIC_PREFIX, BEACON_ATTESTATION_TOPIC, BEACON_BLOCK_TOPIC, TOPIC_ENCODING_POSTFIX, TOPIC_PREFIX,
@ -99,12 +99,12 @@ pub fn parse_root(string: &str) -> Result<Hash256, ApiError> {
} }
/// Parse a PublicKey from a `0x` prefixed hex string /// Parse a PublicKey from a `0x` prefixed hex string
pub fn parse_pubkey(string: &str) -> Result<PublicKey, ApiError> { pub fn parse_pubkey_bytes(string: &str) -> Result<PublicKeyBytes, ApiError> {
const PREFIX: &str = "0x"; const PREFIX: &str = "0x";
if string.starts_with(PREFIX) { if string.starts_with(PREFIX) {
let pubkey_bytes = hex::decode(string.trim_start_matches(PREFIX)) let pubkey_bytes = hex::decode(string.trim_start_matches(PREFIX))
.map_err(|e| ApiError::BadRequest(format!("Invalid hex string: {:?}", e)))?; .map_err(|e| ApiError::BadRequest(format!("Invalid hex string: {:?}", e)))?;
let pubkey = PublicKey::from_bytes(pubkey_bytes.as_slice()).map_err(|e| { let pubkey = PublicKeyBytes::from_bytes(pubkey_bytes.as_slice()).map_err(|e| {
ApiError::BadRequest(format!("Unable to deserialize public key: {:?}.", e)) ApiError::BadRequest(format!("Unable to deserialize public key: {:?}.", e))
})?; })?;
Ok(pubkey) Ok(pubkey)

View File

@ -37,7 +37,7 @@ use tokio::runtime::TaskExecutor;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use url_query::UrlQuery; use url_query::UrlQuery;
pub use crate::helpers::parse_pubkey; pub use crate::helpers::parse_pubkey_bytes;
pub use beacon::{BlockResponse, HeadResponse, StateResponse}; pub use beacon::{BlockResponse, HeadResponse, StateResponse};
pub use config::Config; pub use config::Config;
pub use validator::{BulkValidatorDutiesRequest, ValidatorDuty}; pub use validator::{BulkValidatorDutiesRequest, ValidatorDuty};

View File

@ -1,5 +1,5 @@
use crate::helpers::{ use crate::helpers::{
check_content_type_for_json, parse_pubkey, publish_attestation_to_network, check_content_type_for_json, parse_pubkey_bytes, publish_attestation_to_network,
publish_beacon_block_to_network, publish_beacon_block_to_network,
}; };
use crate::response_builder::ResponseBuilder; use crate::response_builder::ResponseBuilder;
@ -7,7 +7,7 @@ use crate::{ApiError, ApiResult, BoxFut, NetworkChannel, UrlQuery};
use beacon_chain::{ use beacon_chain::{
AttestationProcessingOutcome, BeaconChain, BeaconChainTypes, BlockProcessingOutcome, AttestationProcessingOutcome, BeaconChain, BeaconChainTypes, BlockProcessingOutcome,
}; };
use bls::PublicKey; use bls::PublicKeyBytes;
use futures::future::Future; use futures::future::Future;
use futures::stream::Stream; use futures::stream::Stream;
use hyper::{Body, Request}; use hyper::{Body, Request};
@ -21,7 +21,7 @@ use types::{Attestation, BeaconBlock, CommitteeIndex, Epoch, RelativeEpoch, Slot
#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)] #[derive(PartialEq, Debug, Serialize, Deserialize, Clone)]
pub struct ValidatorDuty { pub struct ValidatorDuty {
/// The validator's BLS public key, uniquely identifying them. _48-bytes, hex encoded with 0x prefix, case insensitive._ /// The validator's BLS public key, uniquely identifying them. _48-bytes, hex encoded with 0x prefix, case insensitive._
pub validator_pubkey: PublicKey, pub validator_pubkey: PublicKeyBytes,
/// The slot at which the validator must attest. /// The slot at which the validator must attest.
pub attestation_slot: Option<Slot>, pub attestation_slot: Option<Slot>,
/// The index of the committee within `slot` of which the validator is a member. /// The index of the committee within `slot` of which the validator is a member.
@ -35,7 +35,7 @@ pub struct ValidatorDuty {
#[derive(PartialEq, Debug, Serialize, Deserialize, Clone, Encode, Decode)] #[derive(PartialEq, Debug, Serialize, Deserialize, Clone, Encode, Decode)]
pub struct BulkValidatorDutiesRequest { pub struct BulkValidatorDutiesRequest {
pub epoch: Epoch, pub epoch: Epoch,
pub pubkeys: Vec<PublicKey>, pub pubkeys: Vec<PublicKeyBytes>,
} }
/// HTTP Handler to retrieve a the duties for a set of validators during a particular epoch. This /// HTTP Handler to retrieve a the duties for a set of validators during a particular epoch. This
@ -60,7 +60,11 @@ pub fn post_validator_duties<T: BeaconChainTypes>(
}) })
}) })
.and_then(|bulk_request| { .and_then(|bulk_request| {
return_validator_duties(beacon_chain, bulk_request.epoch, bulk_request.pubkeys) return_validator_duties(
beacon_chain,
bulk_request.epoch,
bulk_request.pubkeys.into_iter().map(Into::into).collect(),
)
}) })
.and_then(|duties| response_builder?.body_no_ssz(&duties)); .and_then(|duties| response_builder?.body_no_ssz(&duties));
@ -80,7 +84,7 @@ pub fn get_validator_duties<T: BeaconChainTypes>(
let validator_pubkeys = query let validator_pubkeys = query
.all_of("validator_pubkeys")? .all_of("validator_pubkeys")?
.iter() .iter()
.map(|validator_pubkey_str| parse_pubkey(validator_pubkey_str)) .map(|validator_pubkey_str| parse_pubkey_bytes(validator_pubkey_str))
.collect::<Result<_, _>>()?; .collect::<Result<_, _>>()?;
let duties = return_validator_duties(beacon_chain, epoch, validator_pubkeys)?; let duties = return_validator_duties(beacon_chain, epoch, validator_pubkeys)?;
@ -91,7 +95,7 @@ pub fn get_validator_duties<T: BeaconChainTypes>(
fn return_validator_duties<T: BeaconChainTypes>( fn return_validator_duties<T: BeaconChainTypes>(
beacon_chain: Arc<BeaconChain<T>>, beacon_chain: Arc<BeaconChain<T>>,
epoch: Epoch, epoch: Epoch,
validator_pubkeys: Vec<PublicKey>, validator_pubkeys: Vec<PublicKeyBytes>,
) -> Result<Vec<ValidatorDuty>, ApiError> { ) -> Result<Vec<ValidatorDuty>, ApiError> {
let slots_per_epoch = T::EthSpec::slots_per_epoch(); let slots_per_epoch = T::EthSpec::slots_per_epoch();
let head_epoch = beacon_chain.head().beacon_state.current_epoch(); let head_epoch = beacon_chain.head().beacon_state.current_epoch();

View File

@ -6,6 +6,7 @@ use node_test_rig::{
testing_client_config, ClientConfig, ClientGenesis, LocalBeaconNode, testing_client_config, ClientConfig, ClientGenesis, LocalBeaconNode,
}; };
use remote_beacon_node::{PublishStatus, ValidatorDuty}; use remote_beacon_node::{PublishStatus, ValidatorDuty};
use std::convert::TryInto;
use std::sync::Arc; use std::sync::Arc;
use tree_hash::TreeHash; use tree_hash::TreeHash;
use types::{ use types::{
@ -182,7 +183,7 @@ fn validator_duties_bulk() {
.beacon_state .beacon_state
.validators .validators
.iter() .iter()
.map(|v| v.pubkey.clone()) .map(|v| (&v.pubkey).try_into().expect("pubkey should be valid"))
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let duties = env let duties = env
@ -219,7 +220,7 @@ fn validator_duties() {
.beacon_state .beacon_state
.validators .validators
.iter() .iter()
.map(|v| v.pubkey.clone()) .map(|v| (&v.pubkey).try_into().expect("pubkey should be valid"))
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let duties = env let duties = env
@ -270,10 +271,16 @@ fn check_duties<T: BeaconChainTypes>(
.iter() .iter()
.zip(duties.iter()) .zip(duties.iter())
.for_each(|(validator, duty)| { .for_each(|(validator, duty)| {
assert_eq!(*validator, duty.validator_pubkey, "pubkey should match"); assert_eq!(
*validator,
(&duty.validator_pubkey)
.try_into()
.expect("should be valid pubkey"),
"pubkey should match"
);
let validator_index = state let validator_index = state
.get_validator_index(validator) .get_validator_index(&validator.clone().into())
.expect("should have pubkey cache") .expect("should have pubkey cache")
.expect("pubkey should exist"); .expect("pubkey should exist");

View File

@ -4,9 +4,15 @@ version = "0.1.0"
authors = ["Paul Hauner <paul@paulhauner.com>"] authors = ["Paul Hauner <paul@paulhauner.com>"]
edition = "2018" edition = "2018"
[[bench]]
name = "benches"
harness = false
[dev-dependencies] [dev-dependencies]
tempfile = "3.1.0" tempfile = "3.1.0"
sloggers = "0.3.2" sloggers = "0.3.2"
criterion = "0.3.0"
rayon = "1.2.0"
[dependencies] [dependencies]
db-key = "0.0.5" db-key = "0.0.5"

View File

@ -0,0 +1,114 @@
use criterion::Criterion;
use criterion::{black_box, criterion_group, criterion_main, Benchmark};
use rayon::prelude::*;
use ssz::{Decode, Encode};
use std::convert::TryInto;
use store::BeaconStateStorageContainer;
use types::{
test_utils::generate_deterministic_keypair, BeaconState, Epoch, Eth1Data, EthSpec, Hash256,
MainnetEthSpec, Validator,
};
fn get_state<E: EthSpec>(validator_count: usize) -> BeaconState<E> {
let spec = &E::default_spec();
let eth1_data = Eth1Data {
deposit_root: Hash256::zero(),
deposit_count: 0,
block_hash: Hash256::zero(),
};
let mut state = BeaconState::new(0, eth1_data, spec);
for i in 0..validator_count {
state.balances.push(i as u64).expect("should add balance");
}
state.validators = (0..validator_count)
.into_iter()
.collect::<Vec<_>>()
.par_iter()
.map(|&i| Validator {
pubkey: generate_deterministic_keypair(i).pk.into(),
withdrawal_credentials: Hash256::from_low_u64_le(i as u64),
effective_balance: spec.max_effective_balance,
slashed: false,
activation_eligibility_epoch: Epoch::new(0),
activation_epoch: Epoch::new(0),
exit_epoch: Epoch::from(u64::max_value()),
withdrawable_epoch: Epoch::from(u64::max_value()),
})
.collect::<Vec<_>>()
.into();
state.build_all_caches(spec).expect("should build caches");
state
}
fn all_benches(c: &mut Criterion) {
let validator_count = 16_384;
let state = get_state::<MainnetEthSpec>(validator_count);
let storage_container = BeaconStateStorageContainer::new(&state);
let state_bytes = storage_container.as_ssz_bytes();
let inner_state = state.clone();
c.bench(
&format!("{}_validators", validator_count),
Benchmark::new("encode/beacon_state", move |b| {
b.iter_batched_ref(
|| inner_state.clone(),
|state| black_box(BeaconStateStorageContainer::new(state).as_ssz_bytes()),
criterion::BatchSize::SmallInput,
)
})
.sample_size(10),
);
let inner_state = state.clone();
c.bench(
&format!("{}_validators", validator_count),
Benchmark::new("encode/beacon_state/tree_hash_cache", move |b| {
b.iter_batched_ref(
|| inner_state.tree_hash_cache.clone(),
|tree_hash_cache| black_box(tree_hash_cache.as_ssz_bytes()),
criterion::BatchSize::SmallInput,
)
})
.sample_size(10),
);
let inner_state = state.clone();
c.bench(
&format!("{}_validators", validator_count),
Benchmark::new("encode/beacon_state/committee_cache[0]", move |b| {
b.iter_batched_ref(
|| inner_state.committee_caches[0].clone(),
|committee_cache| black_box(committee_cache.as_ssz_bytes()),
criterion::BatchSize::SmallInput,
)
})
.sample_size(10),
);
c.bench(
&format!("{}_validators", validator_count),
Benchmark::new("decode/beacon_state", move |b| {
b.iter_batched_ref(
|| state_bytes.clone(),
|bytes| {
let state: BeaconState<MainnetEthSpec> =
BeaconStateStorageContainer::from_ssz_bytes(&bytes)
.expect("should decode")
.try_into()
.expect("should convert into state");
black_box(state)
},
criterion::BatchSize::SmallInput,
)
})
.sample_size(10),
);
}
criterion_group!(benches, all_benches,);
criterion_main!(benches);

View File

@ -0,0 +1,63 @@
//! These examples only really exist so we can use them for flamegraph. If they get annoying to
//! maintain, feel free to delete.
use rayon::prelude::*;
use ssz::{Decode, Encode};
use std::convert::TryInto;
use store::BeaconStateStorageContainer;
use types::{
test_utils::generate_deterministic_keypair, BeaconState, Epoch, Eth1Data, EthSpec, Hash256,
MainnetEthSpec, Validator,
};
type E = MainnetEthSpec;
fn get_state<E: EthSpec>(validator_count: usize) -> BeaconState<E> {
let spec = &E::default_spec();
let eth1_data = Eth1Data {
deposit_root: Hash256::zero(),
deposit_count: 0,
block_hash: Hash256::zero(),
};
let mut state = BeaconState::new(0, eth1_data, spec);
for i in 0..validator_count {
state.balances.push(i as u64).expect("should add balance");
}
state.validators = (0..validator_count)
.into_iter()
.collect::<Vec<_>>()
.par_iter()
.map(|&i| Validator {
pubkey: generate_deterministic_keypair(i).pk.into(),
withdrawal_credentials: Hash256::from_low_u64_le(i as u64),
effective_balance: spec.max_effective_balance,
slashed: false,
activation_eligibility_epoch: Epoch::new(0),
activation_epoch: Epoch::new(0),
exit_epoch: Epoch::from(u64::max_value()),
withdrawable_epoch: Epoch::from(u64::max_value()),
})
.collect::<Vec<_>>()
.into();
state.build_all_caches(spec).expect("should build caches");
state
}
fn main() {
let validator_count = 1_024;
let state = get_state::<E>(validator_count);
let storage_container = BeaconStateStorageContainer::new(&state);
for _ in 0..1024 {
let container_bytes = storage_container.as_ssz_bytes();
let _: BeaconState<E> = BeaconStateStorageContainer::from_ssz_bytes(&container_bytes)
.expect("should decode")
.try_into()
.expect("should convert into state");
}
}

View File

@ -44,48 +44,51 @@ pub fn get_full_state<S: Store, E: EthSpec>(
/// A container for storing `BeaconState` components. /// A container for storing `BeaconState` components.
// TODO: would be more space efficient with the caches stored separately and referenced by hash // TODO: would be more space efficient with the caches stored separately and referenced by hash
#[derive(Encode, Decode)] #[derive(Encode, Decode)]
struct StorageContainer { pub struct StorageContainer<T: EthSpec> {
state_bytes: Vec<u8>, state: BeaconState<T>,
committee_caches_bytes: Vec<Vec<u8>>, committee_caches: Vec<CommitteeCache>,
tree_hash_cache_bytes: Vec<u8>, tree_hash_cache: BeaconTreeHashCache,
} }
impl StorageContainer { impl<T: EthSpec> StorageContainer<T> {
/// Create a new instance for storing a `BeaconState`. /// Create a new instance for storing a `BeaconState`.
pub fn new<T: EthSpec>(state: &BeaconState<T>) -> Self { pub fn new(state: &BeaconState<T>) -> Self {
let mut committee_caches_bytes = vec![]; let mut state = state.clone();
for cache in state.committee_caches[..].iter() { let mut committee_caches = vec![CommitteeCache::default(); CACHED_EPOCHS];
committee_caches_bytes.push(cache.as_ssz_bytes());
}
let tree_hash_cache_bytes = state.tree_hash_cache.as_ssz_bytes();
Self {
state_bytes: state.as_ssz_bytes(),
committee_caches_bytes,
tree_hash_cache_bytes,
}
}
}
impl<T: EthSpec> TryInto<BeaconState<T>> for StorageContainer {
type Error = Error;
fn try_into(self) -> Result<BeaconState<T>, Error> {
let mut state: BeaconState<T> = BeaconState::from_ssz_bytes(&self.state_bytes)?;
for i in 0..CACHED_EPOCHS { for i in 0..CACHED_EPOCHS {
let bytes = &self.committee_caches_bytes.get(i).ok_or_else(|| { std::mem::swap(&mut state.committee_caches[i], &mut committee_caches[i]);
Error::SszDecodeError(DecodeError::BytesInvalid(
"Insufficient committees for BeaconState".to_string(),
))
})?;
state.committee_caches[i] = CommitteeCache::from_ssz_bytes(bytes)?;
} }
state.tree_hash_cache = BeaconTreeHashCache::from_ssz_bytes(&self.tree_hash_cache_bytes)?; let tree_hash_cache =
std::mem::replace(&mut state.tree_hash_cache, BeaconTreeHashCache::default());
Self {
state,
committee_caches,
tree_hash_cache,
}
}
}
impl<T: EthSpec> TryInto<BeaconState<T>> for StorageContainer<T> {
type Error = Error;
fn try_into(mut self) -> Result<BeaconState<T>, Error> {
let mut state = self.state;
for i in (0..CACHED_EPOCHS).rev() {
if i >= self.committee_caches.len() {
return Err(Error::SszDecodeError(DecodeError::BytesInvalid(
"Insufficient committees for BeaconState".to_string(),
)));
};
state.committee_caches[i] = self.committee_caches.remove(i);
}
state.tree_hash_cache = self.tree_hash_cache;
Ok(state) Ok(state)
} }

View File

@ -33,6 +33,7 @@ pub use self::memory_store::MemoryStore;
pub use self::migrate::Migrate; pub use self::migrate::Migrate;
pub use self::partial_beacon_state::PartialBeaconState; pub use self::partial_beacon_state::PartialBeaconState;
pub use errors::Error; pub use errors::Error;
pub use impls::beacon_state::StorageContainer as BeaconStateStorageContainer;
pub use metrics::scrape_for_metrics; pub use metrics::scrape_for_metrics;
pub use types::*; pub use types::*;

View File

@ -440,7 +440,7 @@ pub fn process_deposit<T: EthSpec>(
}; };
// Get an `Option<u64>` where `u64` is the validator index if this deposit public key // Get an `Option<u64>` where `u64` is the validator index if this deposit public key
// already exists in the beacon_state. // already exists in the beacon_state.
let validator_index = get_existing_validator_index(state, &pubkey) let validator_index = get_existing_validator_index(state, &deposit.data.pubkey)
.map_err(|e| e.into_with_index(deposit_index))?; .map_err(|e| e.into_with_index(deposit_index))?;
let amount = deposit.data.amount; let amount = deposit.data.amount;
@ -457,7 +457,7 @@ pub fn process_deposit<T: EthSpec>(
// Create a new validator. // Create a new validator.
let validator = Validator { let validator = Validator {
pubkey, pubkey: pubkey.into(),
withdrawal_credentials: deposit.data.withdrawal_credentials, withdrawal_credentials: deposit.data.withdrawal_credentials,
activation_eligibility_epoch: spec.far_future_epoch, activation_eligibility_epoch: spec.far_future_epoch,
activation_epoch: spec.far_future_epoch, activation_epoch: spec.far_future_epoch,

View File

@ -2,7 +2,8 @@
//! validated individually, or alongside in others in a potentially cheaper bulk operation. //! validated individually, or alongside in others in a potentially cheaper bulk operation.
//! //!
//! This module exposes one function to extract each type of `SignatureSet` from a `BeaconBlock`. //! This module exposes one function to extract each type of `SignatureSet` from a `BeaconBlock`.
use bls::{SignatureSet, SignedMessage}; use bls::{G1Point, G1Ref, SignatureSet, SignedMessage};
use std::borrow::Cow;
use std::convert::TryInto; use std::convert::TryInto;
use tree_hash::{SignedRoot, TreeHash}; use tree_hash::{SignedRoot, TreeHash};
use types::{ use types::{
@ -26,6 +27,9 @@ pub enum Error {
/// The public keys supplied do not match the number of objects requiring keys. Block validity /// The public keys supplied do not match the number of objects requiring keys. Block validity
/// was not determined. /// was not determined.
MismatchedPublicKeyLen { pubkey_len: usize, other_len: usize }, MismatchedPublicKeyLen { pubkey_len: usize, other_len: usize },
/// The public key bytes stored in the `BeaconState` were not valid. This is a serious internal
/// error.
BadBlsBytes { validator_index: u64 },
} }
impl From<BeaconStateError> for Error { impl From<BeaconStateError> for Error {
@ -42,10 +46,6 @@ pub fn block_proposal_signature_set<'a, T: EthSpec>(
spec: &'a ChainSpec, spec: &'a ChainSpec,
) -> Result<SignatureSet<'a>> { ) -> Result<SignatureSet<'a>> {
let proposer_index = state.get_beacon_proposer_index(block.slot, spec)?; let proposer_index = state.get_beacon_proposer_index(block.slot, spec)?;
let block_proposer = &state
.validators
.get(proposer_index)
.ok_or_else(|| Error::ValidatorUnknown(proposer_index as u64))?;
let domain = spec.get_domain( let domain = spec.get_domain(
block.slot.epoch(T::slots_per_epoch()), block.slot.epoch(T::slots_per_epoch()),
@ -61,7 +61,7 @@ pub fn block_proposal_signature_set<'a, T: EthSpec>(
Ok(SignatureSet::single( Ok(SignatureSet::single(
&block.signature, &block.signature,
&block_proposer.pubkey, validator_pubkey(state, proposer_index)?,
message, message,
domain, domain,
)) ))
@ -73,7 +73,7 @@ pub fn randao_signature_set<'a, T: EthSpec>(
block: &'a BeaconBlock<T>, block: &'a BeaconBlock<T>,
spec: &'a ChainSpec, spec: &'a ChainSpec,
) -> Result<SignatureSet<'a>> { ) -> Result<SignatureSet<'a>> {
let block_proposer = &state.validators[state.get_beacon_proposer_index(block.slot, spec)?]; let proposer_index = state.get_beacon_proposer_index(block.slot, spec)?;
let domain = spec.get_domain( let domain = spec.get_domain(
block.slot.epoch(T::slots_per_epoch()), block.slot.epoch(T::slots_per_epoch()),
@ -85,7 +85,7 @@ pub fn randao_signature_set<'a, T: EthSpec>(
Ok(SignatureSet::single( Ok(SignatureSet::single(
&block.body.randao_reveal, &block.body.randao_reveal,
&block_proposer.pubkey, validator_pubkey(state, proposer_index)?,
message, message,
domain, domain,
)) ))
@ -97,14 +97,21 @@ pub fn proposer_slashing_signature_set<'a, T: EthSpec>(
proposer_slashing: &'a ProposerSlashing, proposer_slashing: &'a ProposerSlashing,
spec: &'a ChainSpec, spec: &'a ChainSpec,
) -> Result<(SignatureSet<'a>, SignatureSet<'a>)> { ) -> Result<(SignatureSet<'a>, SignatureSet<'a>)> {
let proposer = state let proposer_index = proposer_slashing.proposer_index as usize;
.validators
.get(proposer_slashing.proposer_index as usize)
.ok_or_else(|| Error::ValidatorUnknown(proposer_slashing.proposer_index))?;
Ok(( Ok((
block_header_signature_set(state, &proposer_slashing.header_1, &proposer.pubkey, spec)?, block_header_signature_set(
block_header_signature_set(state, &proposer_slashing.header_2, &proposer.pubkey, spec)?, state,
&proposer_slashing.header_1,
validator_pubkey(state, proposer_index)?,
spec,
)?,
block_header_signature_set(
state,
&proposer_slashing.header_2,
validator_pubkey(state, proposer_index)?,
spec,
)?,
)) ))
} }
@ -112,7 +119,7 @@ pub fn proposer_slashing_signature_set<'a, T: EthSpec>(
fn block_header_signature_set<'a, T: EthSpec>( fn block_header_signature_set<'a, T: EthSpec>(
state: &'a BeaconState<T>, state: &'a BeaconState<T>,
header: &'a BeaconBlockHeader, header: &'a BeaconBlockHeader,
pubkey: &'a PublicKey, pubkey: Cow<'a, G1Point>,
spec: &'a ChainSpec, spec: &'a ChainSpec,
) -> Result<SignatureSet<'a>> { ) -> Result<SignatureSet<'a>> {
let domain = spec.get_domain( let domain = spec.get_domain(
@ -140,10 +147,13 @@ pub fn indexed_attestation_signature_set<'a, 'b, T: EthSpec>(
) -> Result<SignatureSet<'a>> { ) -> Result<SignatureSet<'a>> {
let message = indexed_attestation.data.tree_hash_root(); let message = indexed_attestation.data.tree_hash_root();
let signed_message = SignedMessage::new( let pubkeys = indexed_attestation
get_pubkeys(state, &indexed_attestation.attesting_indices)?, .attesting_indices
message, .into_iter()
); .map(|&validator_idx| Ok(validator_pubkey(state, validator_idx as usize)?))
.collect::<Result<_>>()?;
let signed_message = SignedMessage::new(pubkeys, message);
let domain = spec.get_domain( let domain = spec.get_domain(
indexed_attestation.data.target.epoch, indexed_attestation.data.target.epoch,
@ -200,7 +210,7 @@ pub fn deposit_signature_set<'a>(
// with the fork zeroed. // with the fork zeroed.
SignatureSet::single( SignatureSet::single(
signature, signature,
pubkey, pubkey.g1_ref(),
message.clone(), message.clone(),
spec.get_deposit_domain(), spec.get_deposit_domain(),
) )
@ -213,10 +223,7 @@ pub fn exit_signature_set<'a, T: EthSpec>(
exit: &'a VoluntaryExit, exit: &'a VoluntaryExit,
spec: &'a ChainSpec, spec: &'a ChainSpec,
) -> Result<SignatureSet<'a>> { ) -> Result<SignatureSet<'a>> {
let validator = state let proposer_index = exit.validator_index as usize;
.validators
.get(exit.validator_index as usize)
.ok_or_else(|| Error::ValidatorUnknown(exit.validator_index))?;
let domain = spec.get_domain(exit.epoch, Domain::VoluntaryExit, &state.fork); let domain = spec.get_domain(exit.epoch, Domain::VoluntaryExit, &state.fork);
@ -224,29 +231,27 @@ pub fn exit_signature_set<'a, T: EthSpec>(
Ok(SignatureSet::single( Ok(SignatureSet::single(
&exit.signature, &exit.signature,
&validator.pubkey, validator_pubkey(state, proposer_index)?,
message, message,
domain, domain,
)) ))
} }
/// Maps validator indices to public keys. /// Maps a validator index to a `PublicKey`.
fn get_pubkeys<'a, 'b, T, I>( fn validator_pubkey<'a, T: EthSpec>(
state: &'a BeaconState<T>, state: &'a BeaconState<T>,
validator_indices: I, validator_index: usize,
) -> Result<Vec<&'a PublicKey>> ) -> Result<Cow<'a, G1Point>> {
where let pubkey_bytes = &state
I: IntoIterator<Item = &'b u64>,
T: EthSpec,
{
validator_indices
.into_iter()
.map(|&validator_idx| {
state
.validators .validators
.get(validator_idx as usize) .get(validator_index)
.ok_or_else(|| Error::ValidatorUnknown(validator_idx)) .ok_or_else(|| Error::ValidatorUnknown(validator_index as u64))?
.map(|validator| &validator.pubkey) .pubkey;
pubkey_bytes
.try_into()
.map(|pubkey: PublicKey| Cow::Owned(pubkey.as_raw().point.clone()))
.map_err(|_| Error::BadBlsBytes {
validator_index: validator_index as u64,
}) })
.collect()
} }

View File

@ -35,7 +35,7 @@ pub fn verify_deposit_signature(deposit_data: &DepositData, spec: &ChainSpec) ->
/// Errors if the state's `pubkey_cache` is not current. /// Errors if the state's `pubkey_cache` is not current.
pub fn get_existing_validator_index<T: EthSpec>( pub fn get_existing_validator_index<T: EthSpec>(
state: &BeaconState<T>, state: &BeaconState<T>,
pub_key: &PublicKey, pub_key: &PublicKeyBytes,
) -> Result<Option<u64>> { ) -> Result<Option<u64>> {
let validator_index = state.get_validator_index(pub_key)?; let validator_index = state.get_validator_index(pub_key)?;
Ok(validator_index.map(|idx| idx as u64)) Ok(validator_index.map(|idx| idx as u64))

View File

@ -4,6 +4,10 @@ version = "0.1.0"
authors = ["Paul Hauner <paul@paulhauner.com>", "Age Manning <Age@AgeManning.com>"] authors = ["Paul Hauner <paul@paulhauner.com>", "Age Manning <Age@AgeManning.com>"]
edition = "2018" edition = "2018"
[[bench]]
name = "benches"
harness = false
[dependencies] [dependencies]
bls = { path = "../utils/bls" } bls = { path = "../utils/bls" }
compare_fields = { path = "../utils/compare_fields" } compare_fields = { path = "../utils/compare_fields" }
@ -37,3 +41,4 @@ tempfile = "3.1.0"
[dev-dependencies] [dev-dependencies]
env_logger = "0.7.1" env_logger = "0.7.1"
serde_json = "1.0.41" serde_json = "1.0.41"
criterion = "0.3.0"

View File

@ -0,0 +1,79 @@
use criterion::Criterion;
use criterion::{black_box, criterion_group, criterion_main, Benchmark};
use rayon::prelude::*;
use ssz::{Decode, Encode};
use types::{
test_utils::generate_deterministic_keypair, BeaconState, Eth1Data, EthSpec, Hash256,
MainnetEthSpec, Validator,
};
fn get_state<E: EthSpec>(validator_count: usize) -> BeaconState<E> {
let spec = &E::default_spec();
let eth1_data = Eth1Data {
deposit_root: Hash256::zero(),
deposit_count: 0,
block_hash: Hash256::zero(),
};
let mut state = BeaconState::new(0, eth1_data, spec);
for i in 0..validator_count {
state.balances.push(i as u64).expect("should add balance");
}
state.validators = (0..validator_count)
.into_iter()
.collect::<Vec<_>>()
.par_iter()
.map(|&i| Validator {
pubkey: generate_deterministic_keypair(i).pk.into(),
withdrawal_credentials: Hash256::from_low_u64_le(i as u64),
effective_balance: i as u64,
slashed: i % 2 == 0,
activation_eligibility_epoch: i.into(),
activation_epoch: i.into(),
exit_epoch: i.into(),
withdrawable_epoch: i.into(),
})
.collect::<Vec<_>>()
.into();
state
}
fn all_benches(c: &mut Criterion) {
let validator_count = 16_384;
let state = get_state::<MainnetEthSpec>(validator_count);
let state_bytes = state.as_ssz_bytes();
c.bench(
&format!("{}_validators", validator_count),
Benchmark::new("encode/beacon_state", move |b| {
b.iter_batched_ref(
|| state.clone(),
|state| black_box(state.as_ssz_bytes()),
criterion::BatchSize::SmallInput,
)
})
.sample_size(10),
);
c.bench(
&format!("{}_validators", validator_count),
Benchmark::new("decode/beacon_state", move |b| {
b.iter_batched_ref(
|| state_bytes.clone(),
|bytes| {
let state: BeaconState<MainnetEthSpec> =
BeaconState::from_ssz_bytes(&bytes).expect("should decode");
black_box(state)
},
criterion::BatchSize::SmallInput,
)
})
.sample_size(10),
);
}
criterion_group!(benches, all_benches,);
criterion_main!(benches);

View File

@ -0,0 +1,50 @@
//! These examples only really exist so we can use them for flamegraph. If they get annoying to
//! maintain, feel free to delete.
use ssz::{Decode, Encode};
use types::{
test_utils::generate_deterministic_keypair, BeaconState, Eth1Data, EthSpec, Hash256,
MinimalEthSpec, Validator,
};
type E = MinimalEthSpec;
fn get_state(validator_count: usize) -> BeaconState<E> {
let spec = &E::default_spec();
let eth1_data = Eth1Data {
deposit_root: Hash256::zero(),
deposit_count: 0,
block_hash: Hash256::zero(),
};
let mut state = BeaconState::new(0, eth1_data, spec);
for i in 0..validator_count {
state.balances.push(i as u64).expect("should add balance");
state
.validators
.push(Validator {
pubkey: generate_deterministic_keypair(i).pk.into(),
withdrawal_credentials: Hash256::from_low_u64_le(i as u64),
effective_balance: i as u64,
slashed: i % 2 == 0,
activation_eligibility_epoch: i.into(),
activation_epoch: i.into(),
exit_epoch: i.into(),
withdrawable_epoch: i.into(),
})
.expect("should add validator");
}
state
}
fn main() {
let validator_count = 1_024;
let state = get_state(validator_count);
for _ in 0..1_024 {
let state_bytes = state.as_ssz_bytes();
let _: BeaconState<E> = BeaconState::from_ssz_bytes(&state_bytes).expect("should decode");
}
}

View File

@ -268,7 +268,7 @@ impl<T: EthSpec> BeaconState<T> {
/// returns `None`. /// returns `None`.
/// ///
/// Requires a fully up-to-date `pubkey_cache`, returns an error if this is not the case. /// Requires a fully up-to-date `pubkey_cache`, returns an error if this is not the case.
pub fn get_validator_index(&self, pubkey: &PublicKey) -> Result<Option<usize>, Error> { pub fn get_validator_index(&self, pubkey: &PublicKeyBytes) -> Result<Option<usize>, Error> {
if self.pubkey_cache.len() == self.validators.len() { if self.pubkey_cache.len() == self.validators.len() {
Ok(self.pubkey_cache.get(pubkey)) Ok(self.pubkey_cache.get(pubkey))
} else { } else {
@ -860,7 +860,7 @@ impl<T: EthSpec> BeaconState<T> {
.enumerate() .enumerate()
.skip(self.pubkey_cache.len()) .skip(self.pubkey_cache.len())
{ {
let success = self.pubkey_cache.insert(validator.pubkey.clone(), i); let success = self.pubkey_cache.insert(validator.pubkey.clone().into(), i);
if !success { if !success {
return Err(Error::PubkeyCacheInconsistent); return Err(Error::PubkeyCacheInconsistent);
} }

View File

@ -10,7 +10,7 @@ pub struct PubkeyCache {
/// len, as it does not increase when duplicate keys are added. Duplicate keys are used during /// len, as it does not increase when duplicate keys are added. Duplicate keys are used during
/// testing. /// testing.
len: usize, len: usize,
map: HashMap<PublicKey, ValidatorIndex>, map: HashMap<PublicKeyBytes, ValidatorIndex>,
} }
impl PubkeyCache { impl PubkeyCache {
@ -23,7 +23,7 @@ impl PubkeyCache {
/// ///
/// The added index must equal the number of validators already added to the map. This ensures /// The added index must equal the number of validators already added to the map. This ensures
/// that an index is never skipped. /// that an index is never skipped.
pub fn insert(&mut self, pubkey: PublicKey, index: ValidatorIndex) -> bool { pub fn insert(&mut self, pubkey: PublicKeyBytes, index: ValidatorIndex) -> bool {
if index == self.len { if index == self.len {
self.map.insert(pubkey, index); self.map.insert(pubkey, index);
self.len += 1; self.len += 1;
@ -34,7 +34,7 @@ impl PubkeyCache {
} }
/// Looks up a validator index's by their public key. /// Looks up a validator index's by their public key.
pub fn get(&self, pubkey: &PublicKey) -> Option<ValidatorIndex> { pub fn get(&self, pubkey: &PublicKeyBytes) -> Option<ValidatorIndex> {
self.map.get(pubkey).copied() self.map.get(pubkey).copied()
} }
} }

View File

@ -109,7 +109,7 @@ impl<T: EthSpec> TestingBeaconStateBuilder<T> {
)); ));
Validator { Validator {
pubkey: keypair.pk.clone(), pubkey: keypair.pk.clone().into(),
withdrawal_credentials, withdrawal_credentials,
// All validators start active. // All validators start active.
activation_eligibility_epoch: T::genesis_epoch(), activation_eligibility_epoch: T::genesis_epoch(),

View File

@ -1,4 +1,4 @@
use crate::{test_utils::TestRandom, Epoch, Hash256, PublicKey}; use crate::{test_utils::TestRandom, Epoch, Hash256, PublicKeyBytes};
use serde_derive::{Deserialize, Serialize}; use serde_derive::{Deserialize, Serialize};
use ssz_derive::{Decode, Encode}; use ssz_derive::{Decode, Encode};
@ -10,7 +10,7 @@ use tree_hash_derive::TreeHash;
/// Spec v0.9.1 /// Spec v0.9.1
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Encode, Decode, TestRandom, TreeHash)] #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Encode, Decode, TestRandom, TreeHash)]
pub struct Validator { pub struct Validator {
pub pubkey: PublicKey, pub pubkey: PublicKeyBytes,
pub withdrawal_credentials: Hash256, pub withdrawal_credentials: Hash256,
pub effective_balance: u64, pub effective_balance: u64,
pub slashed: bool, pub slashed: bool,
@ -46,7 +46,7 @@ impl Default for Validator {
/// Yields a "default" `Validator`. Primarily used for testing. /// Yields a "default" `Validator`. Primarily used for testing.
fn default() -> Self { fn default() -> Self {
Self { Self {
pubkey: PublicKey::default(), pubkey: PublicKeyBytes::empty(),
withdrawal_credentials: Hash256::default(), withdrawal_credentials: Hash256::default(),
activation_eligibility_epoch: Epoch::from(std::u64::MAX), activation_eligibility_epoch: Epoch::from(std::u64::MAX),
activation_epoch: Epoch::from(std::u64::MAX), activation_epoch: Epoch::from(std::u64::MAX),

View File

@ -14,7 +14,7 @@ pub use crate::public_key_bytes::PublicKeyBytes;
pub use crate::secret_key::SecretKey; pub use crate::secret_key::SecretKey;
pub use crate::signature_bytes::SignatureBytes; pub use crate::signature_bytes::SignatureBytes;
pub use milagro_bls::{compress_g2, hash_on_g2, G1Point}; pub use milagro_bls::{compress_g2, hash_on_g2, G1Point};
pub use signature_set::{verify_signature_sets, SignatureSet, SignedMessage}; pub use signature_set::{verify_signature_sets, G1Ref, SignatureSet, SignedMessage};
#[cfg(feature = "fake_crypto")] #[cfg(feature = "fake_crypto")]
mod fake_aggregate_public_key; mod fake_aggregate_public_key;

View File

@ -128,6 +128,12 @@ macro_rules! bytes_struct {
} }
} }
impl std::hash::Hash for $name {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.0.hash(state)
}
}
impl Eq for $name {} impl Eq for $name {}
impl std::convert::TryInto<$type> for &$name { impl std::convert::TryInto<$type> for &$name {

View File

@ -1,5 +1,6 @@
use crate::{AggregatePublicKey, AggregateSignature, PublicKey, Signature}; use crate::{AggregatePublicKey, AggregateSignature, PublicKey, Signature};
use milagro_bls::{G1Point, G2Point}; use milagro_bls::{G1Point, G2Point};
use std::borrow::Cow;
#[cfg(not(feature = "fake_crypto"))] #[cfg(not(feature = "fake_crypto"))]
use milagro_bls::AggregateSignature as RawAggregateSignature; use milagro_bls::AggregateSignature as RawAggregateSignature;
@ -9,17 +10,14 @@ type Domain = u64;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct SignedMessage<'a> { pub struct SignedMessage<'a> {
signing_keys: Vec<&'a G1Point>, signing_keys: Vec<Cow<'a, G1Point>>,
message: Message, message: Message,
} }
impl<'a> SignedMessage<'a> { impl<'a> SignedMessage<'a> {
pub fn new<T>(signing_keys: Vec<&'a T>, message: Message) -> Self pub fn new(signing_keys: Vec<Cow<'a, G1Point>>, message: Message) -> Self {
where
T: G1Ref,
{
Self { Self {
signing_keys: signing_keys.iter().map(|k| k.g1_ref()).collect(), signing_keys,
message, message,
} }
} }
@ -33,14 +31,13 @@ pub struct SignatureSet<'a> {
} }
impl<'a> SignatureSet<'a> { impl<'a> SignatureSet<'a> {
pub fn single<S, T>( pub fn single<S>(
signature: &'a S, signature: &'a S,
signing_key: &'a T, signing_key: Cow<'a, G1Point>,
message: Message, message: Message,
domain: Domain, domain: Domain,
) -> Self ) -> Self
where where
T: G1Ref,
S: G2Ref, S: G2Ref,
{ {
Self { Self {
@ -53,13 +50,13 @@ impl<'a> SignatureSet<'a> {
pub fn dual<S, T>( pub fn dual<S, T>(
signature: &'a S, signature: &'a S,
message_0: Message, message_0: Message,
message_0_signing_keys: Vec<&'a T>, message_0_signing_keys: Vec<Cow<'a, G1Point>>,
message_1: Message, message_1: Message,
message_1_signing_keys: Vec<&'a T>, message_1_signing_keys: Vec<Cow<'a, G1Point>>,
domain: Domain, domain: Domain,
) -> Self ) -> Self
where where
T: G1Ref, T: G1Ref + Clone,
S: G2Ref, S: G2Ref,
{ {
Self { Self {
@ -95,7 +92,7 @@ impl<'a> SignatureSet<'a> {
messages.push(signed_message.message.clone()); messages.push(signed_message.message.clone());
let point = if signed_message.signing_keys.len() == 1 { let point = if signed_message.signing_keys.len() == 1 {
signed_message.signing_keys[0].clone() signed_message.signing_keys[0].clone().into_owned()
} else { } else {
aggregate_public_keys(&signed_message.signing_keys) aggregate_public_keys(&signed_message.signing_keys)
}; };
@ -132,7 +129,7 @@ impl<'a> Into<VerifySet<'a>> for SignatureSet<'a> {
.into_iter() .into_iter()
.map(|signed_message| { .map(|signed_message| {
let key = if signed_message.signing_keys.len() == 1 { let key = if signed_message.signing_keys.len() == 1 {
signed_message.signing_keys[0].clone() signed_message.signing_keys[0].clone().into_owned()
} else { } else {
aggregate_public_keys(&signed_message.signing_keys) aggregate_public_keys(&signed_message.signing_keys)
}; };
@ -146,12 +143,12 @@ impl<'a> Into<VerifySet<'a>> for SignatureSet<'a> {
} }
/// Create an aggregate public key for a list of validators, failing if any key can't be found. /// Create an aggregate public key for a list of validators, failing if any key can't be found.
fn aggregate_public_keys<'a>(public_keys: &'a [&'a G1Point]) -> G1Point { fn aggregate_public_keys<'a>(public_keys: &'a [Cow<'a, G1Point>]) -> G1Point {
let mut aggregate = let mut aggregate =
public_keys public_keys
.iter() .iter()
.fold(AggregatePublicKey::new(), |mut aggregate, &pubkey| { .fold(AggregatePublicKey::new(), |mut aggregate, pubkey| {
aggregate.add_point(pubkey); aggregate.add_point(&pubkey);
aggregate aggregate
}); });
@ -161,18 +158,18 @@ fn aggregate_public_keys<'a>(public_keys: &'a [&'a G1Point]) -> G1Point {
} }
pub trait G1Ref { pub trait G1Ref {
fn g1_ref(&self) -> &G1Point; fn g1_ref<'a>(&'a self) -> Cow<'a, G1Point>;
} }
impl G1Ref for AggregatePublicKey { impl G1Ref for AggregatePublicKey {
fn g1_ref(&self) -> &G1Point { fn g1_ref<'a>(&'a self) -> Cow<'a, G1Point> {
&self.as_raw().point Cow::Borrowed(&self.as_raw().point)
} }
} }
impl G1Ref for PublicKey { impl G1Ref for PublicKey {
fn g1_ref(&self) -> &G1Point { fn g1_ref<'a>(&'a self) -> Cow<'a, G1Point> {
&self.as_raw().point Cow::Borrowed(&self.as_raw().point)
} }
} }

View File

@ -260,7 +260,10 @@ impl<E: EthSpec> Validator<E> {
let bulk_request = BulkValidatorDutiesRequest { let bulk_request = BulkValidatorDutiesRequest {
epoch, epoch,
pubkeys: validator_pubkeys.to_vec(), pubkeys: validator_pubkeys
.iter()
.map(|pubkey| pubkey.clone().into())
.collect(),
}; };
self.url("duties") self.url("duties")

View File

@ -1,8 +1,11 @@
use crate::{duties_service::DutiesService, validator_store::ValidatorStore}; use crate::{
duties_service::{DutiesService, ValidatorDuty},
validator_store::ValidatorStore,
};
use environment::RuntimeContext; use environment::RuntimeContext;
use exit_future::Signal; use exit_future::Signal;
use futures::{Future, Stream}; use futures::{Future, Stream};
use remote_beacon_node::{PublishStatus, RemoteBeaconNode, ValidatorDuty}; use remote_beacon_node::{PublishStatus, RemoteBeaconNode};
use slog::{crit, info, trace}; use slog::{crit, info, trace};
use slot_clock::SlotClock; use slot_clock::SlotClock;
use std::collections::HashMap; use std::collections::HashMap;

View File

@ -3,15 +3,16 @@ use environment::RuntimeContext;
use exit_future::Signal; use exit_future::Signal;
use futures::{Future, IntoFuture, Stream}; use futures::{Future, IntoFuture, Stream};
use parking_lot::RwLock; use parking_lot::RwLock;
use remote_beacon_node::{RemoteBeaconNode, ValidatorDuty}; use remote_beacon_node::RemoteBeaconNode;
use slog::{crit, error, info, trace, warn}; use slog::{crit, error, info, trace, warn};
use slot_clock::SlotClock; use slot_clock::SlotClock;
use std::collections::HashMap; use std::collections::HashMap;
use std::convert::TryInto;
use std::ops::Deref; use std::ops::Deref;
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use tokio::timer::Interval; use tokio::timer::Interval;
use types::{ChainSpec, Epoch, EthSpec, PublicKey, Slot}; use types::{ChainSpec, CommitteeIndex, Epoch, EthSpec, PublicKey, Slot};
/// Delay this period of time after the slot starts. This allows the node to process the new slot. /// Delay this period of time after the slot starts. This allows the node to process the new slot.
const TIME_DELAY_FROM_SLOT: Duration = Duration::from_millis(100); const TIME_DELAY_FROM_SLOT: Duration = Duration::from_millis(100);
@ -21,6 +22,37 @@ const PRUNE_DEPTH: u64 = 4;
type BaseHashMap = HashMap<PublicKey, HashMap<Epoch, ValidatorDuty>>; type BaseHashMap = HashMap<PublicKey, HashMap<Epoch, ValidatorDuty>>;
/// Stores the duties for some validator for an epoch.
#[derive(PartialEq, Debug, Clone)]
pub struct ValidatorDuty {
/// The validator's BLS public key, uniquely identifying them. _48-bytes, hex encoded with 0x prefix, case insensitive._
pub validator_pubkey: PublicKey,
/// The slot at which the validator must attest.
pub attestation_slot: Option<Slot>,
/// The index of the committee within `slot` of which the validator is a member.
pub attestation_committee_index: Option<CommitteeIndex>,
/// The position of the validator in the committee.
pub attestation_committee_position: Option<usize>,
/// The slots in which a validator must propose a block (can be empty).
pub block_proposal_slots: Vec<Slot>,
}
impl TryInto<ValidatorDuty> for remote_beacon_node::ValidatorDuty {
type Error = String;
fn try_into(self) -> Result<ValidatorDuty, Self::Error> {
Ok(ValidatorDuty {
validator_pubkey: (&self.validator_pubkey)
.try_into()
.map_err(|e| format!("Invalid pubkey bytes from server: {:?}", e))?,
attestation_slot: self.attestation_slot,
attestation_committee_index: self.attestation_committee_index,
attestation_committee_position: self.attestation_committee_position,
block_proposal_slots: self.block_proposal_slots,
})
}
}
/// The outcome of inserting some `ValidatorDuty` into the `DutiesStore`. /// The outcome of inserting some `ValidatorDuty` into the `DutiesStore`.
enum InsertOutcome { enum InsertOutcome {
/// These are the first duties received for this validator. /// These are the first duties received for this validator.
@ -345,7 +377,7 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
.get_duties_bulk(epoch, pubkeys.as_slice()) .get_duties_bulk(epoch, pubkeys.as_slice())
.map(move |all_duties| (epoch, all_duties)) .map(move |all_duties| (epoch, all_duties))
.map_err(move |e| format!("Failed to get duties for epoch {}: {:?}", epoch, e)) .map_err(move |e| format!("Failed to get duties for epoch {}: {:?}", epoch, e))
.map(move |(epoch, all_duties)| { .and_then(move |(epoch, all_duties)| {
let log = service_2.context.log.clone(); let log = service_2.context.log.clone();
let mut new_validator = 0; let mut new_validator = 0;
@ -354,7 +386,9 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
let mut replaced = 0; let mut replaced = 0;
let mut invalid = 0; let mut invalid = 0;
all_duties.into_iter().for_each(|duties| { all_duties.into_iter().try_for_each::<_, Result<_, String>>(|remote_duties| {
let duties: ValidatorDuty = remote_duties.try_into()?;
match service_2 match service_2
.store .store
.insert(epoch, duties.clone(), E::slots_per_epoch()) .insert(epoch, duties.clone(), E::slots_per_epoch())
@ -374,7 +408,9 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
InsertOutcome::Replaced => replaced += 1, InsertOutcome::Replaced => replaced += 1,
InsertOutcome::Invalid => invalid += 1, InsertOutcome::Invalid => invalid += 1,
}; };
});
Ok(())
})?;
if invalid > 0 { if invalid > 0 {
error!( error!(
@ -402,6 +438,8 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
"info" => "Chain re-org likely occurred." "info" => "Chain re-org likely occurred."
) )
} }
Ok(())
}) })
} }
} }