WIP: Added POST functionality for pusblish_beacon_block. Currently doesn't compile, struggling with the borrow checker.
This commit is contained in:
parent
99c673045c
commit
0136eb33b0
@ -215,6 +215,7 @@ where
|
|||||||
executor,
|
executor,
|
||||||
beacon_chain.clone(),
|
beacon_chain.clone(),
|
||||||
network.clone(),
|
network.clone(),
|
||||||
|
network_send.clone(),
|
||||||
client_config.db_path().expect("unable to read datadir"),
|
client_config.db_path().expect("unable to read datadir"),
|
||||||
eth2_config.clone(),
|
eth2_config.clone(),
|
||||||
&log,
|
&log,
|
||||||
|
@ -1,11 +1,17 @@
|
|||||||
use crate::{ApiError, ApiResult};
|
use crate::{ApiError, ApiResult};
|
||||||
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
||||||
use bls::PublicKey;
|
use bls::PublicKey;
|
||||||
|
use eth2_libp2p::{PubsubMessage, Topic};
|
||||||
|
use eth2_libp2p::{BEACON_BLOCK_TOPIC, TOPIC_ENCODING_POSTFIX, TOPIC_PREFIX};
|
||||||
use hex;
|
use hex;
|
||||||
use hyper::{Body, Request};
|
use hyper::{Body, Request};
|
||||||
|
use network::NetworkMessage;
|
||||||
|
use ssz::Encode;
|
||||||
|
use std::borrow::BorrowMut;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use store::{iter::AncestorIter, Store};
|
use store::{iter::AncestorIter, Store};
|
||||||
use types::{BeaconState, EthSpec, Hash256, RelativeEpoch, Slot};
|
use tokio::sync::mpsc;
|
||||||
|
use types::{BeaconBlock, BeaconState, EthSpec, Hash256, RelativeEpoch, Slot};
|
||||||
|
|
||||||
/// Parse a slot from a `0x` preixed string.
|
/// Parse a slot from a `0x` preixed string.
|
||||||
///
|
///
|
||||||
@ -197,6 +203,40 @@ pub fn get_logger_from_request(req: &Request<Body>) -> slog::Logger {
|
|||||||
log.to_owned()
|
log.to_owned()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn publish_beacon_block_to_network<T: BeaconChainTypes + 'static>(
|
||||||
|
req: &Request<Body>,
|
||||||
|
block: BeaconBlock<T::EthSpec>,
|
||||||
|
) -> Result<(), ApiError> {
|
||||||
|
// Get the network service from the request
|
||||||
|
let mut network_chan = req
|
||||||
|
.extensions()
|
||||||
|
.get::<mpsc::UnboundedSender<NetworkMessage>>()
|
||||||
|
.expect(
|
||||||
|
"Should always get the network channel from the request, since we put it in there.",
|
||||||
|
);
|
||||||
|
|
||||||
|
// create the network topic to send on
|
||||||
|
let topic_string = format!(
|
||||||
|
"/{}/{}/{}",
|
||||||
|
TOPIC_PREFIX, BEACON_BLOCK_TOPIC, TOPIC_ENCODING_POSTFIX
|
||||||
|
);
|
||||||
|
let topic = Topic::new(topic_string);
|
||||||
|
let message = PubsubMessage::Block(block.as_ssz_bytes());
|
||||||
|
|
||||||
|
// Publish the block to the p2p network via gossipsub.
|
||||||
|
if let Err(e) = &network_chan.try_send(NetworkMessage::Publish {
|
||||||
|
topics: vec![topic],
|
||||||
|
message: message,
|
||||||
|
}) {
|
||||||
|
return Err(ApiError::ServerError(format!(
|
||||||
|
"Unable to send new block to network: {:?}",
|
||||||
|
e
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test {
|
mod test {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
@ -14,6 +14,7 @@ mod url_query;
|
|||||||
mod validator;
|
mod validator;
|
||||||
|
|
||||||
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
||||||
|
use client_network::NetworkMessage;
|
||||||
use client_network::Service as NetworkService;
|
use client_network::Service as NetworkService;
|
||||||
use eth2_config::Eth2Config;
|
use eth2_config::Eth2Config;
|
||||||
use hyper::rt::Future;
|
use hyper::rt::Future;
|
||||||
@ -25,6 +26,7 @@ use std::ops::Deref;
|
|||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::runtime::TaskExecutor;
|
use tokio::runtime::TaskExecutor;
|
||||||
|
use tokio::sync::mpsc;
|
||||||
use url_query::UrlQuery;
|
use url_query::UrlQuery;
|
||||||
|
|
||||||
pub use beacon::{BlockResponse, HeadResponse, StateResponse};
|
pub use beacon::{BlockResponse, HeadResponse, StateResponse};
|
||||||
@ -83,6 +85,7 @@ pub fn start_server<T: BeaconChainTypes>(
|
|||||||
executor: &TaskExecutor,
|
executor: &TaskExecutor,
|
||||||
beacon_chain: Arc<BeaconChain<T>>,
|
beacon_chain: Arc<BeaconChain<T>>,
|
||||||
network_service: Arc<NetworkService<T>>,
|
network_service: Arc<NetworkService<T>>,
|
||||||
|
network_chan: mpsc::UnboundedSender<NetworkMessage>,
|
||||||
db_path: PathBuf,
|
db_path: PathBuf,
|
||||||
eth2_config: Eth2Config,
|
eth2_config: Eth2Config,
|
||||||
log: &slog::Logger,
|
log: &slog::Logger,
|
||||||
@ -113,6 +116,7 @@ pub fn start_server<T: BeaconChainTypes>(
|
|||||||
let beacon_chain = server_bc.clone();
|
let beacon_chain = server_bc.clone();
|
||||||
let db_path = db_path.clone();
|
let db_path = db_path.clone();
|
||||||
let network_service = network_service.clone();
|
let network_service = network_service.clone();
|
||||||
|
let network_chan = network_chan.clone();
|
||||||
let eth2_config = eth2_config.clone();
|
let eth2_config = eth2_config.clone();
|
||||||
|
|
||||||
// Create a simple handler for the router, inject our stateful objects into the request.
|
// Create a simple handler for the router, inject our stateful objects into the request.
|
||||||
@ -126,6 +130,8 @@ pub fn start_server<T: BeaconChainTypes>(
|
|||||||
req.extensions_mut().insert::<DBPath>(db_path.clone());
|
req.extensions_mut().insert::<DBPath>(db_path.clone());
|
||||||
req.extensions_mut()
|
req.extensions_mut()
|
||||||
.insert::<Arc<NetworkService<T>>>(network_service.clone());
|
.insert::<Arc<NetworkService<T>>>(network_service.clone());
|
||||||
|
req.extensions_mut()
|
||||||
|
.insert::<mpsc::UnboundedSender<NetworkMessage>>(network_chan.clone());
|
||||||
req.extensions_mut()
|
req.extensions_mut()
|
||||||
.insert::<Arc<Eth2Config>>(eth2_config.clone());
|
.insert::<Arc<Eth2Config>>(eth2_config.clone());
|
||||||
|
|
||||||
@ -177,7 +183,7 @@ pub fn start_server<T: BeaconChainTypes>(
|
|||||||
validator::get_new_beacon_block::<T>(req)
|
validator::get_new_beacon_block::<T>(req)
|
||||||
}
|
}
|
||||||
(&Method::POST, "/beacon/validator/block") => {
|
(&Method::POST, "/beacon/validator/block") => {
|
||||||
helpers::implementation_pending_response(req)
|
validator::publish_beacon_block::<T>(req)
|
||||||
}
|
}
|
||||||
(&Method::GET, "/beacon/validator/attestation") => {
|
(&Method::GET, "/beacon/validator/attestation") => {
|
||||||
validator::get_new_attestation::<T>(req)
|
validator::get_new_attestation::<T>(req)
|
||||||
|
@ -1,11 +1,14 @@
|
|||||||
use super::{success_response, ApiResult};
|
use super::{success_response, ApiResult};
|
||||||
use crate::{helpers::*, ApiError, UrlQuery};
|
use crate::{helpers::*, ApiError, UrlQuery};
|
||||||
use beacon_chain::BeaconChainTypes;
|
use beacon_chain::{BeaconChainTypes, BlockProcessingOutcome};
|
||||||
use bls::{AggregateSignature, PublicKey, Signature};
|
use bls::{AggregateSignature, PublicKey, Signature};
|
||||||
use hyper::{Body, Request};
|
use futures::future::Future;
|
||||||
|
use futures::stream::Stream;
|
||||||
|
use hyper::{Body, Error, Request};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
use slog::info;
|
||||||
use types::beacon_state::EthSpec;
|
use types::beacon_state::EthSpec;
|
||||||
use types::{Attestation, BitList, Epoch, RelativeEpoch, Shard, Slot};
|
use types::{Attestation, BeaconBlock, BitList, Epoch, RelativeEpoch, Shard, Slot};
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
pub struct ValidatorDuty {
|
pub struct ValidatorDuty {
|
||||||
@ -195,6 +198,54 @@ pub fn get_new_beacon_block<T: BeaconChainTypes + 'static>(req: Request<Body>) -
|
|||||||
Ok(success_response(body))
|
Ok(success_response(body))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// HTTP Handler to publish a BeaconBlock, which has been signed by a validator.
|
||||||
|
pub fn publish_beacon_block<T: BeaconChainTypes + 'static>(req: Request<Body>) -> ApiResult {
|
||||||
|
let log = get_logger_from_request(&req);
|
||||||
|
let (beacon_chain, _head_state) = get_beacon_chain_from_request::<T>(&req)?;
|
||||||
|
|
||||||
|
let (_head, body) = req.into_parts();
|
||||||
|
let block_future = body
|
||||||
|
.fold(Vec::new(), |mut acc, chunk| {
|
||||||
|
acc.extend_from_slice(&*chunk);
|
||||||
|
futures::future::ok::<_, Error>(acc)
|
||||||
|
})
|
||||||
|
.map_err(|e| ApiError::ServerError(format!("Unable parse request body: {:?}", e)))
|
||||||
|
.and_then(|body| {
|
||||||
|
let block_result: Result<BeaconBlock<T::EthSpec>, ApiError> =
|
||||||
|
serde_json::from_slice(&body.as_slice()).map_err(|e| {
|
||||||
|
ApiError::InvalidQueryParams(format!(
|
||||||
|
"Unable to deserialize JSON into a BeaconBlock: {:?}",
|
||||||
|
e
|
||||||
|
))
|
||||||
|
});
|
||||||
|
block_result
|
||||||
|
});
|
||||||
|
let block = block_future.wait()?;
|
||||||
|
match beacon_chain.process_block(block.clone()) {
|
||||||
|
Ok(BlockProcessingOutcome::Processed {
|
||||||
|
block_root: block_root,
|
||||||
|
}) => {
|
||||||
|
// Block was processed, publish via gossipsub
|
||||||
|
info!(log, "Processed valid block from API"; "block_slot" => block.slot, "block_root" => format!("{}", block_root));
|
||||||
|
publish_beacon_block_to_network::<T>(&req, block)?;
|
||||||
|
}
|
||||||
|
Ok(outcome) => {
|
||||||
|
return Err(ApiError::InvalidQueryParams(format!(
|
||||||
|
"The BeaconBlock could not be processed: {:?}",
|
||||||
|
outcome
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
return Err(ApiError::ServerError(format!(
|
||||||
|
"Unable to process block: {:?}",
|
||||||
|
e
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(success_response(Body::empty()))
|
||||||
|
}
|
||||||
|
|
||||||
/// HTTP Handler to produce a new Attestation from the current state, ready to be signed by a validator.
|
/// HTTP Handler to produce a new Attestation from the current state, ready to be signed by a validator.
|
||||||
pub fn get_new_attestation<T: BeaconChainTypes + 'static>(req: Request<Body>) -> ApiResult {
|
pub fn get_new_attestation<T: BeaconChainTypes + 'static>(req: Request<Body>) -> ApiResult {
|
||||||
let (beacon_chain, head_state) = get_beacon_chain_from_request::<T>(&req)?;
|
let (beacon_chain, head_state) = get_beacon_chain_from_request::<T>(&req)?;
|
||||||
|
Loading…
Reference in New Issue
Block a user