Doppelganger detection (#2230)

## Issue Addressed

Resolves #2069 

## Proposed Changes

- Adds a `--doppelganger-detection` flag
- Adds a `lighthouse/seen_validators` endpoint, which will make it so the lighthouse VC is not interopable with other client beacon nodes if the `--doppelganger-detection` flag is used, but hopefully this will become standardized. Relevant Eth2 API repo issue: https://github.com/ethereum/eth2.0-APIs/issues/64
- If the `--doppelganger-detection` flag is used, the VC will wait until the beacon node is synced, and then wait an additional 2 epochs. The reason for this is to make sure the beacon node is able to subscribe to the subnets our validators should be attesting on. I think an alternative would be to have the beacon node subscribe to all subnets for 2+ epochs on startup by default.

## Additional Info

I'd like to add tests and would appreciate feedback. 

TODO:  handle validators started via the API, potentially make this default behavior

Co-authored-by: realbigsean <seananderson33@gmail.com>
Co-authored-by: Michael Sproul <michael@sigmaprime.io>
Co-authored-by: Paul Hauner <paul@paulhauner.com>
This commit is contained in:
realbigsean 2021-07-31 03:50:52 +00:00
parent 834ee98bc2
commit c5786a8821
38 changed files with 2302 additions and 201 deletions

View File

@ -146,6 +146,28 @@ jobs:
run: sudo npm install -g ganache-cli run: sudo npm install -g ganache-cli
- name: Run the syncing simulator - name: Run the syncing simulator
run: cargo run --release --bin simulator syncing-sim run: cargo run --release --bin simulator syncing-sim
doppelganger-protection-test:
name: doppelganger-protection-test
runs-on: ubuntu-latest
needs: cargo-fmt
steps:
- uses: actions/checkout@v1
- name: Get latest version of stable Rust
run: rustup update stable
- name: Install ganache-cli
run: sudo npm install -g ganache-cli
- name: Install lighthouse and lcli
run: |
make
make install-lcli
- name: Run the doppelganger protection success test script
run: |
cd scripts/tests
./doppelganger_protection.sh success
- name: Run the doppelganger protection failure test script
run: |
cd scripts/tests
./doppelganger_protection.sh failure
check-benchmarks: check-benchmarks:
name: check-benchmarks name: check-benchmarks
runs-on: ubuntu-latest runs-on: ubuntu-latest

1
Cargo.lock generated
View File

@ -7336,6 +7336,7 @@ dependencies = [
"slog-async", "slog-async",
"slog-term", "slog-term",
"slot_clock", "slot_clock",
"task_executor",
"tempfile", "tempfile",
"tokio 1.8.1", "tokio 1.8.1",
"tree_hash", "tree_hash",

View File

@ -3443,6 +3443,28 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let mut file = std::fs::File::create(file_name).unwrap(); let mut file = std::fs::File::create(file_name).unwrap();
self.dump_as_dot(&mut file); self.dump_as_dot(&mut file);
} }
/// Checks if attestations have been seen from the given `validator_index` at the
/// given `epoch`.
pub fn validator_seen_at_epoch(&self, validator_index: usize, epoch: Epoch) -> bool {
// It's necessary to assign these checks to intermediate variables to avoid a deadlock.
//
// See: https://github.com/sigp/lighthouse/pull/2230#discussion_r620013993
let attested = self
.observed_attesters
.read()
.index_seen_at_epoch(validator_index, epoch);
let aggregated = self
.observed_aggregators
.read()
.index_seen_at_epoch(validator_index, epoch);
let produced_block = self
.observed_block_producers
.read()
.index_seen_at_epoch(validator_index as u64, epoch);
attested || aggregated || produced_block
}
} }
impl<T: BeaconChainTypes> Drop for BeaconChain<T> { impl<T: BeaconChainTypes> Drop for BeaconChain<T> {

View File

@ -381,6 +381,16 @@ impl<T: Item, E: EthSpec> AutoPruningEpochContainer<T, E> {
pub(crate) fn get_lowest_permissible(&self) -> Epoch { pub(crate) fn get_lowest_permissible(&self) -> Epoch {
self.lowest_permissible_epoch self.lowest_permissible_epoch
} }
/// Returns `true` if the given `index` has been stored in `self` at `epoch`.
///
/// This is useful for doppelganger detection.
pub fn index_seen_at_epoch(&self, index: usize, epoch: Epoch) -> bool {
self.items
.get(&epoch)
.map(|item| item.contains(index))
.unwrap_or(false)
}
} }
/// A container that stores some number of `V` items. /// A container that stores some number of `V` items.

View File

@ -3,7 +3,7 @@
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::marker::PhantomData; use std::marker::PhantomData;
use types::{BeaconBlockRef, EthSpec, Slot, Unsigned}; use types::{BeaconBlockRef, Epoch, EthSpec, Slot, Unsigned};
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
pub enum Error { pub enum Error {
@ -114,6 +114,15 @@ impl<E: EthSpec> ObservedBlockProducers<E> {
self.finalized_slot = finalized_slot; self.finalized_slot = finalized_slot;
self.items.retain(|slot, _set| *slot > finalized_slot); self.items.retain(|slot, _set| *slot > finalized_slot);
} }
/// Returns `true` if the given `validator_index` has been stored in `self` at `epoch`.
///
/// This is useful for doppelganger detection.
pub fn index_seen_at_epoch(&self, validator_index: u64, epoch: Epoch) -> bool {
self.items.iter().any(|(slot, producers)| {
slot.epoch(E::slots_per_epoch()) == epoch && producers.contains(&validator_index)
})
}
} }
#[cfg(test)] #[cfg(test)]

View File

@ -1907,6 +1907,49 @@ pub fn serve<T: BeaconChainTypes>(
}, },
); );
// POST lighthouse/liveness
let post_lighthouse_liveness = warp::path("lighthouse")
.and(warp::path("liveness"))
.and(warp::path::end())
.and(warp::body::json())
.and(chain_filter.clone())
.and_then(
|request_data: api_types::LivenessRequestData, chain: Arc<BeaconChain<T>>| {
blocking_json_task(move || {
// Ensure the request is for either the current, previous or next epoch.
let current_epoch = chain
.epoch()
.map_err(warp_utils::reject::beacon_chain_error)?;
let prev_epoch = current_epoch.saturating_sub(Epoch::new(1));
let next_epoch = current_epoch.saturating_add(Epoch::new(1));
if request_data.epoch < prev_epoch || request_data.epoch > next_epoch {
return Err(warp_utils::reject::custom_bad_request(format!(
"request epoch {} is more than one epoch from the current epoch {}",
request_data.epoch, current_epoch
)));
}
let liveness: Vec<api_types::LivenessResponseData> = request_data
.indices
.iter()
.cloned()
.map(|index| {
let is_live =
chain.validator_seen_at_epoch(index as usize, request_data.epoch);
api_types::LivenessResponseData {
index: index as u64,
epoch: request_data.epoch,
is_live,
}
})
.collect();
Ok(api_types::GenericResponse::from(liveness))
})
},
);
// GET lighthouse/health // GET lighthouse/health
let get_lighthouse_health = warp::path("lighthouse") let get_lighthouse_health = warp::path("lighthouse")
.and(warp::path("health")) .and(warp::path("health"))
@ -2249,6 +2292,7 @@ pub fn serve<T: BeaconChainTypes>(
.or(post_beacon_pool_voluntary_exits.boxed()) .or(post_beacon_pool_voluntary_exits.boxed())
.or(post_validator_duties_attester.boxed()) .or(post_validator_duties_attester.boxed())
.or(post_validator_aggregate_and_proofs.boxed()) .or(post_validator_aggregate_and_proofs.boxed())
.or(post_lighthouse_liveness.boxed())
.or(post_validator_beacon_committee_subscriptions.boxed()), .or(post_validator_beacon_committee_subscriptions.boxed()),
)) ))
.recover(warp_utils::reject::handle_rejection) .recover(warp_utils::reject::handle_rejection)

View File

@ -2149,6 +2149,71 @@ impl ApiTester {
self self
} }
pub async fn test_post_lighthouse_liveness(self) -> Self {
let epoch = self.chain.epoch().unwrap();
let head_state = self.chain.head_beacon_state().unwrap();
let indices = (0..head_state.validators().len())
.map(|i| i as u64)
.collect::<Vec<_>>();
// Construct the expected response
let expected: Vec<LivenessResponseData> = head_state
.validators()
.iter()
.enumerate()
.map(|(index, _)| LivenessResponseData {
index: index as u64,
is_live: false,
epoch,
})
.collect();
let result = self
.client
.post_lighthouse_liveness(indices.as_slice(), epoch)
.await
.unwrap()
.data;
assert_eq!(result, expected);
// Attest to the current slot
self.client
.post_beacon_pool_attestations(self.attestations.as_slice())
.await
.unwrap();
let result = self
.client
.post_lighthouse_liveness(indices.as_slice(), epoch)
.await
.unwrap()
.data;
let committees = head_state
.get_beacon_committees_at_slot(self.chain.slot().unwrap())
.unwrap();
let attesting_validators: Vec<usize> = committees
.into_iter()
.map(|committee| committee.committee.iter().cloned())
.flatten()
.collect();
// All attesters should now be considered live
let expected = expected
.into_iter()
.map(|mut a| {
if attesting_validators.contains(&(a.index as usize)) {
a.is_live = true;
}
a
})
.collect::<Vec<_>>();
assert_eq!(result, expected);
self
}
pub async fn test_get_events(self) -> Self { pub async fn test_get_events(self) -> Self {
// Subscribe to all events // Subscribe to all events
let topics = vec![ let topics = vec![
@ -2635,5 +2700,7 @@ async fn lighthouse_endpoints() {
.test_get_lighthouse_beacon_states_ssz() .test_get_lighthouse_beacon_states_ssz()
.await .await
.test_get_lighthouse_staking() .test_get_lighthouse_staking()
.await
.test_post_lighthouse_liveness()
.await; .await;
} }

View File

@ -329,3 +329,28 @@ curl -X GET "http://localhost:5052/lighthouse/beacon/states/0/ssz" | jq
``` ```
*Example omitted for brevity, the body simply contains SSZ bytes.* *Example omitted for brevity, the body simply contains SSZ bytes.*
### `/lighthouse/liveness`
POST request that checks if any of the given validators have attested in the given epoch. Returns a list
of objects, each including the validator index, epoch, and `is_live` status of a requested validator.
This endpoint is used in doppelganger detection, and will only provide accurate information for the
current, previous, or next epoch.
```bash
curl -X POST "http://localhost:5052/lighthouse/liveness" -d '{"indices":["0","1"],"epoch":"1"}' -H "content-type: application/json" | jq
```
```json
{
"data": [
{
"index": "0",
"epoch": "1",
"is_live": true
}
]
}
```

View File

@ -83,6 +83,7 @@ impl fmt::Display for Error {
pub struct Timeouts { pub struct Timeouts {
pub attestation: Duration, pub attestation: Duration,
pub attester_duties: Duration, pub attester_duties: Duration,
pub liveness: Duration,
pub proposal: Duration, pub proposal: Duration,
pub proposer_duties: Duration, pub proposer_duties: Duration,
} }
@ -92,6 +93,7 @@ impl Timeouts {
Timeouts { Timeouts {
attestation: timeout, attestation: timeout,
attester_duties: timeout, attester_duties: timeout,
liveness: timeout,
proposal: timeout, proposal: timeout,
proposer_duties: timeout, proposer_duties: timeout,
} }
@ -1103,6 +1105,30 @@ impl BeaconNodeHttpClient {
.await .await
} }
/// `POST lighthouse/liveness`
pub async fn post_lighthouse_liveness(
&self,
ids: &[u64],
epoch: Epoch,
) -> Result<GenericResponse<Vec<LivenessResponseData>>, Error> {
let mut path = self.server.full.clone();
path.path_segments_mut()
.map_err(|()| Error::InvalidUrl(self.server.clone()))?
.push("lighthouse")
.push("liveness");
self.post_with_timeout_and_response(
path,
&LivenessRequestData {
indices: ids.to_vec(),
epoch,
},
self.timeouts.liveness,
)
.await
}
/// `POST validator/duties/attester/{epoch}` /// `POST validator/duties/attester/{epoch}`
pub async fn post_validator_duties_attester( pub async fn post_validator_duties_attester(
&self, &self,

View File

@ -847,6 +847,21 @@ impl FromStr for Accept {
} }
} }
#[derive(Debug, Serialize, Deserialize)]
pub struct LivenessRequestData {
pub epoch: Epoch,
#[serde(with = "serde_utils::quoted_u64_vec")]
pub indices: Vec<u64>,
}
#[derive(PartialEq, Debug, Serialize, Deserialize)]
pub struct LivenessResponseData {
#[serde(with = "serde_utils::quoted_u64")]
pub index: u64,
pub epoch: Epoch,
pub is_live: bool,
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;

View File

@ -344,6 +344,13 @@ fn main() {
non-default.", non-default.",
), ),
) )
.arg(
Arg::with_name("seconds-per-slot")
.long("seconds-per-slot")
.value_name("SECONDS")
.takes_value(true)
.help("Eth2 slot time"),
)
.arg( .arg(
Arg::with_name("seconds-per-eth1-block") Arg::with_name("seconds-per-eth1-block")
.long("seconds-per-eth1-block") .long("seconds-per-eth1-block")

View File

@ -43,6 +43,7 @@ pub fn run<T: EthSpec>(testnet_dir_path: PathBuf, matches: &ArgMatches) -> Resul
maybe_update!("genesis-delay", genesis_delay); maybe_update!("genesis-delay", genesis_delay);
maybe_update!("eth1-id", deposit_chain_id); maybe_update!("eth1-id", deposit_chain_id);
maybe_update!("eth1-id", deposit_network_id); maybe_update!("eth1-id", deposit_network_id);
maybe_update!("seconds-per-slot", seconds_per_slot);
maybe_update!("seconds-per-eth1-block", seconds_per_eth1_block); maybe_update!("seconds-per-eth1-block", seconds_per_eth1_block);
if let Some(v) = parse_ssz_optional(matches, "genesis-fork-version")? { if let Some(v) = parse_ssz_optional(matches, "genesis-fork-version")? {

View File

@ -378,3 +378,16 @@ pub fn malloc_tuning_flag() {
// effects of it. // effects of it.
.run(); .run();
} }
#[test]
fn doppelganger_protection_flag() {
CommandLineTest::new()
.flag("enable-doppelganger-protection", None)
.run()
.with_config(|config| assert!(config.enable_doppelganger_protection));
}
#[test]
fn no_doppelganger_protection_flag() {
CommandLineTest::new()
.run()
.with_config(|config| assert!(!config.enable_doppelganger_protection));
}

View File

@ -2,12 +2,12 @@
source ./vars.env source ./vars.env
ganache-cli \ exec ganache-cli \
--defaultBalanceEther 1000000000 \ --defaultBalanceEther 1000000000 \
--gasLimit 1000000000 \ --gasLimit 1000000000 \
--accounts 10 \ --accounts 10 \
--mnemonic "$ETH1_NETWORK_MNEMONIC" \ --mnemonic "$ETH1_NETWORK_MNEMONIC" \
--port 8545 \ --port 8545 \
--blockTime 3 \ --blockTime $SECONDS_PER_ETH1_BLOCK \
--networkId "$NETWORK_ID" \ --networkId "$NETWORK_ID" \
--chainId "$NETWORK_ID" --chainId "$NETWORK_ID"

View File

@ -33,7 +33,8 @@ lcli \
--altair-fork-epoch $ALTAIR_FORK_EPOCH \ --altair-fork-epoch $ALTAIR_FORK_EPOCH \
--eth1-id $NETWORK_ID \ --eth1-id $NETWORK_ID \
--eth1-follow-distance 1 \ --eth1-follow-distance 1 \
--seconds-per-eth1-block 1 \ --seconds-per-slot $SECONDS_PER_SLOT \
--seconds-per-eth1-block $SECONDS_PER_ETH1_BLOCK \
--force --force
echo Specification generated at $TESTNET_DIR. echo Specification generated at $TESTNET_DIR.

View File

@ -16,4 +16,5 @@ exec lighthouse \
--datadir $1 \ --datadir $1 \
--testnet-dir $TESTNET_DIR \ --testnet-dir $TESTNET_DIR \
--init-slashing-protection \ --init-slashing-protection \
--beacon-nodes $2 --beacon-nodes $2 \
$VC_ARGS

View File

@ -28,3 +28,9 @@ NETWORK_ID=4242
# Hard fork configuration # Hard fork configuration
ALTAIR_FORK_EPOCH=18446744073709551615 ALTAIR_FORK_EPOCH=18446744073709551615
# Seconds per Eth2 slot
SECONDS_PER_SLOT=3
# Seconds per Eth1 block
SECONDS_PER_ETH1_BLOCK=1

View File

@ -0,0 +1,141 @@
#!/usr/bin/env bash
# Requires `lighthouse`, ``lcli`, `ganache-cli`, `curl`, `jq`
BEHAVIOR=$1
if [[ "$BEHAVIOR" != "success" ]] && [[ "$BEHAVIOR" != "failure" ]]; then
echo "Usage: doppelganger_protection.sh [success|failure]"
exit 1
fi
source ./vars.env
../local_testnet/clean.sh
echo "Starting ganache"
../local_testnet/ganache_test_node.sh &> /dev/null &
GANACHE_PID=$!
# Wait for ganache to start
sleep 5
echo "Setting up local testnet"
../local_testnet/setup.sh
# Duplicate this directory so slashing protection doesn't keep us from re-using validator keys
cp -R $HOME/.lighthouse/local-testnet/node_1 $HOME/.lighthouse/local-testnet/node_1_doppelganger
echo "Starting bootnode"
../local_testnet/bootnode.sh &> /dev/null &
BOOT_PID=$!
# wait for the bootnode to start
sleep 10
echo "Starting local beacon nodes"
../local_testnet/beacon_node.sh $HOME/.lighthouse/local-testnet/node_1 9000 8000 &> /dev/null &
BEACON_PID=$!
../local_testnet/beacon_node.sh $HOME/.lighthouse/local-testnet/node_2 9100 8100 &> /dev/null &
BEACON_PID2=$!
../local_testnet/beacon_node.sh $HOME/.lighthouse/local-testnet/node_3 9200 8200 &> /dev/null &
BEACON_PID3=$!
echo "Starting local validator clients"
../local_testnet/validator_client.sh $HOME/.lighthouse/local-testnet/node_1 http://localhost:8000 &> /dev/null &
VALIDATOR_1_PID=$!
../local_testnet/validator_client.sh $HOME/.lighthouse/local-testnet/node_2 http://localhost:8100 &> /dev/null &
VALIDATOR_2_PID=$!
../local_testnet/validator_client.sh $HOME/.lighthouse/local-testnet/node_3 http://localhost:8200 &> /dev/null &
VALIDATOR_3_PID=$!
echo "Waiting an epoch before starting the next validator client"
sleep $(( $SECONDS_PER_SLOT * 32 ))
if [[ "$BEHAVIOR" == "failure" ]]; then
echo "Starting the doppelganger validator client"
# Use same keys as keys from VC1, but connect to BN2
# This process should not last longer than 2 epochs
timeout $(( $SECONDS_PER_SLOT * 32 * 2 )) ../local_testnet/validator_client.sh $HOME/.lighthouse/local-testnet/node_1_doppelganger http://localhost:8100
DOPPELGANGER_EXIT=$?
echo "Shutting down"
# Cleanup
kill $BOOT_PID $BEACON_PID $BEACON_PID2 $BEACON_PID3 $GANACHE_PID $VALIDATOR_1_PID $VALIDATOR_2_PID $VALIDATOR_3_PID
echo "Done"
if [[ $DOPPELGANGER_EXIT -eq 124 ]]; then
exit 1
fi
fi
if [[ "$BEHAVIOR" == "success" ]]; then
echo "Starting the last validator client"
../local_testnet/validator_client.sh $HOME/.lighthouse/local-testnet/node_4 http://localhost:8100 &
VALIDATOR_4_PID=$!
DOPPELGANGER_FAILURE=0
# Sleep three epochs, then make sure all validators were active in epoch 2. Use
# `is_previous_epoch_target_attester` from epoch 3 for a complete view of epoch 2 inclusion.
#
# See: https://lighthouse-book.sigmaprime.io/validator-inclusion.html
echo "Waiting three epochs..."
sleep $(( $SECONDS_PER_SLOT * 32 * 3 ))
PREVIOUS_DIR=$(pwd)
cd $HOME/.lighthouse/local-testnet/node_4/validators
for val in 0x*; do
[[ -e $val ]] || continue
curl -s localhost:8100/lighthouse/validator_inclusion/3/$val | jq | grep -q '"is_previous_epoch_target_attester": false'
IS_ATTESTER=$?
if [[ $IS_ATTESTER -eq 0 ]]; then
echo "$val did not attest in epoch 2."
else
echo "ERROR! $val did attest in epoch 2."
DOPPELGANGER_FAILURE=1
fi
done
# Sleep two epochs, then make sure all validators were active in epoch 4. Use
# `is_previous_epoch_target_attester` from epoch 5 for a complete view of epoch 4 inclusion.
#
# See: https://lighthouse-book.sigmaprime.io/validator-inclusion.html
echo "Waiting two more epochs..."
sleep $(( $SECONDS_PER_SLOT * 32 * 2 ))
for val in 0x*; do
[[ -e $val ]] || continue
curl -s localhost:8100/lighthouse/validator_inclusion/5/$val | jq | grep -q '"is_previous_epoch_target_attester": true'
IS_ATTESTER=$?
if [[ $IS_ATTESTER -eq 0 ]]; then
echo "$val attested in epoch 4."
else
echo "ERROR! $val did not attest in epoch 4."
DOPPELGANGER_FAILURE=1
fi
done
echo "Shutting down"
# Cleanup
cd $PREVIOUS_DIR
kill $BOOT_PID $BEACON_PID $BEACON_PID2 $BEACON_PID3 $GANACHE_PID $VALIDATOR_1_PID $VALIDATOR_2_PID $VALIDATOR_3_PID $VALIDATOR_4_PID
echo "Done"
if [[ $DOPPELGANGER_FAILURE -eq 1 ]]; then
exit 1
fi
fi
exit 0

39
scripts/tests/vars.env Normal file
View File

@ -0,0 +1,39 @@
# Base directories for the validator keys and secrets
DATADIR=~/.lighthouse/local-testnet
# Directory for the eth2 config
TESTNET_DIR=$DATADIR/testnet
# Mnemonic for the ganache test network
ETH1_NETWORK_MNEMONIC="vast thought differ pull jewel broom cook wrist tribe word before omit"
# Hardcoded deposit contract based on ETH1_NETWORK_MNEMONIC
DEPOSIT_CONTRACT_ADDRESS=8c594691c0e592ffa21f153a16ae41db5befcaaa
GENESIS_FORK_VERSION=0x42424242
VALIDATOR_COUNT=80
GENESIS_VALIDATOR_COUNT=80
# Number of validator client instances that you intend to run
NODE_COUNT=4
GENESIS_DELAY=0
# Port for P2P communication with bootnode
BOOTNODE_PORT=4242
# Network ID and Chain ID of local eth1 test network
NETWORK_ID=4242
# Hard fork configuration
ALTAIR_FORK_EPOCH=18446744073709551615
# Seconds per Eth2 slot
SECONDS_PER_SLOT=3
# Seconds per Eth1 block
SECONDS_PER_ETH1_BLOCK=1
# Enable doppelganger detection
VC_ARGS=" --enable-doppelganger-protection "

View File

@ -125,7 +125,7 @@ pub fn run_no_eth1_sim(matches: &ArgMatches) -> Result<(), String> {
network.clone(), network.clone(),
Epoch::new(4).start_slot(MainnetEthSpec::slots_per_epoch()), Epoch::new(4).start_slot(MainnetEthSpec::slots_per_epoch()),
slot_duration, slot_duration,
) ),
); );
finalization?; finalization?;
block_prod?; block_prod?;

View File

@ -66,3 +66,4 @@ lazy_static = "1.4.0"
fallback = { path = "../common/fallback" } fallback = { path = "../common/fallback" }
monitoring_api = { path = "../common/monitoring_api" } monitoring_api = { path = "../common/monitoring_api" }
sensitive_url = { path = "../common/sensitive_url" } sensitive_url = { path = "../common/sensitive_url" }
task_executor = { path = "../common/task_executor" }

View File

@ -20,7 +20,7 @@ use types::{
/// Builds an `AttestationService`. /// Builds an `AttestationService`.
pub struct AttestationServiceBuilder<T, E: EthSpec> { pub struct AttestationServiceBuilder<T, E: EthSpec> {
duties_service: Option<Arc<DutiesService<T, E>>>, duties_service: Option<Arc<DutiesService<T, E>>>,
validator_store: Option<ValidatorStore<T, E>>, validator_store: Option<Arc<ValidatorStore<T, E>>>,
slot_clock: Option<T>, slot_clock: Option<T>,
beacon_nodes: Option<Arc<BeaconNodeFallback<T, E>>>, beacon_nodes: Option<Arc<BeaconNodeFallback<T, E>>>,
context: Option<RuntimeContext<E>>, context: Option<RuntimeContext<E>>,
@ -42,7 +42,7 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationServiceBuilder<T, E> {
self self
} }
pub fn validator_store(mut self, store: ValidatorStore<T, E>) -> Self { pub fn validator_store(mut self, store: Arc<ValidatorStore<T, E>>) -> Self {
self.validator_store = Some(store); self.validator_store = Some(store);
self self
} }
@ -88,7 +88,7 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationServiceBuilder<T, E> {
/// Helper to minimise `Arc` usage. /// Helper to minimise `Arc` usage.
pub struct Inner<T, E: EthSpec> { pub struct Inner<T, E: EthSpec> {
duties_service: Arc<DutiesService<T, E>>, duties_service: Arc<DutiesService<T, E>>,
validator_store: ValidatorStore<T, E>, validator_store: Arc<ValidatorStore<T, E>>,
slot_clock: T, slot_clock: T,
beacon_nodes: Arc<BeaconNodeFallback<T, E>>, beacon_nodes: Arc<BeaconNodeFallback<T, E>>,
context: RuntimeContext<E>, context: RuntimeContext<E>,
@ -377,25 +377,22 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
signature: AggregateSignature::infinity(), signature: AggregateSignature::infinity(),
}; };
if self if let Err(e) = self.validator_store.sign_attestation(
.validator_store duty.pubkey,
.sign_attestation( duty.validator_committee_index as usize,
&duty.pubkey, &mut attestation,
duty.validator_committee_index as usize, current_epoch,
&mut attestation, ) {
current_epoch,
)
.is_some()
{
attestations.push(attestation);
} else {
crit!( crit!(
log, log,
"Failed to sign attestation"; "Failed to sign attestation";
"error" => ?e,
"committee_index" => committee_index, "committee_index" => committee_index,
"slot" => slot.as_u64(), "slot" => slot.as_u64(),
); );
continue; continue;
} else {
attestations.push(attestation);
} }
} }
@ -497,17 +494,22 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
continue; continue;
} }
if let Some(aggregate) = self.validator_store.produce_signed_aggregate_and_proof( match self.validator_store.produce_signed_aggregate_and_proof(
&duty.pubkey, duty.pubkey,
duty.validator_index, duty.validator_index,
aggregated_attestation.clone(), aggregated_attestation.clone(),
selection_proof.clone(), selection_proof.clone(),
) { ) {
signed_aggregate_and_proofs.push(aggregate); Ok(aggregate) => signed_aggregate_and_proofs.push(aggregate),
} else { Err(e) => {
crit!(log, "Failed to sign attestation"); crit!(
continue; log,
}; "Failed to sign attestation";
"error" => ?e
);
continue;
}
}
} }
if !signed_aggregate_and_proofs.is_empty() { if !signed_aggregate_and_proofs.is_empty() {

View File

@ -5,7 +5,6 @@ use crate::{
use crate::{http_metrics::metrics, validator_store::ValidatorStore}; use crate::{http_metrics::metrics, validator_store::ValidatorStore};
use environment::RuntimeContext; use environment::RuntimeContext;
use eth2::types::Graffiti; use eth2::types::Graffiti;
use futures::TryFutureExt;
use slog::{crit, debug, error, info, trace, warn}; use slog::{crit, debug, error, info, trace, warn};
use slot_clock::SlotClock; use slot_clock::SlotClock;
use std::ops::Deref; use std::ops::Deref;
@ -15,7 +14,7 @@ use types::{EthSpec, PublicKeyBytes, Slot};
/// Builds a `BlockService`. /// Builds a `BlockService`.
pub struct BlockServiceBuilder<T, E: EthSpec> { pub struct BlockServiceBuilder<T, E: EthSpec> {
validator_store: Option<ValidatorStore<T, E>>, validator_store: Option<Arc<ValidatorStore<T, E>>>,
slot_clock: Option<Arc<T>>, slot_clock: Option<Arc<T>>,
beacon_nodes: Option<Arc<BeaconNodeFallback<T, E>>>, beacon_nodes: Option<Arc<BeaconNodeFallback<T, E>>>,
context: Option<RuntimeContext<E>>, context: Option<RuntimeContext<E>>,
@ -35,7 +34,7 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockServiceBuilder<T, E> {
} }
} }
pub fn validator_store(mut self, store: ValidatorStore<T, E>) -> Self { pub fn validator_store(mut self, store: Arc<ValidatorStore<T, E>>) -> Self {
self.validator_store = Some(store); self.validator_store = Some(store);
self self
} }
@ -89,7 +88,7 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockServiceBuilder<T, E> {
/// Helper to minimise `Arc` usage. /// Helper to minimise `Arc` usage.
pub struct Inner<T, E: EthSpec> { pub struct Inner<T, E: EthSpec> {
validator_store: ValidatorStore<T, E>, validator_store: Arc<ValidatorStore<T, E>>,
slot_clock: Arc<T>, slot_clock: Arc<T>,
beacon_nodes: Arc<BeaconNodeFallback<T, E>>, beacon_nodes: Arc<BeaconNodeFallback<T, E>>,
context: RuntimeContext<E>, context: RuntimeContext<E>,
@ -207,15 +206,15 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
let service = self.clone(); let service = self.clone();
let log = log.clone(); let log = log.clone();
self.inner.context.executor.spawn( self.inner.context.executor.spawn(
service async move {
.publish_block(slot, validator_pubkey) if let Err(e) = service.publish_block(slot, validator_pubkey).await {
.unwrap_or_else(move |e| {
crit!( crit!(
log, log,
"Error whilst producing block"; "Error whilst producing block";
"message" => e "message" => e
); );
}), }
},
"block service", "block service",
); );
} }
@ -240,8 +239,8 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
let randao_reveal = self let randao_reveal = self
.validator_store .validator_store
.randao_reveal(&validator_pubkey, slot.epoch(E::slots_per_epoch())) .randao_reveal(validator_pubkey, slot.epoch(E::slots_per_epoch()))
.ok_or("Unable to produce randao reveal")? .map_err(|e| format!("Unable to produce randao reveal signature: {:?}", e))?
.into(); .into();
let graffiti = self let graffiti = self
@ -276,8 +275,8 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
let signed_block = self_ref let signed_block = self_ref
.validator_store .validator_store
.sign_block(validator_pubkey_ref, block, current_slot) .sign_block(*validator_pubkey_ref, block, current_slot)
.ok_or("Unable to sign block")?; .map_err(|e| format!("Unable to sign block: {:?}", e))?;
let _post_timer = metrics::start_timer_vec( let _post_timer = metrics::start_timer_vec(
&metrics::BLOCK_SERVICE_TIMES, &metrics::BLOCK_SERVICE_TIMES,

View File

@ -216,4 +216,19 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
and never provide an untrusted URL.") and never provide an untrusted URL.")
.takes_value(true), .takes_value(true),
) )
.arg(
Arg::with_name("enable-doppelganger-protection")
.long("enable-doppelganger-protection")
.value_name("ENABLE_DOPPELGANGER_PROTECTION")
.help("If this flag is set, Lighthouse will delay startup for three epochs and \
monitor for messages on the network by any of the validators managed by this \
client. This will result in three (possibly four) epochs worth of missed \
attestations. If an attestation is detected during this period, it means it is \
very likely that you are running a second validator client with the same keys. \
This validator client will immediately shutdown if this is detected in order \
to avoid potentially committing a slashable offense. Use this flag in order to \
ENABLE this functionality, without this flag Lighthouse will begin attesting \
immediately.")
.takes_value(false),
)
} }

View File

@ -47,6 +47,9 @@ pub struct Config {
pub http_metrics: http_metrics::Config, pub http_metrics: http_metrics::Config,
/// Configuration for sending metrics to a remote explorer endpoint. /// Configuration for sending metrics to a remote explorer endpoint.
pub monitoring_api: Option<monitoring_api::Config>, pub monitoring_api: Option<monitoring_api::Config>,
/// If true, enable functionality that monitors the network for attestations or proposals from
/// any of the validators managed by this client before starting up.
pub enable_doppelganger_protection: bool,
} }
impl Default for Config { impl Default for Config {
@ -76,6 +79,7 @@ impl Default for Config {
http_api: <_>::default(), http_api: <_>::default(),
http_metrics: <_>::default(), http_metrics: <_>::default(),
monitoring_api: None, monitoring_api: None,
enable_doppelganger_protection: false,
} }
} }
} }
@ -264,6 +268,10 @@ impl Config {
}); });
} }
if cli_args.is_present("enable-doppelganger-protection") {
config.enable_doppelganger_protection = true;
}
Ok(config) Ok(config)
} }
} }

File diff suppressed because it is too large Load Diff

View File

@ -8,7 +8,9 @@
use crate::beacon_node_fallback::{BeaconNodeFallback, RequireSynced}; use crate::beacon_node_fallback::{BeaconNodeFallback, RequireSynced};
use crate::{ use crate::{
block_service::BlockServiceNotification, http_metrics::metrics, validator_store::ValidatorStore, block_service::BlockServiceNotification,
http_metrics::metrics,
validator_store::{DoppelgangerStatus, Error as ValidatorStoreError, ValidatorStore},
}; };
use environment::RuntimeContext; use environment::RuntimeContext;
use eth2::types::{AttesterData, BeaconCommitteeSubscription, ProposerData, StateId, ValidatorId}; use eth2::types::{AttesterData, BeaconCommitteeSubscription, ProposerData, StateId, ValidatorId};
@ -36,7 +38,7 @@ const HISTORICAL_DUTIES_EPOCHS: u64 = 2;
pub enum Error { pub enum Error {
UnableToReadSlotClock, UnableToReadSlotClock,
FailedToDownloadAttesters(String), FailedToDownloadAttesters(String),
FailedToProduceSelectionProof, FailedToProduceSelectionProof(ValidatorStoreError),
InvalidModulo(ArithError), InvalidModulo(ArithError),
} }
@ -56,8 +58,8 @@ impl DutyAndProof {
spec: &ChainSpec, spec: &ChainSpec,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
let selection_proof = validator_store let selection_proof = validator_store
.produce_selection_proof(&duty.pubkey, duty.slot) .produce_selection_proof(duty.pubkey, duty.slot)
.ok_or(Error::FailedToProduceSelectionProof)?; .map_err(Error::FailedToProduceSelectionProof)?;
let selection_proof = selection_proof let selection_proof = selection_proof
.is_aggregator(duty.committee_length as usize, spec) .is_aggregator(duty.committee_length as usize, spec)
@ -84,7 +86,6 @@ type DependentRoot = Hash256;
type AttesterMap = HashMap<PublicKeyBytes, HashMap<Epoch, (DependentRoot, DutyAndProof)>>; type AttesterMap = HashMap<PublicKeyBytes, HashMap<Epoch, (DependentRoot, DutyAndProof)>>;
type ProposerMap = HashMap<Epoch, (DependentRoot, Vec<ProposerData>)>; type ProposerMap = HashMap<Epoch, (DependentRoot, Vec<ProposerData>)>;
type IndicesMap = HashMap<PublicKeyBytes, u64>;
/// See the module-level documentation. /// See the module-level documentation.
pub struct DutiesService<T, E: EthSpec> { pub struct DutiesService<T, E: EthSpec> {
@ -93,11 +94,8 @@ pub struct DutiesService<T, E: EthSpec> {
/// Maps an epoch to all *local* proposers in this epoch. Notably, this does not contain /// Maps an epoch to all *local* proposers in this epoch. Notably, this does not contain
/// proposals for any validators which are not registered locally. /// proposals for any validators which are not registered locally.
pub proposers: RwLock<ProposerMap>, pub proposers: RwLock<ProposerMap>,
/// Maps a public key to a validator index. There is a task which ensures this map is kept
/// up-to-date.
pub indices: RwLock<IndicesMap>,
/// Provides the canonical list of locally-managed validators. /// Provides the canonical list of locally-managed validators.
pub validator_store: ValidatorStore<T, E>, pub validator_store: Arc<ValidatorStore<T, E>>,
/// Tracks the current slot. /// Tracks the current slot.
pub slot_clock: T, pub slot_clock: T,
/// Provides HTTP access to remote beacon nodes. /// Provides HTTP access to remote beacon nodes.
@ -119,21 +117,44 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
/// Returns the total number of validators that should propose in the given epoch. /// Returns the total number of validators that should propose in the given epoch.
pub fn proposer_count(&self, epoch: Epoch) -> usize { pub fn proposer_count(&self, epoch: Epoch) -> usize {
// Only collect validators that are considered safe in terms of doppelganger protection.
let signing_pubkeys: HashSet<_> = self
.validator_store
.voting_pubkeys(DoppelgangerStatus::only_safe);
self.proposers self.proposers
.read() .read()
.get(&epoch) .get(&epoch)
.map_or(0, |(_, proposers)| proposers.len()) .map_or(0, |(_, proposers)| {
proposers
.iter()
.filter(|proposer_data| signing_pubkeys.contains(&proposer_data.pubkey))
.count()
})
} }
/// Returns the total number of validators that should attest in the given epoch. /// Returns the total number of validators that should attest in the given epoch.
pub fn attester_count(&self, epoch: Epoch) -> usize { pub fn attester_count(&self, epoch: Epoch) -> usize {
// Only collect validators that are considered safe in terms of doppelganger protection.
let signing_pubkeys: HashSet<_> = self
.validator_store
.voting_pubkeys(DoppelgangerStatus::only_safe);
self.attesters self.attesters
.read() .read()
.iter() .iter()
.filter(|(_, map)| map.contains_key(&epoch)) .filter_map(|(_, map)| map.get(&epoch))
.map(|(_, duty_and_proof)| duty_and_proof)
.filter(|duty_and_proof| signing_pubkeys.contains(&duty_and_proof.duty.pubkey))
.count() .count()
} }
/// Returns the total number of validators that are in a doppelganger detection period.
pub fn doppelganger_detecting_count(&self) -> usize {
self.validator_store
.voting_pubkeys::<HashSet<_>, _>(DoppelgangerStatus::only_unsafe)
.len()
}
/// Returns the pubkeys of the validators which are assigned to propose in the given slot. /// Returns the pubkeys of the validators which are assigned to propose in the given slot.
/// ///
/// It is possible that multiple validators have an identical proposal slot, however that is /// It is possible that multiple validators have an identical proposal slot, however that is
@ -141,13 +162,21 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
pub fn block_proposers(&self, slot: Slot) -> HashSet<PublicKeyBytes> { pub fn block_proposers(&self, slot: Slot) -> HashSet<PublicKeyBytes> {
let epoch = slot.epoch(E::slots_per_epoch()); let epoch = slot.epoch(E::slots_per_epoch());
// Only collect validators that are considered safe in terms of doppelganger protection.
let signing_pubkeys: HashSet<_> = self
.validator_store
.voting_pubkeys(DoppelgangerStatus::only_safe);
self.proposers self.proposers
.read() .read()
.get(&epoch) .get(&epoch)
.map(|(_, proposers)| { .map(|(_, proposers)| {
proposers proposers
.iter() .iter()
.filter(|proposer_data| proposer_data.slot == slot) .filter(|proposer_data| {
proposer_data.slot == slot
&& signing_pubkeys.contains(&proposer_data.pubkey)
})
.map(|proposer_data| proposer_data.pubkey) .map(|proposer_data| proposer_data.pubkey)
.collect() .collect()
}) })
@ -158,12 +187,20 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
pub fn attesters(&self, slot: Slot) -> Vec<DutyAndProof> { pub fn attesters(&self, slot: Slot) -> Vec<DutyAndProof> {
let epoch = slot.epoch(E::slots_per_epoch()); let epoch = slot.epoch(E::slots_per_epoch());
// Only collect validators that are considered safe in terms of doppelganger protection.
let signing_pubkeys: HashSet<_> = self
.validator_store
.voting_pubkeys(DoppelgangerStatus::only_safe);
self.attesters self.attesters
.read() .read()
.iter() .iter()
.filter_map(|(_, map)| map.get(&epoch)) .filter_map(|(_, map)| map.get(&epoch))
.map(|(_, duty_and_proof)| duty_and_proof) .map(|(_, duty_and_proof)| duty_and_proof)
.filter(|duty_and_proof| duty_and_proof.duty.slot == slot) .filter(|duty_and_proof| {
duty_and_proof.duty.slot == slot
&& signing_pubkeys.contains(&duty_and_proof.duty.pubkey)
})
.cloned() .cloned()
.collect() .collect()
} }
@ -276,9 +313,23 @@ async fn poll_validator_indices<T: SlotClock + 'static, E: EthSpec>(
metrics::start_timer_vec(&metrics::DUTIES_SERVICE_TIMES, &[metrics::UPDATE_INDICES]); metrics::start_timer_vec(&metrics::DUTIES_SERVICE_TIMES, &[metrics::UPDATE_INDICES]);
let log = duties_service.context.log(); let log = duties_service.context.log();
for pubkey in duties_service.validator_store.voting_pubkeys() {
// Collect *all* pubkeys for resolving indices, even those undergoing doppelganger protection.
//
// Since doppelganger protection queries rely on validator indices it is important to ensure we
// collect those indices.
let all_pubkeys: Vec<_> = duties_service
.validator_store
.voting_pubkeys(DoppelgangerStatus::ignored);
for pubkey in all_pubkeys {
// This is on its own line to avoid some weirdness with locks and if statements. // This is on its own line to avoid some weirdness with locks and if statements.
let is_known = duties_service.indices.read().contains_key(&pubkey); let is_known = duties_service
.validator_store
.initialized_validators()
.read()
.get_index(&pubkey)
.is_some();
if !is_known { if !is_known {
// Query the remote BN to resolve a pubkey to a validator index. // Query the remote BN to resolve a pubkey to a validator index.
@ -307,9 +358,10 @@ async fn poll_validator_indices<T: SlotClock + 'static, E: EthSpec>(
"validator_index" => response.data.index "validator_index" => response.data.index
); );
duties_service duties_service
.indices .validator_store
.initialized_validators()
.write() .write()
.insert(pubkey, response.data.index); .set_index(&pubkey, response.data.index);
} }
// This is not necessarily an error, it just means the validator is not yet known to // This is not necessarily an error, it just means the validator is not yet known to
// the beacon chain. // the beacon chain.
@ -359,18 +411,22 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
let current_epoch = current_slot.epoch(E::slots_per_epoch()); let current_epoch = current_slot.epoch(E::slots_per_epoch());
let next_epoch = current_epoch + 1; let next_epoch = current_epoch + 1;
let local_pubkeys: HashSet<PublicKeyBytes> = duties_service // Collect *all* pubkeys, even those undergoing doppelganger protection.
//
// We must know the duties for doppelganger validators so that we can subscribe to their subnets
// and get more information about other running instances.
let local_pubkeys: HashSet<_> = duties_service
.validator_store .validator_store
.voting_pubkeys() .voting_pubkeys(DoppelgangerStatus::ignored);
.into_iter()
.collect();
let local_indices = { let local_indices = {
let mut local_indices = Vec::with_capacity(local_pubkeys.len()); let mut local_indices = Vec::with_capacity(local_pubkeys.len());
let indices_map = duties_service.indices.read();
let vals_ref = duties_service.validator_store.initialized_validators();
let vals = vals_ref.read();
for &pubkey in &local_pubkeys { for &pubkey in &local_pubkeys {
if let Some(validator_index) = indices_map.get(&pubkey) { if let Some(validator_index) = vals.get_index(&pubkey) {
local_indices.push(*validator_index) local_indices.push(validator_index)
} }
} }
local_indices local_indices
@ -636,15 +692,18 @@ async fn poll_beacon_proposers<T: SlotClock + 'static, E: EthSpec>(
current_slot, current_slot,
&initial_block_proposers, &initial_block_proposers,
block_service_tx, block_service_tx,
&duties_service.validator_store,
log, log,
) )
.await; .await;
let local_pubkeys: HashSet<PublicKeyBytes> = duties_service // Collect *all* pubkeys, even those undergoing doppelganger protection.
//
// It is useful to keep the duties for all validators around, so they're on hand when
// doppelganger finishes.
let local_pubkeys: HashSet<_> = duties_service
.validator_store .validator_store
.voting_pubkeys() .voting_pubkeys(DoppelgangerStatus::ignored);
.into_iter()
.collect();
// Only download duties and push out additional block production events if we have some // Only download duties and push out additional block production events if we have some
// validators. // validators.
@ -723,6 +782,7 @@ async fn poll_beacon_proposers<T: SlotClock + 'static, E: EthSpec>(
current_slot, current_slot,
&additional_block_producers, &additional_block_producers,
block_service_tx, block_service_tx,
&duties_service.validator_store,
log, log,
) )
.await; .await;
@ -745,24 +805,33 @@ async fn poll_beacon_proposers<T: SlotClock + 'static, E: EthSpec>(
} }
/// Notify the block service if it should produce a block. /// Notify the block service if it should produce a block.
async fn notify_block_production_service( async fn notify_block_production_service<T: SlotClock + 'static, E: EthSpec>(
current_slot: Slot, current_slot: Slot,
block_proposers: &HashSet<PublicKeyBytes>, block_proposers: &HashSet<PublicKeyBytes>,
block_service_tx: &mut Sender<BlockServiceNotification>, block_service_tx: &mut Sender<BlockServiceNotification>,
validator_store: &ValidatorStore<T, E>,
log: &Logger, log: &Logger,
) { ) {
if let Err(e) = block_service_tx let non_doppelganger_proposers = block_proposers
.send(BlockServiceNotification { .iter()
slot: current_slot, .filter(|pubkey| validator_store.doppelganger_protection_allows_signing(**pubkey))
block_proposers: block_proposers.iter().copied().collect(), .copied()
}) .collect::<Vec<_>>();
.await
{ if !non_doppelganger_proposers.is_empty() {
error!( if let Err(e) = block_service_tx
log, .send(BlockServiceNotification {
"Failed to notify block service"; slot: current_slot,
"current_slot" => current_slot, block_proposers: non_doppelganger_proposers,
"error" => %e })
); .await
}; {
error!(
log,
"Failed to notify block service";
"current_slot" => current_slot,
"error" => %e
);
};
}
} }

View File

@ -137,6 +137,11 @@ impl<T: SlotClock + 'static, E: EthSpec> ForkService<T, E> {
*self.fork.read() *self.fork.read()
} }
/// Returns the slot clock.
pub fn slot_clock(&self) -> T {
self.slot_clock.clone()
}
/// Starts the service that periodically polls for the `Fork`. /// Starts the service that periodically polls for the `Fork`.
pub fn start_update_service(self, context: &RuntimeContext<E>) -> Result<(), String> { pub fn start_update_service(self, context: &RuntimeContext<E>) -> Result<(), String> {
// Run an immediate update before starting the updater service. // Run an immediate update before starting the updater service.

View File

@ -50,10 +50,10 @@ impl From<String> for Error {
/// A wrapper around all the items required to spawn the HTTP server. /// A wrapper around all the items required to spawn the HTTP server.
/// ///
/// The server will gracefully handle the case where any fields are `None`. /// The server will gracefully handle the case where any fields are `None`.
pub struct Context<T: Clone, E: EthSpec> { pub struct Context<T: SlotClock, E: EthSpec> {
pub runtime: Weak<Runtime>, pub runtime: Weak<Runtime>,
pub api_secret: ApiSecret, pub api_secret: ApiSecret,
pub validator_store: Option<ValidatorStore<T, E>>, pub validator_store: Option<Arc<ValidatorStore<T, E>>>,
pub validator_dir: Option<PathBuf>, pub validator_dir: Option<PathBuf>,
pub spec: ChainSpec, pub spec: ChainSpec,
pub config: Config, pub config: Config,
@ -203,7 +203,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path::end()) .and(warp::path::end())
.and(validator_store_filter.clone()) .and(validator_store_filter.clone())
.and(signer.clone()) .and(signer.clone())
.and_then(|validator_store: ValidatorStore<T, E>, signer| { .and_then(|validator_store: Arc<ValidatorStore<T, E>>, signer| {
blocking_signed_json_task(signer, move || { blocking_signed_json_task(signer, move || {
let validators = validator_store let validators = validator_store
.initialized_validators() .initialized_validators()
@ -229,7 +229,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(validator_store_filter.clone()) .and(validator_store_filter.clone())
.and(signer.clone()) .and(signer.clone())
.and_then( .and_then(
|validator_pubkey: PublicKey, validator_store: ValidatorStore<T, E>, signer| { |validator_pubkey: PublicKey, validator_store: Arc<ValidatorStore<T, E>>, signer| {
blocking_signed_json_task(signer, move || { blocking_signed_json_task(signer, move || {
let validator = validator_store let validator = validator_store
.initialized_validators() .initialized_validators()
@ -267,7 +267,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and_then( .and_then(
|body: Vec<api_types::ValidatorRequest>, |body: Vec<api_types::ValidatorRequest>,
validator_dir: PathBuf, validator_dir: PathBuf,
validator_store: ValidatorStore<T, E>, validator_store: Arc<ValidatorStore<T, E>>,
spec: Arc<ChainSpec>, spec: Arc<ChainSpec>,
signer, signer,
runtime: Weak<Runtime>| { runtime: Weak<Runtime>| {
@ -309,7 +309,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and_then( .and_then(
|body: api_types::CreateValidatorsMnemonicRequest, |body: api_types::CreateValidatorsMnemonicRequest,
validator_dir: PathBuf, validator_dir: PathBuf,
validator_store: ValidatorStore<T, E>, validator_store: Arc<ValidatorStore<T, E>>,
spec: Arc<ChainSpec>, spec: Arc<ChainSpec>,
signer, signer,
runtime: Weak<Runtime>| { runtime: Weak<Runtime>| {
@ -353,7 +353,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and_then( .and_then(
|body: api_types::KeystoreValidatorsPostRequest, |body: api_types::KeystoreValidatorsPostRequest,
validator_dir: PathBuf, validator_dir: PathBuf,
validator_store: ValidatorStore<T, E>, validator_store: Arc<ValidatorStore<T, E>>,
signer, signer,
runtime: Weak<Runtime>| { runtime: Weak<Runtime>| {
blocking_signed_json_task(signer, move || { blocking_signed_json_task(signer, move || {
@ -428,7 +428,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and_then( .and_then(
|validator_pubkey: PublicKey, |validator_pubkey: PublicKey,
body: api_types::ValidatorPatchRequest, body: api_types::ValidatorPatchRequest,
validator_store: ValidatorStore<T, E>, validator_store: Arc<ValidatorStore<T, E>>,
signer, signer,
runtime: Weak<Runtime>| { runtime: Weak<Runtime>| {
blocking_signed_json_task(signer, move || { blocking_signed_json_task(signer, move || {

View File

@ -1,6 +1,7 @@
#![cfg(test)] #![cfg(test)]
#![cfg(not(debug_assertions))] #![cfg(not(debug_assertions))]
use crate::doppelganger_service::DoppelgangerService;
use crate::{ use crate::{
http_api::{ApiSecret, Config as HttpConfig, Context}, http_api::{ApiSecret, Config as HttpConfig, Context},
Config, ForkServiceBuilder, InitializedValidators, ValidatorDefinitions, ValidatorStore, Config, ForkServiceBuilder, InitializedValidators, ValidatorDefinitions, ValidatorStore,
@ -85,16 +86,21 @@ impl ApiTester {
Hash256::repeat_byte(42), Hash256::repeat_byte(42),
spec, spec,
fork_service.clone(), fork_service.clone(),
Some(Arc::new(DoppelgangerService::new(log.clone()))),
log.clone(), log.clone(),
); );
validator_store
.register_all_in_doppelganger_protection_if_enabled()
.expect("Should attach doppelganger service");
let initialized_validators = validator_store.initialized_validators(); let initialized_validators = validator_store.initialized_validators();
let context: Arc<Context<TestingSlotClock, E>> = Arc::new(Context { let context: Arc<Context<TestingSlotClock, E>> = Arc::new(Context {
runtime, runtime,
api_secret, api_secret,
validator_dir: Some(validator_dir.path().into()), validator_dir: Some(validator_dir.path().into()),
validator_store: Some(validator_store), validator_store: Some(Arc::new(validator_store)),
spec: E::default_spec(), spec: E::default_spec(),
config: HttpConfig { config: HttpConfig {
enabled: true, enabled: true,

View File

@ -35,7 +35,7 @@ impl From<String> for Error {
/// Contains objects which have shared access from inside/outside of the metrics server. /// Contains objects which have shared access from inside/outside of the metrics server.
pub struct Shared<T: EthSpec> { pub struct Shared<T: EthSpec> {
pub validator_store: Option<ValidatorStore<SystemTimeSlotClock, T>>, pub validator_store: Option<Arc<ValidatorStore<SystemTimeSlotClock, T>>>,
pub duties_service: Option<Arc<DutiesService<SystemTimeSlotClock, T>>>, pub duties_service: Option<Arc<DutiesService<SystemTimeSlotClock, T>>>,
pub genesis_time: Option<u64>, pub genesis_time: Option<u64>,
} }

View File

@ -62,6 +62,10 @@ pub enum Error {
TokioJoin(tokio::task::JoinError), TokioJoin(tokio::task::JoinError),
/// Cannot initialize the same validator twice. /// Cannot initialize the same validator twice.
DuplicatePublicKey, DuplicatePublicKey,
/// The public key does not exist in the set of initialized validators.
ValidatorNotInitialized(PublicKey),
/// Unable to read the slot clock.
SlotClock,
} }
impl From<LockfileError> for Error { impl From<LockfileError> for Error {
@ -88,6 +92,8 @@ pub enum SigningMethod {
pub struct InitializedValidator { pub struct InitializedValidator {
signing_method: SigningMethod, signing_method: SigningMethod,
graffiti: Option<Graffiti>, graffiti: Option<Graffiti>,
/// The validators index in `state.validators`, to be updated by an external service.
index: Option<u64>,
} }
impl InitializedValidator { impl InitializedValidator {
@ -212,6 +218,7 @@ impl InitializedValidator {
voting_keypair, voting_keypair,
}, },
graffiti: def.graffiti.map(Into::into), graffiti: def.graffiti.map(Into::into),
index: None,
}) })
} }
} }
@ -313,7 +320,7 @@ impl InitializedValidators {
self.definitions.as_slice().len() self.definitions.as_slice().len()
} }
/// Iterate through all **enabled** voting public keys in `self`. /// Iterate through all voting public keys in `self` that should be used when querying for duties.
pub fn iter_voting_pubkeys(&self) -> impl Iterator<Item = &PublicKeyBytes> { pub fn iter_voting_pubkeys(&self) -> impl Iterator<Item = &PublicKeyBytes> {
self.validators.iter().map(|(pubkey, _)| pubkey) self.validators.iter().map(|(pubkey, _)| pubkey)
} }
@ -622,4 +629,14 @@ impl InitializedValidators {
); );
Ok(()) Ok(())
} }
pub fn get_index(&self, pubkey: &PublicKeyBytes) -> Option<u64> {
self.validators.get(pubkey).and_then(|val| val.index)
}
pub fn set_index(&mut self, pubkey: &PublicKeyBytes, index: u64) {
if let Some(val) = self.validators.get_mut(pubkey) {
val.index = Some(index);
}
}
} }

View File

@ -13,6 +13,7 @@ mod key_cache;
mod notifier; mod notifier;
mod validator_store; mod validator_store;
mod doppelganger_service;
pub mod http_api; pub mod http_api;
pub use cli::cli_app; pub use cli::cli_app;
@ -23,6 +24,7 @@ use monitoring_api::{MonitoringHttpClient, ProcessType};
use crate::beacon_node_fallback::{ use crate::beacon_node_fallback::{
start_fallback_updater_service, BeaconNodeFallback, CandidateBeaconNode, RequireSynced, start_fallback_updater_service, BeaconNodeFallback, CandidateBeaconNode, RequireSynced,
}; };
use crate::doppelganger_service::DoppelgangerService;
use account_utils::validator_definitions::ValidatorDefinitions; use account_utils::validator_definitions::ValidatorDefinitions;
use attestation_service::{AttestationService, AttestationServiceBuilder}; use attestation_service::{AttestationService, AttestationServiceBuilder};
use block_service::{BlockService, BlockServiceBuilder}; use block_service::{BlockService, BlockServiceBuilder};
@ -61,9 +63,12 @@ const WAITING_FOR_GENESIS_POLL_TIME: Duration = Duration::from_secs(12);
/// This can help ensure that proper endpoint fallback occurs. /// This can help ensure that proper endpoint fallback occurs.
const HTTP_ATTESTATION_TIMEOUT_QUOTIENT: u32 = 4; const HTTP_ATTESTATION_TIMEOUT_QUOTIENT: u32 = 4;
const HTTP_ATTESTER_DUTIES_TIMEOUT_QUOTIENT: u32 = 4; const HTTP_ATTESTER_DUTIES_TIMEOUT_QUOTIENT: u32 = 4;
const HTTP_LIVENESS_TIMEOUT_QUOTIENT: u32 = 4;
const HTTP_PROPOSAL_TIMEOUT_QUOTIENT: u32 = 2; const HTTP_PROPOSAL_TIMEOUT_QUOTIENT: u32 = 2;
const HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT: u32 = 4; const HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT: u32 = 4;
const DOPPELGANGER_SERVICE_NAME: &str = "doppelganger";
#[derive(Clone)] #[derive(Clone)]
pub struct ProductionValidatorClient<T: EthSpec> { pub struct ProductionValidatorClient<T: EthSpec> {
context: RuntimeContext<T>, context: RuntimeContext<T>,
@ -71,7 +76,8 @@ pub struct ProductionValidatorClient<T: EthSpec> {
fork_service: ForkService<SystemTimeSlotClock, T>, fork_service: ForkService<SystemTimeSlotClock, T>,
block_service: BlockService<SystemTimeSlotClock, T>, block_service: BlockService<SystemTimeSlotClock, T>,
attestation_service: AttestationService<SystemTimeSlotClock, T>, attestation_service: AttestationService<SystemTimeSlotClock, T>,
validator_store: ValidatorStore<SystemTimeSlotClock, T>, doppelganger_service: Option<Arc<DoppelgangerService>>,
validator_store: Arc<ValidatorStore<SystemTimeSlotClock, T>>,
http_api_listen_addr: Option<SocketAddr>, http_api_listen_addr: Option<SocketAddr>,
http_metrics_ctx: Option<Arc<http_metrics::Context<T>>>, http_metrics_ctx: Option<Arc<http_metrics::Context<T>>>,
config: Config, config: Config,
@ -254,6 +260,7 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
Timeouts { Timeouts {
attestation: slot_duration / HTTP_ATTESTATION_TIMEOUT_QUOTIENT, attestation: slot_duration / HTTP_ATTESTATION_TIMEOUT_QUOTIENT,
attester_duties: slot_duration / HTTP_ATTESTER_DUTIES_TIMEOUT_QUOTIENT, attester_duties: slot_duration / HTTP_ATTESTER_DUTIES_TIMEOUT_QUOTIENT,
liveness: slot_duration / HTTP_LIVENESS_TIMEOUT_QUOTIENT,
proposal: slot_duration / HTTP_PROPOSAL_TIMEOUT_QUOTIENT, proposal: slot_duration / HTTP_PROPOSAL_TIMEOUT_QUOTIENT,
proposer_duties: slot_duration / HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT, proposer_duties: slot_duration / HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT,
} }
@ -313,14 +320,27 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
.log(log.clone()) .log(log.clone())
.build()?; .build()?;
let validator_store: ValidatorStore<SystemTimeSlotClock, T> = ValidatorStore::new( let doppelganger_service = if config.enable_doppelganger_protection {
validators, Some(Arc::new(DoppelgangerService::new(
slashing_protection, context
genesis_validators_root, .service_context(DOPPELGANGER_SERVICE_NAME.into())
context.eth2_config.spec.clone(), .log()
fork_service.clone(), .clone(),
log.clone(), )))
); } else {
None
};
let validator_store: Arc<ValidatorStore<SystemTimeSlotClock, T>> =
Arc::new(ValidatorStore::new(
validators,
slashing_protection,
genesis_validators_root,
context.eth2_config.spec.clone(),
fork_service.clone(),
doppelganger_service.clone(),
log.clone(),
));
info!( info!(
log, log,
@ -339,7 +359,6 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
let duties_service = Arc::new(DutiesService { let duties_service = Arc::new(DutiesService {
attesters: <_>::default(), attesters: <_>::default(),
proposers: <_>::default(), proposers: <_>::default(),
indices: <_>::default(),
slot_clock: slot_clock.clone(), slot_clock: slot_clock.clone(),
beacon_nodes: beacon_nodes.clone(), beacon_nodes: beacon_nodes.clone(),
validator_store: validator_store.clone(), validator_store: validator_store.clone(),
@ -369,7 +388,7 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
let attestation_service = AttestationServiceBuilder::new() let attestation_service = AttestationServiceBuilder::new()
.duties_service(duties_service.clone()) .duties_service(duties_service.clone())
.slot_clock(slot_clock) .slot_clock(slot_clock.clone())
.validator_store(validator_store.clone()) .validator_store(validator_store.clone())
.beacon_nodes(beacon_nodes.clone()) .beacon_nodes(beacon_nodes.clone())
.runtime_context(context.service_context("attestation".into())) .runtime_context(context.service_context("attestation".into()))
@ -381,12 +400,16 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
// of making too many changes this close to genesis (<1 week). // of making too many changes this close to genesis (<1 week).
wait_for_genesis(&beacon_nodes, genesis_time, &context).await?; wait_for_genesis(&beacon_nodes, genesis_time, &context).await?;
// Ensure all validators are registered in doppelganger protection.
validator_store.register_all_in_doppelganger_protection_if_enabled()?;
Ok(Self { Ok(Self {
context, context,
duties_service, duties_service,
fork_service, fork_service,
block_service, block_service,
attestation_service, attestation_service,
doppelganger_service,
validator_store, validator_store,
config, config,
http_api_listen_addr: None, http_api_listen_addr: None,
@ -419,6 +442,20 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
.start_update_service(&self.context.eth2_config.spec) .start_update_service(&self.context.eth2_config.spec)
.map_err(|e| format!("Unable to start attestation service: {}", e))?; .map_err(|e| format!("Unable to start attestation service: {}", e))?;
if let Some(doppelganger_service) = self.doppelganger_service.clone() {
DoppelgangerService::start_update_service(
doppelganger_service,
self.context
.service_context(DOPPELGANGER_SERVICE_NAME.into()),
self.validator_store.clone(),
self.duties_service.beacon_nodes.clone(),
self.duties_service.slot_clock.clone(),
)
.map_err(|e| format!("Unable to start doppelganger service: {}", e))?
} else {
info!(log, "Doppelganger protection disabled.")
}
spawn_notifier(self).map_err(|e| format!("Failed to start notifier: {}", e))?; spawn_notifier(self).map_err(|e| format!("Failed to start notifier: {}", e))?;
let api_secret = ApiSecret::create_or_open(&self.config.validator_dir)?; let api_secret = ApiSecret::create_or_open(&self.config.validator_dir)?;

View File

@ -72,6 +72,11 @@ async fn notify<T: SlotClock + 'static, E: EthSpec>(
let total_validators = duties_service.total_validator_count(); let total_validators = duties_service.total_validator_count();
let proposing_validators = duties_service.proposer_count(epoch); let proposing_validators = duties_service.proposer_count(epoch);
let attesting_validators = duties_service.attester_count(epoch); let attesting_validators = duties_service.attester_count(epoch);
let doppelganger_detecting_validators = duties_service.doppelganger_detecting_count();
if doppelganger_detecting_validators > 0 {
info!(log, "Searching for doppelgangers on the network"; "doppelganger_detecting_validators" => doppelganger_detecting_validators)
}
if total_validators == 0 { if total_validators == 0 {
info!( info!(

View File

@ -1,21 +1,36 @@
use crate::{ use crate::{
fork_service::ForkService, http_metrics::metrics, initialized_validators::InitializedValidators, doppelganger_service::DoppelgangerService, fork_service::ForkService, http_metrics::metrics,
initialized_validators::InitializedValidators,
}; };
use account_utils::{validator_definitions::ValidatorDefinition, ZeroizeString}; use account_utils::{validator_definitions::ValidatorDefinition, ZeroizeString};
use parking_lot::{Mutex, RwLock}; use parking_lot::{Mutex, RwLock};
use slashing_protection::{NotSafe, Safe, SlashingDatabase}; use slashing_protection::{NotSafe, Safe, SlashingDatabase};
use slog::{crit, error, info, warn, Logger}; use slog::{crit, error, info, warn, Logger};
use slot_clock::SlotClock; use slot_clock::SlotClock;
use std::iter::FromIterator;
use std::path::Path; use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
use tempfile::TempDir;
use types::{ use types::{
graffiti::GraffitiString, Attestation, BeaconBlock, ChainSpec, Domain, Epoch, EthSpec, Fork, attestation::Error as AttestationError, graffiti::GraffitiString, Attestation, BeaconBlock,
Graffiti, Hash256, Keypair, PublicKeyBytes, SelectionProof, Signature, SignedAggregateAndProof, ChainSpec, Domain, Epoch, EthSpec, Fork, Graffiti, Hash256, Keypair, PublicKeyBytes,
SignedBeaconBlock, SignedRoot, Slot, SelectionProof, Signature, SignedAggregateAndProof, SignedBeaconBlock, SignedRoot, Slot,
}; };
use validator_dir::ValidatorDir; use validator_dir::ValidatorDir;
pub use crate::doppelganger_service::DoppelgangerStatus;
#[derive(Debug, PartialEq)]
pub enum Error {
DoppelgangerProtected(PublicKeyBytes),
UnknownToDoppelgangerService(PublicKeyBytes),
UnknownPubkey(PublicKeyBytes),
Slashable(NotSafe),
SameData,
GreaterThanCurrentSlot { slot: Slot, current_slot: Slot },
GreaterThanCurrentEpoch { epoch: Epoch, current_epoch: Epoch },
UnableToSignAttestation(AttestationError),
}
/// Number of epochs of slashing protection history to keep. /// Number of epochs of slashing protection history to keep.
/// ///
/// This acts as a maximum safe-guard against clock drift. /// This acts as a maximum safe-guard against clock drift.
@ -46,7 +61,6 @@ impl PartialEq for LocalValidator {
} }
} }
#[derive(Clone)]
pub struct ValidatorStore<T, E: EthSpec> { pub struct ValidatorStore<T, E: EthSpec> {
validators: Arc<RwLock<InitializedValidators>>, validators: Arc<RwLock<InitializedValidators>>,
slashing_protection: SlashingDatabase, slashing_protection: SlashingDatabase,
@ -54,8 +68,9 @@ pub struct ValidatorStore<T, E: EthSpec> {
genesis_validators_root: Hash256, genesis_validators_root: Hash256,
spec: Arc<ChainSpec>, spec: Arc<ChainSpec>,
log: Logger, log: Logger,
temp_dir: Option<Arc<TempDir>>, doppelganger_service: Option<Arc<DoppelgangerService>>,
fork_service: ForkService<T, E>, fork_service: ForkService<T, E>,
slot_clock: T,
} }
impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> { impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
@ -65,6 +80,7 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
genesis_validators_root: Hash256, genesis_validators_root: Hash256,
spec: ChainSpec, spec: ChainSpec,
fork_service: ForkService<T, E>, fork_service: ForkService<T, E>,
doppelganger_service: Option<Arc<DoppelgangerService>>,
log: Logger, log: Logger,
) -> Self { ) -> Self {
Self { Self {
@ -73,12 +89,32 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
slashing_protection_last_prune: Arc::new(Mutex::new(Epoch::new(0))), slashing_protection_last_prune: Arc::new(Mutex::new(Epoch::new(0))),
genesis_validators_root, genesis_validators_root,
spec: Arc::new(spec), spec: Arc::new(spec),
log, log: log.clone(),
temp_dir: None, doppelganger_service,
slot_clock: fork_service.slot_clock(),
fork_service, fork_service,
} }
} }
/// Register all local validators in doppelganger protection to try and prevent instances of
/// duplicate validators operating on the network at the same time.
///
/// This function has no effect if doppelganger protection is disabled.
pub fn register_all_in_doppelganger_protection_if_enabled(&self) -> Result<(), String> {
if let Some(doppelganger_service) = &self.doppelganger_service {
for pubkey in self.validators.read().iter_voting_pubkeys() {
doppelganger_service.register_new_validator::<E, _>(*pubkey, &self.slot_clock)?
}
}
Ok(())
}
/// Returns `true` if doppelganger protection is enabled, or else `false`.
pub fn doppelganger_protection_enabled(&self) -> bool {
self.doppelganger_service.is_some()
}
pub fn initialized_validators(&self) -> Arc<RwLock<InitializedValidators>> { pub fn initialized_validators(&self) -> Arc<RwLock<InitializedValidators>> {
self.validators.clone() self.validators.clone()
} }
@ -105,12 +141,19 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
) )
.map_err(|e| format!("failed to create validator definitions: {:?}", e))?; .map_err(|e| format!("failed to create validator definitions: {:?}", e))?;
let validator_pubkey = validator_def.voting_public_key.compress();
self.slashing_protection self.slashing_protection
.register_validator(validator_def.voting_public_key.compress()) .register_validator(validator_pubkey)
.map_err(|e| format!("failed to register validator: {:?}", e))?; .map_err(|e| format!("failed to register validator: {:?}", e))?;
validator_def.enabled = enable; validator_def.enabled = enable;
if let Some(doppelganger_service) = &self.doppelganger_service {
doppelganger_service
.register_new_validator::<E, _>(validator_pubkey, &self.slot_clock)?;
}
self.validators self.validators
.write() .write()
.add_definition(validator_def.clone()) .add_definition(validator_def.clone())
@ -120,14 +163,92 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
Ok(validator_def) Ok(validator_def)
} }
pub fn voting_pubkeys(&self) -> Vec<PublicKeyBytes> { /// Attempts to resolve the pubkey to a validator index.
self.validators ///
/// It may return `None` if the `pubkey` is:
///
/// - Unknown.
/// - Known, but with an unknown index.
pub fn validator_index(&self, pubkey: &PublicKeyBytes) -> Option<u64> {
self.validators.read().get_index(pubkey)
}
/// Returns all voting pubkeys for all enabled validators.
///
/// The `filter_func` allows for filtering pubkeys based upon their `DoppelgangerStatus`. There
/// are two primary functions used here:
///
/// - `DoppelgangerStatus::only_safe`: only returns pubkeys which have passed doppelganger
/// protection and are safe-enough to sign messages.
/// - `DoppelgangerStatus::ignored`: returns all the pubkeys from `only_safe` *plus* those still
/// undergoing protection. This is useful for collecting duties or other non-signing tasks.
#[allow(clippy::needless_collect)] // Collect is required to avoid holding a lock.
pub fn voting_pubkeys<I, F>(&self, filter_func: F) -> I
where
I: FromIterator<PublicKeyBytes>,
F: Fn(DoppelgangerStatus) -> Option<PublicKeyBytes>,
{
// Collect all the pubkeys first to avoid interleaving locks on `self.validators` and
// `self.doppelganger_service()`.
let pubkeys = self
.validators
.read() .read()
.iter_voting_pubkeys() .iter_voting_pubkeys()
.cloned() .cloned()
.collect::<Vec<_>>();
pubkeys
.into_iter()
.map(|pubkey| {
self.doppelganger_service
.as_ref()
.map(|doppelganger_service| doppelganger_service.validator_status(pubkey))
// Allow signing on all pubkeys if doppelganger protection is disabled.
.unwrap_or_else(|| DoppelgangerStatus::SigningEnabled(pubkey))
})
.filter_map(filter_func)
.collect() .collect()
} }
/// Returns doppelganger statuses for all enabled validators.
#[allow(clippy::needless_collect)] // Collect is required to avoid holding a lock.
pub fn doppelganger_statuses(&self) -> Vec<DoppelgangerStatus> {
// Collect all the pubkeys first to avoid interleaving locks on `self.validators` and
// `self.doppelganger_service`.
let pubkeys = self
.validators
.read()
.iter_voting_pubkeys()
.cloned()
.collect::<Vec<_>>();
pubkeys
.into_iter()
.map(|pubkey| {
self.doppelganger_service
.as_ref()
.map(|doppelganger_service| doppelganger_service.validator_status(pubkey))
// Allow signing on all pubkeys if doppelganger protection is disabled.
.unwrap_or_else(|| DoppelgangerStatus::SigningEnabled(pubkey))
})
.collect()
}
/// Check if the `validator_pubkey` is permitted by the doppleganger protection to sign
/// messages.
pub fn doppelganger_protection_allows_signing(&self, validator_pubkey: PublicKeyBytes) -> bool {
self.doppelganger_service
.as_ref()
// If there's no doppelganger service then we assume it is purposefully disabled and
// declare that all keys are safe with regard to it.
.map_or(true, |doppelganger_service| {
doppelganger_service
.validator_status(validator_pubkey)
.only_safe()
.is_some()
})
}
pub fn num_voting_validators(&self) -> usize { pub fn num_voting_validators(&self) -> usize {
self.validators.read().num_enabled() self.validators.read().num_enabled()
} }
@ -136,25 +257,56 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
self.fork_service.fork() self.fork_service.fork()
} }
/// Runs `func`, providing it access to the `Keypair` corresponding to `validator_pubkey`.
///
/// This forms the canonical point for accessing the secret key of some validator. It is
/// structured as a `with_...` function since we need to pass-through a read-lock in order to
/// access the keypair.
///
/// Access to keypairs might be restricted by other internal mechanisms (e.g., doppleganger
/// protection).
///
/// ## Warning
///
/// This function takes a read-lock on `self.validators`. To prevent deadlocks, it is advised to
/// never take any sort of concurrency lock inside this function.
fn with_validator_keypair<F, R>(
&self,
validator_pubkey: PublicKeyBytes,
func: F,
) -> Result<R, Error>
where
F: FnOnce(&Keypair) -> R,
{
// If the doppelganger service is active, check to ensure it explicitly permits signing by
// this validator.
if !self.doppelganger_protection_allows_signing(validator_pubkey) {
return Err(Error::DoppelgangerProtected(validator_pubkey));
}
let validators_lock = self.validators.read();
Ok(func(
validators_lock
.voting_keypair(&validator_pubkey)
.ok_or(Error::UnknownPubkey(validator_pubkey))?,
))
}
pub fn randao_reveal( pub fn randao_reveal(
&self, &self,
validator_pubkey: &PublicKeyBytes, validator_pubkey: PublicKeyBytes,
epoch: Epoch, epoch: Epoch,
) -> Option<Signature> { ) -> Result<Signature, Error> {
self.validators let domain = self.spec.get_domain(
.read() epoch,
.voting_keypair(validator_pubkey) Domain::Randao,
.map(|voting_keypair| { &self.fork(),
let domain = self.spec.get_domain( self.genesis_validators_root,
epoch, );
Domain::Randao, let message = epoch.signing_root(domain);
&self.fork(),
self.genesis_validators_root,
);
let message = epoch.signing_root(domain);
voting_keypair.sk.sign(message) self.with_validator_keypair(validator_pubkey, |keypair| keypair.sk.sign(message))
})
} }
pub fn graffiti(&self, validator_pubkey: &PublicKeyBytes) -> Option<Graffiti> { pub fn graffiti(&self, validator_pubkey: &PublicKeyBytes) -> Option<Graffiti> {
@ -163,10 +315,10 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
pub fn sign_block( pub fn sign_block(
&self, &self,
validator_pubkey: &PublicKeyBytes, validator_pubkey: PublicKeyBytes,
block: BeaconBlock<E>, block: BeaconBlock<E>,
current_slot: Slot, current_slot: Slot,
) -> Option<SignedBeaconBlock<E>> { ) -> Result<SignedBeaconBlock<E>, Error> {
// Make sure the block slot is not higher than the current slot to avoid potential attacks. // Make sure the block slot is not higher than the current slot to avoid potential attacks.
if block.slot() > current_slot { if block.slot() > current_slot {
warn!( warn!(
@ -175,7 +327,10 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
"block_slot" => block.slot().as_u64(), "block_slot" => block.slot().as_u64(),
"current_slot" => current_slot.as_u64() "current_slot" => current_slot.as_u64()
); );
return None; return Err(Error::GreaterThanCurrentSlot {
slot: block.slot(),
current_slot,
});
} }
// Check for slashing conditions. // Check for slashing conditions.
@ -188,25 +343,19 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
); );
let slashing_status = self.slashing_protection.check_and_insert_block_proposal( let slashing_status = self.slashing_protection.check_and_insert_block_proposal(
validator_pubkey, &validator_pubkey,
&block.block_header(), &block.block_header(),
domain, domain,
); );
match slashing_status { match slashing_status {
// We can safely sign this block. // We can safely sign this block without slashing.
Ok(Safe::Valid) => { Ok(Safe::Valid) => {
let validators = self.validators.read();
let voting_keypair = validators.voting_keypair(validator_pubkey)?;
metrics::inc_counter_vec(&metrics::SIGNED_BLOCKS_TOTAL, &[metrics::SUCCESS]); metrics::inc_counter_vec(&metrics::SIGNED_BLOCKS_TOTAL, &[metrics::SUCCESS]);
Some(block.sign( self.with_validator_keypair(validator_pubkey, move |keypair| {
&voting_keypair.sk, block.sign(&keypair.sk, &fork, self.genesis_validators_root, &self.spec)
&fork, })
self.genesis_validators_root,
&self.spec,
))
} }
Ok(Safe::SameData) => { Ok(Safe::SameData) => {
warn!( warn!(
@ -214,7 +363,7 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
"Skipping signing of previously signed block"; "Skipping signing of previously signed block";
); );
metrics::inc_counter_vec(&metrics::SIGNED_BLOCKS_TOTAL, &[metrics::SAME_DATA]); metrics::inc_counter_vec(&metrics::SIGNED_BLOCKS_TOTAL, &[metrics::SAME_DATA]);
None Err(Error::SameData)
} }
Err(NotSafe::UnregisteredValidator(pk)) => { Err(NotSafe::UnregisteredValidator(pk)) => {
warn!( warn!(
@ -224,7 +373,7 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
"public_key" => format!("{:?}", pk) "public_key" => format!("{:?}", pk)
); );
metrics::inc_counter_vec(&metrics::SIGNED_BLOCKS_TOTAL, &[metrics::UNREGISTERED]); metrics::inc_counter_vec(&metrics::SIGNED_BLOCKS_TOTAL, &[metrics::UNREGISTERED]);
None Err(Error::Slashable(NotSafe::UnregisteredValidator(pk)))
} }
Err(e) => { Err(e) => {
crit!( crit!(
@ -233,21 +382,24 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
"error" => format!("{:?}", e) "error" => format!("{:?}", e)
); );
metrics::inc_counter_vec(&metrics::SIGNED_BLOCKS_TOTAL, &[metrics::SLASHABLE]); metrics::inc_counter_vec(&metrics::SIGNED_BLOCKS_TOTAL, &[metrics::SLASHABLE]);
None Err(Error::Slashable(e))
} }
} }
} }
pub fn sign_attestation( pub fn sign_attestation(
&self, &self,
validator_pubkey: &PublicKeyBytes, validator_pubkey: PublicKeyBytes,
validator_committee_position: usize, validator_committee_position: usize,
attestation: &mut Attestation<E>, attestation: &mut Attestation<E>,
current_epoch: Epoch, current_epoch: Epoch,
) -> Option<()> { ) -> Result<(), Error> {
// Make sure the target epoch is not higher than the current epoch to avoid potential attacks. // Make sure the target epoch is not higher than the current epoch to avoid potential attacks.
if attestation.data.target.epoch > current_epoch { if attestation.data.target.epoch > current_epoch {
return None; return Err(Error::GreaterThanCurrentEpoch {
epoch: attestation.data.target.epoch,
current_epoch,
});
} }
// Checking for slashing conditions. // Checking for slashing conditions.
@ -260,7 +412,7 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
self.genesis_validators_root, self.genesis_validators_root,
); );
let slashing_status = self.slashing_protection.check_and_insert_attestation( let slashing_status = self.slashing_protection.check_and_insert_attestation(
validator_pubkey, &validator_pubkey,
&attestation.data, &attestation.data,
domain, domain,
); );
@ -268,29 +420,20 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
match slashing_status { match slashing_status {
// We can safely sign this attestation. // We can safely sign this attestation.
Ok(Safe::Valid) => { Ok(Safe::Valid) => {
let validators = self.validators.read(); self.with_validator_keypair(validator_pubkey, |keypair| {
let voting_keypair = validators.voting_keypair(validator_pubkey)?; attestation.sign(
&keypair.sk,
attestation
.sign(
&voting_keypair.sk,
validator_committee_position, validator_committee_position,
&fork, &fork,
self.genesis_validators_root, self.genesis_validators_root,
&self.spec, &self.spec,
) )
.map_err(|e| { })?
error!( .map_err(Error::UnableToSignAttestation)?;
self.log,
"Error whilst signing attestation";
"error" => format!("{:?}", e)
)
})
.ok()?;
metrics::inc_counter_vec(&metrics::SIGNED_ATTESTATIONS_TOTAL, &[metrics::SUCCESS]); metrics::inc_counter_vec(&metrics::SIGNED_ATTESTATIONS_TOTAL, &[metrics::SUCCESS]);
Some(()) Ok(())
} }
Ok(Safe::SameData) => { Ok(Safe::SameData) => {
warn!( warn!(
@ -301,7 +444,7 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
&metrics::SIGNED_ATTESTATIONS_TOTAL, &metrics::SIGNED_ATTESTATIONS_TOTAL,
&[metrics::SAME_DATA], &[metrics::SAME_DATA],
); );
None Err(Error::SameData)
} }
Err(NotSafe::UnregisteredValidator(pk)) => { Err(NotSafe::UnregisteredValidator(pk)) => {
warn!( warn!(
@ -314,7 +457,7 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
&metrics::SIGNED_ATTESTATIONS_TOTAL, &metrics::SIGNED_ATTESTATIONS_TOTAL,
&[metrics::UNREGISTERED], &[metrics::UNREGISTERED],
); );
None Err(Error::Slashable(NotSafe::UnregisteredValidator(pk)))
} }
Err(e) => { Err(e) => {
crit!( crit!(
@ -327,7 +470,7 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
&metrics::SIGNED_ATTESTATIONS_TOTAL, &metrics::SIGNED_ATTESTATIONS_TOTAL,
&[metrics::SLASHABLE], &[metrics::SLASHABLE],
); );
None Err(Error::Slashable(e))
} }
} }
} }
@ -338,46 +481,64 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
/// modified by actors other than the signing validator. /// modified by actors other than the signing validator.
pub fn produce_signed_aggregate_and_proof( pub fn produce_signed_aggregate_and_proof(
&self, &self,
validator_pubkey: &PublicKeyBytes, validator_pubkey: PublicKeyBytes,
validator_index: u64, validator_index: u64,
aggregate: Attestation<E>, aggregate: Attestation<E>,
selection_proof: SelectionProof, selection_proof: SelectionProof,
) -> Option<SignedAggregateAndProof<E>> { ) -> Result<SignedAggregateAndProof<E>, Error> {
let validators = self.validators.read(); // Take the fork early to avoid lock interleaving.
let voting_keypair = &validators.voting_keypair(validator_pubkey)?; let fork = self.fork();
let proof = self.with_validator_keypair(validator_pubkey, move |keypair| {
SignedAggregateAndProof::from_aggregate(
validator_index,
aggregate,
Some(selection_proof),
&keypair.sk,
&fork,
self.genesis_validators_root,
&self.spec,
)
})?;
metrics::inc_counter_vec(&metrics::SIGNED_AGGREGATES_TOTAL, &[metrics::SUCCESS]); metrics::inc_counter_vec(&metrics::SIGNED_AGGREGATES_TOTAL, &[metrics::SUCCESS]);
Some(SignedAggregateAndProof::from_aggregate( Ok(proof)
validator_index,
aggregate,
Some(selection_proof),
&voting_keypair.sk,
&self.fork(),
self.genesis_validators_root,
&self.spec,
))
} }
/// Produces a `SelectionProof` for the `slot`, signed by with corresponding secret key to /// Produces a `SelectionProof` for the `slot`, signed by with corresponding secret key to
/// `validator_pubkey`. /// `validator_pubkey`.
pub fn produce_selection_proof( pub fn produce_selection_proof(
&self, &self,
validator_pubkey: &PublicKeyBytes, validator_pubkey: PublicKeyBytes,
slot: Slot, slot: Slot,
) -> Option<SelectionProof> { ) -> Result<SelectionProof, Error> {
let validators = self.validators.read(); // Take the fork early to avoid lock interleaving.
let voting_keypair = &validators.voting_keypair(validator_pubkey)?; let fork = self.fork();
// Bypass the `with_validator_keypair` function.
//
// This is because we don't care about doppelganger protection when it comes to selection
// proofs. They are not slashable and we need them to subscribe to subnets on the BN.
//
// As long as we disallow `SignedAggregateAndProof` then these selection proofs will never
// be published on the network.
let validators_lock = self.validators.read();
let keypair = validators_lock
.voting_keypair(&validator_pubkey)
.ok_or(Error::UnknownPubkey(validator_pubkey))?;
let proof = SelectionProof::new::<E>(
slot,
&keypair.sk,
&fork,
self.genesis_validators_root,
&self.spec,
);
metrics::inc_counter_vec(&metrics::SIGNED_SELECTION_PROOFS_TOTAL, &[metrics::SUCCESS]); metrics::inc_counter_vec(&metrics::SIGNED_SELECTION_PROOFS_TOTAL, &[metrics::SUCCESS]);
Some(SelectionProof::new::<E>( Ok(proof)
slot,
&voting_keypair.sk,
&self.fork(),
self.genesis_validators_root,
&self.spec,
))
} }
/// Prune the slashing protection database so that it remains performant. /// Prune the slashing protection database so that it remains performant.
@ -411,10 +572,11 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
let new_min_target_epoch = current_epoch.saturating_sub(SLASHING_PROTECTION_HISTORY_EPOCHS); let new_min_target_epoch = current_epoch.saturating_sub(SLASHING_PROTECTION_HISTORY_EPOCHS);
let new_min_slot = new_min_target_epoch.start_slot(E::slots_per_epoch()); let new_min_slot = new_min_target_epoch.start_slot(E::slots_per_epoch());
let validators = self.validators.read(); let all_pubkeys: Vec<_> = self.voting_pubkeys(DoppelgangerStatus::ignored);
if let Err(e) = self if let Err(e) = self
.slashing_protection .slashing_protection
.prune_all_signed_attestations(validators.iter_voting_pubkeys(), new_min_target_epoch) .prune_all_signed_attestations(all_pubkeys.iter(), new_min_target_epoch)
{ {
error!( error!(
self.log, self.log,
@ -426,7 +588,7 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
if let Err(e) = self if let Err(e) = self
.slashing_protection .slashing_protection
.prune_all_signed_blocks(validators.iter_voting_pubkeys(), new_min_slot) .prune_all_signed_blocks(all_pubkeys.iter(), new_min_slot)
{ {
error!( error!(
self.log, self.log,