Add untested block processing from GRPC

This commit is contained in:
Paul Hauner 2019-03-26 15:26:05 +11:00
parent 3756d8d681
commit 0768d24ffc
No known key found for this signature in database
GPG Key ID: D362883A9218FCC6
3 changed files with 94 additions and 27 deletions

View File

@ -1,3 +1,4 @@
use crate::beacon_chain::BeaconChain;
use crossbeam_channel; use crossbeam_channel;
use eth2_libp2p::rpc::methods::BlockRootSlot; use eth2_libp2p::rpc::methods::BlockRootSlot;
use eth2_libp2p::PubsubMessage; use eth2_libp2p::PubsubMessage;
@ -9,13 +10,15 @@ use protos::services::{
PublishBeaconBlockRequest, PublishBeaconBlockResponse, PublishBeaconBlockRequest, PublishBeaconBlockResponse,
}; };
use protos::services_grpc::BeaconBlockService; use protos::services_grpc::BeaconBlockService;
use slog::debug;
use slog::Logger; use slog::Logger;
use slog::{debug, error, info, warn};
use ssz::{Decodable, TreeHash}; use ssz::{Decodable, TreeHash};
use std::sync::Arc;
use types::{BeaconBlock, Hash256, Slot}; use types::{BeaconBlock, Hash256, Slot};
#[derive(Clone)] #[derive(Clone)]
pub struct BeaconBlockServiceInstance { pub struct BeaconBlockServiceInstance {
pub chain: Arc<BeaconChain>,
pub network_chan: crossbeam_channel::Sender<NetworkMessage>, pub network_chan: crossbeam_channel::Sender<NetworkMessage>,
pub log: Logger, pub log: Logger,
} }
@ -50,39 +53,91 @@ impl BeaconBlockService for BeaconBlockServiceInstance {
req: PublishBeaconBlockRequest, req: PublishBeaconBlockRequest,
sink: UnarySink<PublishBeaconBlockResponse>, sink: UnarySink<PublishBeaconBlockResponse>,
) { ) {
debug!(self.log, "PublishBeaconBlock"); let mut resp = PublishBeaconBlockResponse::new();
let block = req.get_block(); let ssz_serialized_block = req.get_block().get_ssz();
match BeaconBlock::ssz_decode(block.get_ssz(), 0) { match BeaconBlock::ssz_decode(ssz_serialized_block, 0) {
Ok((block, _i)) => { Ok((block, _i)) => {
let block_root = Hash256::from_slice(&block.hash_tree_root()[..]); let block_root = Hash256::from_slice(&block.hash_tree_root()[..]);
// TODO: Obtain topics from the network service properly. match self.chain.process_block(block.clone()) {
let topic = types::TopicBuilder::new("beacon_chain".to_string()).build(); Ok(outcome) => {
let message = PubsubMessage::Block(BlockRootSlot { if outcome.sucessfully_processed() {
block_root, // Block was successfully processed.
slot: block.slot, info!(
}); self.log,
"PublishBeaconBlock";
"type" => "invalid_block",
"outcome" => format!("{:?}", outcome)
);
println!("Sending beacon block to gossipsub"); // TODO: Obtain topics from the network service properly.
self.network_chan.send(NetworkMessage::Publish { let topic =
topics: vec![topic], types::TopicBuilder::new("beacon_chain".to_string()).build();
message, let message = PubsubMessage::Block(BlockRootSlot {
}); block_root,
slot: block.slot,
});
println!("Sending beacon block to gossipsub");
self.network_chan.send(NetworkMessage::Publish {
topics: vec![topic],
message,
});
resp.set_success(true);
} else if outcome.is_invalid() {
// Block was invalid.
warn!(
self.log,
"PublishBeaconBlock";
"type" => "invalid_block",
"outcome" => format!("{:?}", outcome)
);
resp.set_success(false);
resp.set_msg(
format!("InvalidBlock: {:?}", outcome).as_bytes().to_vec(),
);
} else {
// Some failure during processing.
error!(
self.log,
"PublishBeaconBlock";
"type" => "other",
"outcome" => format!("{:?}", outcome)
);
resp.set_success(false);
resp.set_msg(format!("other: {:?}", outcome).as_bytes().to_vec());
}
}
Err(e) => {
// Some failure during processing.
error!(
self.log,
"PublishBeaconBlock";
"type" => "failed_to_process",
"error" => format!("{:?}", e)
);
resp.set_success(false);
resp.set_msg(format!("failed_to_process: {:?}", e).as_bytes().to_vec());
}
}
// TODO: actually process the block.
let mut resp = PublishBeaconBlockResponse::new();
resp.set_success(true); resp.set_success(true);
}
Err(_) => {
resp.set_success(false);
resp.set_msg(b"Invalid SSZ".to_vec());
}
};
let f = sink let f = sink
.success(resp) .success(resp)
.map_err(move |e| println!("failed to reply {:?}: {:?}", req, e)); .map_err(move |e| println!("failed to reply {:?}: {:?}", req, e));
ctx.spawn(f) ctx.spawn(f)
}
Err(e) => {
//
}
}
} }
} }

View File

@ -5,14 +5,18 @@ use beacon_chain::{
parking_lot::RwLockReadGuard, parking_lot::RwLockReadGuard,
slot_clock::SlotClock, slot_clock::SlotClock,
types::{BeaconState, ChainSpec}, types::{BeaconState, ChainSpec},
CheckPoint,
}; };
pub use beacon_chain::{BeaconChainError, BlockProcessingOutcome};
use types::BeaconBlock;
/// The RPC's API to the beacon chain. /// The RPC's API to the beacon chain.
pub trait BeaconChain: Send + Sync { pub trait BeaconChain: Send + Sync {
fn get_spec(&self) -> &ChainSpec; fn get_spec(&self) -> &ChainSpec;
fn get_state(&self) -> RwLockReadGuard<BeaconState>; fn get_state(&self) -> RwLockReadGuard<BeaconState>;
fn process_block(&self, block: BeaconBlock)
-> Result<BlockProcessingOutcome, BeaconChainError>;
} }
impl<T, U, F> BeaconChain for RawBeaconChain<T, U, F> impl<T, U, F> BeaconChain for RawBeaconChain<T, U, F>
@ -28,4 +32,11 @@ where
fn get_state(&self) -> RwLockReadGuard<BeaconState> { fn get_state(&self) -> RwLockReadGuard<BeaconState> {
self.state.read() self.state.read()
} }
fn process_block(
&self,
block: BeaconBlock,
) -> Result<BlockProcessingOutcome, BeaconChainError> {
self.process_block(block)
}
} }

View File

@ -43,6 +43,7 @@ pub fn start_server(
let beacon_block_service = { let beacon_block_service = {
let instance = BeaconBlockServiceInstance { let instance = BeaconBlockServiceInstance {
chain: beacon_chain.clone(),
network_chan, network_chan,
log: log.clone(), log: log.clone(),
}; };