review client comments
This commit is contained in:
parent
97b37474f9
commit
755772e12d
@ -23,7 +23,7 @@ import (
|
|||||||
|
|
||||||
// Protocol client.
|
// Protocol client.
|
||||||
// FIXME: Rename to just `Client`. Not done at the moment to avoid
|
// FIXME: Rename to just `Client`. Not done at the moment to avoid
|
||||||
// disrupt too much of the consumer code, should be done along
|
// disrupting too much of the consumer code, should be done along
|
||||||
// https://github.com/filecoin-project/lotus/issues/2612.
|
// https://github.com/filecoin-project/lotus/issues/2612.
|
||||||
type BlockSync struct {
|
type BlockSync struct {
|
||||||
// Connection manager used to contact the server.
|
// Connection manager used to contact the server.
|
||||||
@ -53,7 +53,7 @@ func NewClient(
|
|||||||
// either a response error status, a failed validation or an internal
|
// either a response error status, a failed validation or an internal
|
||||||
// error.
|
// error.
|
||||||
//
|
//
|
||||||
// This is the internal single-point-of-entry for all external-facing
|
// This is the internal single point of entry for all external-facing
|
||||||
// APIs, currently we have 3 very heterogeneous services exposed:
|
// APIs, currently we have 3 very heterogeneous services exposed:
|
||||||
// * GetBlocks: Headers
|
// * GetBlocks: Headers
|
||||||
// * GetFullTipSet: Headers | Messages
|
// * GetFullTipSet: Headers | Messages
|
||||||
@ -67,21 +67,21 @@ func (client *BlockSync) doRequest(
|
|||||||
req *Request,
|
req *Request,
|
||||||
singlePeer *peer.ID,
|
singlePeer *peer.ID,
|
||||||
) (*validatedResponse, error) {
|
) (*validatedResponse, error) {
|
||||||
// Validate request.
|
// Validate request.
|
||||||
if req.Length == 0 {
|
if req.Length == 0 {
|
||||||
return nil, xerrors.Errorf("invalid request of length 0")
|
return nil, xerrors.Errorf("invalid request of length 0")
|
||||||
}
|
}
|
||||||
if req.Length > MaxRequestLength {
|
if req.Length > MaxRequestLength {
|
||||||
return nil, xerrors.Errorf("request length (%d) above maximum (%d)",
|
return nil, xerrors.Errorf("request length (%d) above maximum (%d)",
|
||||||
req.Length, MaxRequestLength)
|
req.Length, MaxRequestLength)
|
||||||
}
|
}
|
||||||
if req.Options == 0 {
|
if req.Options == 0 {
|
||||||
return nil, xerrors.Errorf("request with no options set")
|
return nil, xerrors.Errorf("request with no options set")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Generate the list of peers to be queried, either the
|
// Generate the list of peers to be queried, either the
|
||||||
// `singlePeer` indicated or all peers available (sorted
|
// `singlePeer` indicated or all peers available (sorted
|
||||||
// by an internal peer tracker with some randomness injected).
|
// by an internal peer tracker with some randomness injected).
|
||||||
var peers []peer.ID
|
var peers []peer.ID
|
||||||
if singlePeer != nil {
|
if singlePeer != nil {
|
||||||
peers = []peer.ID{*singlePeer}
|
peers = []peer.ID{*singlePeer}
|
||||||
@ -130,16 +130,15 @@ func (client *BlockSync) doRequest(
|
|||||||
|
|
||||||
errString := "doRequest failed for all peers"
|
errString := "doRequest failed for all peers"
|
||||||
if singlePeer != nil {
|
if singlePeer != nil {
|
||||||
errString = "doRequest failed for single peer"
|
errString = fmt.Sprintf("doRequest failed for single peer %s", *singlePeer)
|
||||||
// (The peer has already been logged before, don't print it again.)
|
|
||||||
}
|
}
|
||||||
return nil, xerrors.Errorf(errString)
|
return nil, xerrors.Errorf(errString)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process and validate response. Check the status and that the information
|
// Process and validate response. Check the status, the integrity of the
|
||||||
// returned matches the request (and its integrity). Extract the information
|
// information returned, and that it matches the request. Extract the information
|
||||||
// into a `validatedResponse` for the external-facing APIs to select what they
|
// into a `validatedResponse` for the external-facing APIs to select what they
|
||||||
// want.
|
// need.
|
||||||
//
|
//
|
||||||
// We are conflating in the single error returned both status and validation
|
// We are conflating in the single error returned both status and validation
|
||||||
// errors. Peer penalization should happen here then, before returning, so
|
// errors. Peer penalization should happen here then, before returning, so
|
||||||
@ -156,8 +155,8 @@ func (client *BlockSync) processResponse(
|
|||||||
|
|
||||||
options := parseOptions(req.Options)
|
options := parseOptions(req.Options)
|
||||||
if options.noOptionsSet() {
|
if options.noOptionsSet() {
|
||||||
// Safety check, this shouldn't happen, and even if it did
|
// Safety check: this shouldn't have been sent, and even if it did
|
||||||
// it should be caught by the peer in its error status.
|
// it should have been caught by the peer in its error status.
|
||||||
return nil, xerrors.Errorf("nothing was requested")
|
return nil, xerrors.Errorf("nothing was requested")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -191,7 +190,7 @@ func (client *BlockSync) processResponse(
|
|||||||
return nil, xerrors.Errorf("returned chain head does not match request")
|
return nil, xerrors.Errorf("returned chain head does not match request")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check `TipSet` are connected (valid chain).
|
// Check `TipSet`s are connected (valid chain).
|
||||||
for i := 0; i < len(validRes.tipsets) - 1; i++ {
|
for i := 0; i < len(validRes.tipsets) - 1; i++ {
|
||||||
if validRes.tipsets[i].IsChildOf(validRes.tipsets[i+1]) == false {
|
if validRes.tipsets[i].IsChildOf(validRes.tipsets[i+1]) == false {
|
||||||
return nil, fmt.Errorf("tipsets are not connected at height (head - %d)/(head - %d)",
|
return nil, fmt.Errorf("tipsets are not connected at height (head - %d)/(head - %d)",
|
||||||
@ -382,6 +381,7 @@ func (client *BlockSync) sendRequestToPeer(
|
|||||||
// FIXME: What's the point of setting a blank deadline that won't time out?
|
// FIXME: What's the point of setting a blank deadline that won't time out?
|
||||||
// Is this our way of clearing the old one?
|
// Is this our way of clearing the old one?
|
||||||
client.peerTracker.logFailure(peer, build.Clock.Since(connectionStart))
|
client.peerTracker.logFailure(peer, build.Clock.Since(connectionStart))
|
||||||
|
// FIXME: Should we also remove peer here?
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
// FIXME: Same, why are we doing this again here?
|
// FIXME: Same, why are we doing this again here?
|
||||||
@ -390,7 +390,6 @@ func (client *BlockSync) sendRequestToPeer(
|
|||||||
// Read response.
|
// Read response.
|
||||||
var res Response
|
var res Response
|
||||||
err = cborutil.ReadCborRPC(
|
err = cborutil.ReadCborRPC(
|
||||||
// FIXME: Extract constants.
|
|
||||||
bufio.NewReader(incrt.New(stream, READ_RES_MIN_SPEED, READ_RES_DEADLINE)),
|
bufio.NewReader(incrt.New(stream, READ_RES_MIN_SPEED, READ_RES_DEADLINE)),
|
||||||
&res)
|
&res)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -423,7 +422,7 @@ func (client *BlockSync) RemovePeer(p peer.ID) {
|
|||||||
// getShuffledPeers returns a preference-sorted set of peers (by latency
|
// getShuffledPeers returns a preference-sorted set of peers (by latency
|
||||||
// and failure counting), shuffling the first few peers so we don't always
|
// and failure counting), shuffling the first few peers so we don't always
|
||||||
// pick the same peer.
|
// pick the same peer.
|
||||||
// FIXME: Merge with the shuffle if we *always* do it.
|
// FIXME: Consider merging with `shufflePrefix()s`.
|
||||||
func (client *BlockSync) getShuffledPeers() []peer.ID {
|
func (client *BlockSync) getShuffledPeers() []peer.ID {
|
||||||
peers := client.peerTracker.prefSortedPeers()
|
peers := client.peerTracker.prefSortedPeers()
|
||||||
shufflePrefix(peers)
|
shufflePrefix(peers)
|
||||||
|
Loading…
Reference in New Issue
Block a user