254 lines
8.1 KiB
Go
254 lines
8.1 KiB
Go
// Copyright 2021 The go-ethereum Authors
|
|
// This file is part of the go-ethereum library.
|
|
//
|
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Lesser General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Lesser General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Lesser General Public License
|
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package eth
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/ethereum/go-ethereum/p2p"
|
|
)
|
|
|
|
var (
|
|
// errDisconnected is returned if a request is attempted to be made to a peer
|
|
// that was already closed.
|
|
errDisconnected = errors.New("disconnected")
|
|
|
|
// errDanglingResponse is returned if a response arrives with a request id
|
|
// which does not match to any existing pending requests.
|
|
errDanglingResponse = errors.New("response to non-existent request")
|
|
|
|
// errMismatchingResponseType is returned if the remote peer sent a different
|
|
// packet type as a response to a request than what the local node expected.
|
|
errMismatchingResponseType = errors.New("mismatching response type")
|
|
)
|
|
|
|
// Request is a pending request to allow tracking it and delivering a response
|
|
// back to the requester on their chosen channel.
|
|
type Request struct {
|
|
peer *Peer // Peer to which this request belogs for untracking
|
|
id uint64 // Request ID to match up replies to
|
|
|
|
sink chan *Response // Channel to deliver the response on
|
|
cancel chan struct{} // Channel to cancel requests ahead of time
|
|
|
|
code uint64 // Message code of the request packet
|
|
want uint64 // Message code of the response packet
|
|
data interface{} // Data content of the request packet
|
|
|
|
Peer string // Demultiplexer if cross-peer requests are batched together
|
|
Sent time.Time // Timestamp when the request was sent
|
|
}
|
|
|
|
// Close aborts an in-flight request. Although there's no way to notify the
|
|
// remote peer about the cancellation, this method notifies the dispatcher to
|
|
// discard any late responses.
|
|
func (r *Request) Close() error {
|
|
if r.peer == nil { // Tests mock out the dispatcher, skip internal cancellation
|
|
return nil
|
|
}
|
|
cancelOp := &cancel{
|
|
id: r.id,
|
|
fail: make(chan error),
|
|
}
|
|
select {
|
|
case r.peer.reqCancel <- cancelOp:
|
|
if err := <-cancelOp.fail; err != nil {
|
|
return err
|
|
}
|
|
close(r.cancel)
|
|
return nil
|
|
case <-r.peer.term:
|
|
return errDisconnected
|
|
}
|
|
}
|
|
|
|
// request is a wrapper around a client Request that has an error channel to
|
|
// signal on if sending the request already failed on a network level.
|
|
type request struct {
|
|
req *Request
|
|
fail chan error
|
|
}
|
|
|
|
// cancel is a maintenance type on the dispatcher to stop tracking a pending
|
|
// request.
|
|
type cancel struct {
|
|
id uint64 // Request ID to stop tracking
|
|
fail chan error
|
|
}
|
|
|
|
// Response is a reply packet to a previously created request. It is delivered
|
|
// on the channel assigned by the requester subsystem and contains the original
|
|
// request embedded to allow uniquely matching it caller side.
|
|
type Response struct {
|
|
id uint64 // Request ID to match up this reply to
|
|
recv time.Time // Timestamp when the request was received
|
|
code uint64 // Response packet type to cross validate with request
|
|
|
|
Req *Request // Original request to cross-reference with
|
|
Res interface{} // Remote response for the request query
|
|
Meta interface{} // Metadata generated locally on the receiver thread
|
|
Time time.Duration // Time it took for the request to be served
|
|
Done chan error // Channel to signal message handling to the reader
|
|
}
|
|
|
|
// response is a wrapper around a remote Response that has an error channel to
|
|
// signal on if processing the response failed.
|
|
type response struct {
|
|
res *Response
|
|
fail chan error
|
|
}
|
|
|
|
// dispatchRequest schedules the request to the dispatcher for tracking and
|
|
// network serialization, blocking until it's successfully sent.
|
|
//
|
|
// The returned Request must either be closed before discarding it, or the reply
|
|
// must be waited for and the Response's Done channel signalled.
|
|
func (p *Peer) dispatchRequest(req *Request) error {
|
|
reqOp := &request{
|
|
req: req,
|
|
fail: make(chan error),
|
|
}
|
|
req.cancel = make(chan struct{})
|
|
req.peer = p
|
|
req.Peer = p.id
|
|
|
|
select {
|
|
case p.reqDispatch <- reqOp:
|
|
return <-reqOp.fail
|
|
case <-p.term:
|
|
return errDisconnected
|
|
}
|
|
}
|
|
|
|
// dispatchRequest fulfils a pending request and delivers it to the requested
|
|
// sink.
|
|
func (p *Peer) dispatchResponse(res *Response, metadata func() interface{}) error {
|
|
resOp := &response{
|
|
res: res,
|
|
fail: make(chan error),
|
|
}
|
|
res.recv = time.Now()
|
|
res.Done = make(chan error)
|
|
|
|
select {
|
|
case p.resDispatch <- resOp:
|
|
// Ensure the response is accepted by the dispatcher
|
|
if err := <-resOp.fail; err != nil {
|
|
return nil
|
|
}
|
|
// Request was accepted, run any postprocessing step to generate metadata
|
|
// on the receiver thread, not the sink thread
|
|
if metadata != nil {
|
|
res.Meta = metadata()
|
|
}
|
|
// Deliver the filled out response and wait until it's handled. This
|
|
// path is a bit funky as Go's select has no order, so if a response
|
|
// arrives to an already cancelled request, there's a 50-50% changes
|
|
// of picking on channel or the other. To avoid such cases delivering
|
|
// the packet upstream, check for cancellation first and only after
|
|
// block on delivery.
|
|
select {
|
|
case <-res.Req.cancel:
|
|
return nil // Request cancelled, silently discard response
|
|
default:
|
|
// Request not yet cancelled, attempt to deliver it, but do watch
|
|
// for fresh cancellations too
|
|
select {
|
|
case res.Req.sink <- res:
|
|
return <-res.Done // Response delivered, return any errors
|
|
case <-res.Req.cancel:
|
|
return nil // Request cancelled, silently discard response
|
|
}
|
|
}
|
|
|
|
case <-p.term:
|
|
return errDisconnected
|
|
}
|
|
}
|
|
|
|
// dispatcher is a loop that accepts requests from higher layer packages, pushes
|
|
// it to the network and tracks and dispatches the responses back to the original
|
|
// requester.
|
|
func (p *Peer) dispatcher() {
|
|
pending := make(map[uint64]*Request)
|
|
|
|
for {
|
|
select {
|
|
case reqOp := <-p.reqDispatch:
|
|
req := reqOp.req
|
|
req.Sent = time.Now()
|
|
|
|
requestTracker.Track(p.id, p.version, req.code, req.want, req.id)
|
|
err := p2p.Send(p.rw, req.code, req.data)
|
|
reqOp.fail <- err
|
|
|
|
if err == nil {
|
|
pending[req.id] = req
|
|
}
|
|
|
|
case cancelOp := <-p.reqCancel:
|
|
// Retrieve the pendign request to cancel and short circuit if it
|
|
// has already been serviced and is not available anymore
|
|
req := pending[cancelOp.id]
|
|
if req == nil {
|
|
cancelOp.fail <- nil
|
|
continue
|
|
}
|
|
// Stop tracking the request
|
|
delete(pending, cancelOp.id)
|
|
cancelOp.fail <- nil
|
|
|
|
case resOp := <-p.resDispatch:
|
|
res := resOp.res
|
|
res.Req = pending[res.id]
|
|
|
|
// Independent if the request exists or not, track this packet
|
|
requestTracker.Fulfil(p.id, p.version, res.code, res.id)
|
|
|
|
switch {
|
|
case res.Req == nil:
|
|
// Response arrived with an untracked ID. Since even cancelled
|
|
// requests are tracked until fulfilment, a dangling repsponse
|
|
// means the remote peer implements the protocol badly.
|
|
resOp.fail <- errDanglingResponse
|
|
|
|
case res.Req.want != res.code:
|
|
// Response arrived, but it's a different packet type than the
|
|
// one expected by the requester. Either the local code is bad,
|
|
// or the remote peer send junk. In neither cases can we handle
|
|
// the packet.
|
|
resOp.fail <- fmt.Errorf("%w: have %d, want %d", errMismatchingResponseType, res.code, res.Req.want)
|
|
|
|
default:
|
|
// All dispatcher checks passed and the response was initialized
|
|
// with the matching request. Signal to the delivery routine that
|
|
// it can wait for a handler response and dispatch the data.
|
|
res.Time = res.recv.Sub(res.Req.Sent)
|
|
resOp.fail <- nil
|
|
|
|
// Stop tracking the request, the response dispatcher will deliver
|
|
delete(pending, res.id)
|
|
}
|
|
|
|
case <-p.term:
|
|
return
|
|
}
|
|
}
|
|
}
|