WIP: Made block publishing validator function, which sends to a network channel. Untested.
This commit is contained in:
parent
0136eb33b0
commit
ca9094e79a
@ -25,7 +25,7 @@ types = { path = "../../eth2/types" }
|
|||||||
clap = "2.32.0"
|
clap = "2.32.0"
|
||||||
http = "^0.1.17"
|
http = "^0.1.17"
|
||||||
prometheus = { version = "^0.6", features = ["process"] }
|
prometheus = { version = "^0.6", features = ["process"] }
|
||||||
hyper = "0.12.32"
|
hyper = "0.12.34"
|
||||||
futures = "0.1"
|
futures = "0.1"
|
||||||
exit-future = "0.1.3"
|
exit-future = "0.1.3"
|
||||||
tokio = "0.1.17"
|
tokio = "0.1.17"
|
||||||
@ -35,3 +35,5 @@ eth2_config = { path = "../../eth2/utils/eth2_config" }
|
|||||||
lighthouse_metrics = { path = "../../eth2/utils/lighthouse_metrics" }
|
lighthouse_metrics = { path = "../../eth2/utils/lighthouse_metrics" }
|
||||||
slot_clock = { path = "../../eth2/utils/slot_clock" }
|
slot_clock = { path = "../../eth2/utils/slot_clock" }
|
||||||
hex = "0.3.2"
|
hex = "0.3.2"
|
||||||
|
parking_lot = "0.9"
|
||||||
|
|
||||||
|
@ -4,10 +4,11 @@ use bls::PublicKey;
|
|||||||
use eth2_libp2p::{PubsubMessage, Topic};
|
use eth2_libp2p::{PubsubMessage, Topic};
|
||||||
use eth2_libp2p::{BEACON_BLOCK_TOPIC, TOPIC_ENCODING_POSTFIX, TOPIC_PREFIX};
|
use eth2_libp2p::{BEACON_BLOCK_TOPIC, TOPIC_ENCODING_POSTFIX, TOPIC_PREFIX};
|
||||||
use hex;
|
use hex;
|
||||||
|
use http::header;
|
||||||
use hyper::{Body, Request};
|
use hyper::{Body, Request};
|
||||||
use network::NetworkMessage;
|
use network::NetworkMessage;
|
||||||
|
use parking_lot::RwLock;
|
||||||
use ssz::Encode;
|
use ssz::Encode;
|
||||||
use std::borrow::BorrowMut;
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use store::{iter::AncestorIter, Store};
|
use store::{iter::AncestorIter, Store};
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
@ -41,6 +42,21 @@ pub fn parse_root(string: &str) -> Result<Hash256, ApiError> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Checks the provided request to ensure that the `content-type` header.
|
||||||
|
///
|
||||||
|
/// The content-type header should either be omitted, in which case JSON is assumed, or it should
|
||||||
|
/// explicity specify `application/json`. If anything else is provided, an error is returned.
|
||||||
|
pub fn check_content_type_for_json(req: &Request<Body>) -> Result<(), ApiError> {
|
||||||
|
match req.headers().get(header::CONTENT_TYPE) {
|
||||||
|
Some(h) if h == "application/json" => Ok(()),
|
||||||
|
Some(h) => Err(ApiError::InvalidQueryParams(format!(
|
||||||
|
"The provided content-type {:?} is not available, it must be JSON.",
|
||||||
|
h
|
||||||
|
))),
|
||||||
|
_ => Ok(()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Parse a PublicKey from a `0x` prefixed hex string
|
/// Parse a PublicKey from a `0x` prefixed hex string
|
||||||
pub fn parse_pubkey(string: &str) -> Result<PublicKey, ApiError> {
|
pub fn parse_pubkey(string: &str) -> Result<PublicKey, ApiError> {
|
||||||
const PREFIX: &str = "0x";
|
const PREFIX: &str = "0x";
|
||||||
@ -204,17 +220,9 @@ pub fn get_logger_from_request(req: &Request<Body>) -> slog::Logger {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn publish_beacon_block_to_network<T: BeaconChainTypes + 'static>(
|
pub fn publish_beacon_block_to_network<T: BeaconChainTypes + 'static>(
|
||||||
req: &Request<Body>,
|
chan: Arc<RwLock<mpsc::UnboundedSender<NetworkMessage>>>,
|
||||||
block: BeaconBlock<T::EthSpec>,
|
block: BeaconBlock<T::EthSpec>,
|
||||||
) -> Result<(), ApiError> {
|
) -> Result<(), ApiError> {
|
||||||
// Get the network service from the request
|
|
||||||
let mut network_chan = req
|
|
||||||
.extensions()
|
|
||||||
.get::<mpsc::UnboundedSender<NetworkMessage>>()
|
|
||||||
.expect(
|
|
||||||
"Should always get the network channel from the request, since we put it in there.",
|
|
||||||
);
|
|
||||||
|
|
||||||
// create the network topic to send on
|
// create the network topic to send on
|
||||||
let topic_string = format!(
|
let topic_string = format!(
|
||||||
"/{}/{}/{}",
|
"/{}/{}/{}",
|
||||||
@ -224,7 +232,7 @@ pub fn publish_beacon_block_to_network<T: BeaconChainTypes + 'static>(
|
|||||||
let message = PubsubMessage::Block(block.as_ssz_bytes());
|
let message = PubsubMessage::Block(block.as_ssz_bytes());
|
||||||
|
|
||||||
// Publish the block to the p2p network via gossipsub.
|
// Publish the block to the p2p network via gossipsub.
|
||||||
if let Err(e) = &network_chan.try_send(NetworkMessage::Publish {
|
if let Err(e) = chan.write().try_send(NetworkMessage::Publish {
|
||||||
topics: vec![topic],
|
topics: vec![topic],
|
||||||
message: message,
|
message: message,
|
||||||
}) {
|
}) {
|
||||||
|
@ -18,8 +18,9 @@ use client_network::NetworkMessage;
|
|||||||
use client_network::Service as NetworkService;
|
use client_network::Service as NetworkService;
|
||||||
use eth2_config::Eth2Config;
|
use eth2_config::Eth2Config;
|
||||||
use hyper::rt::Future;
|
use hyper::rt::Future;
|
||||||
use hyper::service::service_fn_ok;
|
use hyper::service::Service;
|
||||||
use hyper::{Body, Method, Response, Server, StatusCode};
|
use hyper::{Body, Method, Request, Response, Server, StatusCode};
|
||||||
|
use parking_lot::RwLock;
|
||||||
use response_builder::ResponseBuilder;
|
use response_builder::ResponseBuilder;
|
||||||
use slog::{info, o, warn};
|
use slog::{info, o, warn};
|
||||||
use std::ops::Deref;
|
use std::ops::Deref;
|
||||||
|
@ -5,8 +5,13 @@ use bls::{AggregateSignature, PublicKey, Signature};
|
|||||||
use futures::future::Future;
|
use futures::future::Future;
|
||||||
use futures::stream::Stream;
|
use futures::stream::Stream;
|
||||||
use hyper::{Body, Error, Request};
|
use hyper::{Body, Error, Request};
|
||||||
|
use network::NetworkMessage;
|
||||||
|
use parking_lot::RwLock;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use slog::info;
|
use slog::{info, trace, warn};
|
||||||
|
use std::sync::Arc;
|
||||||
|
use tokio;
|
||||||
|
use tokio::sync::mpsc;
|
||||||
use types::beacon_state::EthSpec;
|
use types::beacon_state::EthSpec;
|
||||||
use types::{Attestation, BeaconBlock, BitList, Epoch, RelativeEpoch, Shard, Slot};
|
use types::{Attestation, BeaconBlock, BitList, Epoch, RelativeEpoch, Shard, Slot};
|
||||||
|
|
||||||
@ -200,17 +205,41 @@ pub fn get_new_beacon_block<T: BeaconChainTypes + 'static>(req: Request<Body>) -
|
|||||||
|
|
||||||
/// HTTP Handler to publish a BeaconBlock, which has been signed by a validator.
|
/// HTTP Handler to publish a BeaconBlock, which has been signed by a validator.
|
||||||
pub fn publish_beacon_block<T: BeaconChainTypes + 'static>(req: Request<Body>) -> ApiResult {
|
pub fn publish_beacon_block<T: BeaconChainTypes + 'static>(req: Request<Body>) -> ApiResult {
|
||||||
|
let _ = check_content_type_for_json(&req)?;
|
||||||
let log = get_logger_from_request(&req);
|
let log = get_logger_from_request(&req);
|
||||||
let (beacon_chain, _head_state) = get_beacon_chain_from_request::<T>(&req)?;
|
let (beacon_chain, _head_state) = get_beacon_chain_from_request::<T>(&req)?;
|
||||||
|
// Get the network sending channel from the request, for later transmission
|
||||||
|
let network_chan = req
|
||||||
|
.extensions()
|
||||||
|
.get::<Arc<RwLock<mpsc::UnboundedSender<NetworkMessage>>>>()
|
||||||
|
.expect("Should always get the network channel from the request, since we put it in there.")
|
||||||
|
.clone();
|
||||||
|
|
||||||
let (_head, body) = req.into_parts();
|
let body = req.into_body();
|
||||||
let block_future = body
|
trace!(
|
||||||
.fold(Vec::new(), |mut acc, chunk| {
|
log,
|
||||||
acc.extend_from_slice(&*chunk);
|
"Got the request body, now going to parse it into a block."
|
||||||
futures::future::ok::<_, Error>(acc)
|
);
|
||||||
|
let block = body
|
||||||
|
.concat2()
|
||||||
|
.map(move |chunk| chunk.iter().cloned().collect::<Vec<u8>>())
|
||||||
|
.map(|chunks| {
|
||||||
|
let block_result: Result<BeaconBlock<T::EthSpec>, ApiError> =
|
||||||
|
serde_json::from_slice(&chunks.as_slice()).map_err(|e| {
|
||||||
|
ApiError::InvalidQueryParams(format!(
|
||||||
|
"Unable to deserialize JSON into a BeaconBlock: {:?}",
|
||||||
|
e
|
||||||
|
))
|
||||||
|
});
|
||||||
|
block_result
|
||||||
})
|
})
|
||||||
|
.unwrap()
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
/*
|
||||||
.map_err(|e| ApiError::ServerError(format!("Unable parse request body: {:?}", e)))
|
.map_err(|e| ApiError::ServerError(format!("Unable parse request body: {:?}", e)))
|
||||||
.and_then(|body| {
|
.and_then(|body| {
|
||||||
|
trace!(log, "parsing json");
|
||||||
let block_result: Result<BeaconBlock<T::EthSpec>, ApiError> =
|
let block_result: Result<BeaconBlock<T::EthSpec>, ApiError> =
|
||||||
serde_json::from_slice(&body.as_slice()).map_err(|e| {
|
serde_json::from_slice(&body.as_slice()).map_err(|e| {
|
||||||
ApiError::InvalidQueryParams(format!(
|
ApiError::InvalidQueryParams(format!(
|
||||||
@ -220,16 +249,19 @@ pub fn publish_beacon_block<T: BeaconChainTypes + 'static>(req: Request<Body>) -
|
|||||||
});
|
});
|
||||||
block_result
|
block_result
|
||||||
});
|
});
|
||||||
|
tokio::run(block_future);
|
||||||
let block = block_future.wait()?;
|
let block = block_future.wait()?;
|
||||||
|
*/
|
||||||
|
trace!(log, "BeaconBlock successfully parsed from JSON"; "block" => serde_json::to_string(&block).expect("We should always be able to serialize a block that we just created."));
|
||||||
match beacon_chain.process_block(block.clone()) {
|
match beacon_chain.process_block(block.clone()) {
|
||||||
Ok(BlockProcessingOutcome::Processed {
|
Ok(BlockProcessingOutcome::Processed { block_root }) => {
|
||||||
block_root: block_root,
|
|
||||||
}) => {
|
|
||||||
// Block was processed, publish via gossipsub
|
// Block was processed, publish via gossipsub
|
||||||
info!(log, "Processed valid block from API"; "block_slot" => block.slot, "block_root" => format!("{}", block_root));
|
info!(log, "Processed valid block from API, transmitting to network."; "block_slot" => block.slot, "block_root" => format!("{}", block_root));
|
||||||
publish_beacon_block_to_network::<T>(&req, block)?;
|
publish_beacon_block_to_network::<T>(network_chan, block)?;
|
||||||
}
|
}
|
||||||
Ok(outcome) => {
|
Ok(outcome) => {
|
||||||
|
warn!(log, "Block could not be processed, but is being sent to the network anyway."; "block_slot" => block.slot, "outcome" => format!("{:?}", outcome));
|
||||||
|
//TODO need to send to network and return http 202
|
||||||
return Err(ApiError::InvalidQueryParams(format!(
|
return Err(ApiError::InvalidQueryParams(format!(
|
||||||
"The BeaconBlock could not be processed: {:?}",
|
"The BeaconBlock could not be processed: {:?}",
|
||||||
outcome
|
outcome
|
||||||
|
Loading…
Reference in New Issue
Block a user