plugeth/les/handler.go

1285 lines
39 KiB
Go
Raw Normal View History

2016-11-09 01:01:56 +00:00
// Copyright 2016 The go-ethereum Authors
2016-10-14 03:51:29 +00:00
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package les
import (
"encoding/binary"
"encoding/json"
"errors"
2016-10-14 03:51:29 +00:00
"fmt"
"math/big"
"sync"
"sync/atomic"
"time"
2016-10-14 03:51:29 +00:00
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
2016-10-14 03:51:29 +00:00
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/light"
"github.com/ethereum/go-ethereum/log"
2016-10-14 03:51:29 +00:00
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discv5"
"github.com/ethereum/go-ethereum/params"
2016-10-14 03:51:29 +00:00
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
)
var errTooManyInvalidRequest = errors.New("too many invalid requests made")
2016-10-14 03:51:29 +00:00
const (
softResponseLimit = 2 * 1024 * 1024 // Target maximum size of returned blocks, headers or node data.
estHeaderRlpSize = 500 // Approximate size of an RLP encoded block header
ethVersion = 63 // equivalent eth version for the downloader
MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request
MaxBodyFetch = 32 // Amount of block bodies to be fetched per retrieval request
MaxReceiptFetch = 128 // Amount of transaction receipts to allow fetching per request
MaxCodeFetch = 64 // Amount of contract codes to allow fetching per request
MaxProofsFetch = 64 // Amount of merkle proofs to be fetched per retrieval request
MaxHelperTrieProofsFetch = 64 // Amount of merkle proofs to be fetched per retrieval request
MaxTxSend = 64 // Amount of transactions to be send per request
MaxTxStatus = 256 // Amount of transactions to queried per request
2016-10-14 03:51:29 +00:00
disableClientRemovePeer = false
2016-10-14 03:51:29 +00:00
)
func errResp(code errCode, format string, v ...interface{}) error {
return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...))
}
type BlockChain interface {
Config() *params.ChainConfig
HasHeader(hash common.Hash, number uint64) bool
2016-10-14 03:51:29 +00:00
GetHeader(hash common.Hash, number uint64) *types.Header
GetHeaderByHash(hash common.Hash) *types.Header
CurrentHeader() *types.Header
GetTd(hash common.Hash, number uint64) *big.Int
StateCache() state.Database
2016-10-14 03:51:29 +00:00
InsertHeaderChain(chain []*types.Header, checkFreq int) (int, error)
Rollback(chain []common.Hash)
GetHeaderByNumber(number uint64) *types.Header
GetAncestor(hash common.Hash, number, ancestor uint64, maxNonCanonical *uint64) (common.Hash, uint64)
2016-10-14 03:51:29 +00:00
Genesis() *types.Block
SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription
2016-10-14 03:51:29 +00:00
}
type txPool interface {
AddRemotes(txs []*types.Transaction) []error
2019-07-22 12:45:40 +00:00
AddRemotesSync(txs []*types.Transaction) []error
Status(hashes []common.Hash) []core.TxStatus
2016-10-14 03:51:29 +00:00
}
type ProtocolManager struct {
// Configs
chainConfig *params.ChainConfig
iConfig *light.IndexerConfig
client bool // The indicator whether the node is light client
maxPeers int // The maximum number peers allowed to connect.
networkId uint64 // The identity of network.
les, les/flowcontrol: improved request serving and flow control (#18230) This change - implements concurrent LES request serving even for a single peer. - replaces the request cost estimation method with a cost table based on benchmarks which gives much more consistent results. Until now the allowed number of light peers was just a guess which probably contributed a lot to the fluctuating quality of available service. Everything related to request cost is implemented in a single object, the 'cost tracker'. It uses a fixed cost table with a global 'correction factor'. Benchmark code is included and can be run at any time to adapt costs to low-level implementation changes. - reimplements flowcontrol.ClientManager in a cleaner and more efficient way, with added capabilities: There is now control over bandwidth, which allows using the flow control parameters for client prioritization. Target utilization over 100 percent is now supported to model concurrent request processing. Total serving bandwidth is reduced during block processing to prevent database contention. - implements an RPC API for the LES servers allowing server operators to assign priority bandwidth to certain clients and change prioritized status even while the client is connected. The new API is meant for cases where server operators charge for LES using an off-protocol mechanism. - adds a unit test for the new client manager. - adds an end-to-end test using the network simulator that tests bandwidth control functions through the new API.
2019-02-26 11:32:48 +00:00
txpool txPool
txrelay *lesTxRelay
les, les/flowcontrol: improved request serving and flow control (#18230) This change - implements concurrent LES request serving even for a single peer. - replaces the request cost estimation method with a cost table based on benchmarks which gives much more consistent results. Until now the allowed number of light peers was just a guess which probably contributed a lot to the fluctuating quality of available service. Everything related to request cost is implemented in a single object, the 'cost tracker'. It uses a fixed cost table with a global 'correction factor'. Benchmark code is included and can be run at any time to adapt costs to low-level implementation changes. - reimplements flowcontrol.ClientManager in a cleaner and more efficient way, with added capabilities: There is now control over bandwidth, which allows using the flow control parameters for client prioritization. Target utilization over 100 percent is now supported to model concurrent request processing. Total serving bandwidth is reduced during block processing to prevent database contention. - implements an RPC API for the LES servers allowing server operators to assign priority bandwidth to certain clients and change prioritized status even while the client is connected. The new API is meant for cases where server operators charge for LES using an off-protocol mechanism. - adds a unit test for the new client manager. - adds an end-to-end test using the network simulator that tests bandwidth control functions through the new API.
2019-02-26 11:32:48 +00:00
blockchain BlockChain
chainDb ethdb.Database
odr *LesOdr
server *LesServer
serverPool *serverPool
lesTopic discv5.Topic
reqDist *requestDistributor
retriever *retrieveManager
servingQueue *servingQueue
downloader *downloader.Downloader
fetcher *lightFetcher
ulc *ulc
peers *peerSet
checkpoint *params.TrustedCheckpoint
reg *checkpointOracle // If reg == nil, it means the checkpoint registrar is not activated
2016-10-14 03:51:29 +00:00
// channels for fetcher, syncer, txsyncLoop
newPeerCh chan *peer
quitSync chan struct{}
noMorePeers chan struct{}
wg *sync.WaitGroup
eventMux *event.TypeMux
// Callbacks
synced func() bool
2019-07-22 12:45:40 +00:00
// Testing fields
addTxsSync bool
2016-10-14 03:51:29 +00:00
}
// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
// with the ethereum network.
func NewProtocolManager(chainConfig *params.ChainConfig, checkpoint *params.TrustedCheckpoint, indexerConfig *light.IndexerConfig, ulcServers []string, ulcFraction int, client bool, networkId uint64, mux *event.TypeMux, peers *peerSet, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LesOdr, serverPool *serverPool, registrar *checkpointOracle, quitSync chan struct{}, wg *sync.WaitGroup, synced func() bool) (*ProtocolManager, error) {
2016-10-14 03:51:29 +00:00
// Create the protocol manager with the base fields
manager := &ProtocolManager{
client: client,
2016-10-14 03:51:29 +00:00
eventMux: mux,
blockchain: blockchain,
chainConfig: chainConfig,
iConfig: indexerConfig,
2016-10-14 03:51:29 +00:00
chainDb: chainDb,
odr: odr,
2016-10-14 03:51:29 +00:00
networkId: networkId,
txpool: txpool,
serverPool: serverPool,
reg: registrar,
peers: peers,
2016-10-14 03:51:29 +00:00
newPeerCh: make(chan *peer),
quitSync: quitSync,
wg: wg,
2016-10-14 03:51:29 +00:00
noMorePeers: make(chan struct{}),
checkpoint: checkpoint,
synced: synced,
2016-10-14 03:51:29 +00:00
}
if odr != nil {
manager.retriever = odr.retriever
manager.reqDist = odr.retriever.dist
}
if ulcServers != nil {
ulc, err := newULC(ulcServers, ulcFraction)
if err != nil {
log.Warn("Failed to initialize ultra light client", "err", err)
} else {
manager.ulc = ulc
}
}
2016-10-14 03:51:29 +00:00
removePeer := manager.removePeer
if disableClientRemovePeer {
removePeer = func(id string) {}
}
if client {
var checkpointNumber uint64
if checkpoint != nil {
checkpointNumber = (checkpoint.SectionIndex+1)*params.CHTFrequency - 1
}
manager.downloader = downloader.New(checkpointNumber, chainDb, nil, manager.eventMux, nil, blockchain, removePeer)
manager.peers.notify((*downloaderPeerNotify)(manager))
manager.fetcher = newLightFetcher(manager)
2016-10-14 03:51:29 +00:00
}
return manager, nil
}
// removePeer initiates disconnection from a peer by removing it from the peer set
2016-10-14 03:51:29 +00:00
func (pm *ProtocolManager) removePeer(id string) {
pm.peers.Unregister(id)
2016-10-14 03:51:29 +00:00
}
func (pm *ProtocolManager) Start(maxPeers int) {
pm.maxPeers = maxPeers
if pm.client {
2016-10-14 03:51:29 +00:00
go pm.syncer()
} else {
go func() {
for range pm.newPeerCh {
}
}()
}
}
func (pm *ProtocolManager) Stop() {
// Showing a log message. During download / process this could actually
// take between 5 to 10 seconds and therefor feedback is required.
log.Info("Stopping light Ethereum protocol")
2016-10-14 03:51:29 +00:00
// Quit the sync loop.
// After this send has completed, no new peers will be accepted.
pm.noMorePeers <- struct{}{}
close(pm.quitSync) // quits syncer, fetcher
les, les/flowcontrol: improved request serving and flow control (#18230) This change - implements concurrent LES request serving even for a single peer. - replaces the request cost estimation method with a cost table based on benchmarks which gives much more consistent results. Until now the allowed number of light peers was just a guess which probably contributed a lot to the fluctuating quality of available service. Everything related to request cost is implemented in a single object, the 'cost tracker'. It uses a fixed cost table with a global 'correction factor'. Benchmark code is included and can be run at any time to adapt costs to low-level implementation changes. - reimplements flowcontrol.ClientManager in a cleaner and more efficient way, with added capabilities: There is now control over bandwidth, which allows using the flow control parameters for client prioritization. Target utilization over 100 percent is now supported to model concurrent request processing. Total serving bandwidth is reduced during block processing to prevent database contention. - implements an RPC API for the LES servers allowing server operators to assign priority bandwidth to certain clients and change prioritized status even while the client is connected. The new API is meant for cases where server operators charge for LES using an off-protocol mechanism. - adds a unit test for the new client manager. - adds an end-to-end test using the network simulator that tests bandwidth control functions through the new API.
2019-02-26 11:32:48 +00:00
if pm.servingQueue != nil {
pm.servingQueue.stop()
}
2016-10-14 03:51:29 +00:00
// Disconnect existing sessions.
// This also closes the gate for any new registrations on the peer set.
// sessions which are already established but not added to pm.peers yet
// will exit when they try to register.
pm.peers.Close()
// Wait for any process action
pm.wg.Wait()
log.Info("Light Ethereum protocol stopped")
2016-10-14 03:51:29 +00:00
}
// runPeer is the p2p protocol run function for the given version.
func (pm *ProtocolManager) runPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter) error {
var entry *poolEntry
peer := pm.newPeer(int(version), pm.networkId, p, rw)
if pm.serverPool != nil {
all: new p2p node representation (#17643) Package p2p/enode provides a generalized representation of p2p nodes which can contain arbitrary information in key/value pairs. It is also the new home for the node database. The "v4" identity scheme is also moved here from p2p/enr to remove the dependency on Ethereum crypto from that package. Record signature handling is changed significantly. The identity scheme registry is removed and acceptable schemes must be passed to any method that needs identity. This means records must now be validated explicitly after decoding. The enode API is designed to make signature handling easy and safe: most APIs around the codebase work with enode.Node, which is a wrapper around a valid record. Going from enr.Record to enode.Node requires a valid signature. * p2p/discover: port to p2p/enode This ports the discovery code to the new node representation in p2p/enode. The wire protocol is unchanged, this can be considered a refactoring change. The Kademlia table can now deal with nodes using an arbitrary identity scheme. This requires a few incompatible API changes: - Table.Lookup is not available anymore. It used to take a public key as argument because v4 protocol requires one. Its replacement is LookupRandom. - Table.Resolve takes *enode.Node instead of NodeID. This is also for v4 protocol compatibility because nodes cannot be looked up by ID alone. - Types Node and NodeID are gone. Further commits in the series will be fixes all over the the codebase to deal with those removals. * p2p: port to p2p/enode and discovery changes This adapts package p2p to the changes in p2p/discover. All uses of discover.Node and discover.NodeID are replaced by their equivalents from p2p/enode. New API is added to retrieve the enode.Node instance of a peer. The behavior of Server.Self with discovery disabled is improved. It now tries much harder to report a working IP address, falling back to 127.0.0.1 if no suitable address can be determined through other means. These changes were needed for tests of other packages later in the series. * p2p/simulations, p2p/testing: port to p2p/enode No surprises here, mostly replacements of discover.Node, discover.NodeID with their new equivalents. The 'interesting' API changes are: - testing.ProtocolSession tracks complete nodes, not just their IDs. - adapters.NodeConfig has a new method to create a complete node. These changes were needed to make swarm tests work. Note that the NodeID change makes the code incompatible with old simulation snapshots. * whisper/whisperv5, whisper/whisperv6: port to p2p/enode This port was easy because whisper uses []byte for node IDs and URL strings in the API. * eth: port to p2p/enode Again, easy to port because eth uses strings for node IDs and doesn't care about node information in any way. * les: port to p2p/enode Apart from replacing discover.NodeID with enode.ID, most changes are in the server pool code. It now deals with complete nodes instead of (Pubkey, IP, Port) triples. The database format is unchanged for now, but we should probably change it to use the node database later. * node: port to p2p/enode This change simply replaces discover.Node and discover.NodeID with their new equivalents. * swarm/network: port to p2p/enode Swarm has its own node address representation, BzzAddr, containing both an overlay address (the hash of a secp256k1 public key) and an underlay address (enode:// URL). There are no changes to the BzzAddr format in this commit, but certain operations such as creating a BzzAddr from a node ID are now impossible because node IDs aren't public keys anymore. Most swarm-related changes in the series remove uses of NewAddrFromNodeID, replacing it with NewAddr which takes a complete node as argument. ToOverlayAddr is removed because we can just use the node ID directly.
2018-09-24 22:59:00 +00:00
entry = pm.serverPool.connect(peer, peer.Node())
}
peer.poolEntry = entry
select {
case pm.newPeerCh <- peer:
pm.wg.Add(1)
defer pm.wg.Done()
err := pm.handle(peer)
if entry != nil {
pm.serverPool.disconnect(entry)
}
return err
case <-pm.quitSync:
if entry != nil {
pm.serverPool.disconnect(entry)
}
return p2p.DiscQuitting
}
}
func (pm *ProtocolManager) newPeer(pv int, nv uint64, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
var trusted bool
if pm.ulc != nil {
trusted = pm.ulc.trusted(p.ID())
}
return newPeer(pv, nv, trusted, p, newMeteredMsgWriter(rw))
2016-10-14 03:51:29 +00:00
}
// handle is the callback invoked to manage the life cycle of a les peer. When
// this function terminates, the peer is disconnected.
func (pm *ProtocolManager) handle(p *peer) error {
// Ignore maxPeers if this is a trusted peer
// In server mode we try to check into the client pool after handshake
if pm.client && pm.peers.Len() >= pm.maxPeers && !p.Peer.Info().Network.Trusted {
clientRejectedMeter.Mark(1)
return p2p.DiscTooManyPeers
}
// Reject light clients if server is not synced.
if !pm.client && !pm.synced() {
clientRejectedMeter.Mark(1)
return p2p.DiscRequested
}
p.Log().Debug("Light Ethereum peer connected", "name", p.Name())
2016-10-14 03:51:29 +00:00
// Execute the LES handshake
var (
genesis = pm.blockchain.Genesis()
head = pm.blockchain.CurrentHeader()
hash = head.Hash()
number = head.Number.Uint64()
td = pm.blockchain.GetTd(hash, number)
)
if err := p.Handshake(td, hash, number, genesis.Hash(), pm.server); err != nil {
p.Log().Debug("Light Ethereum handshake failed", "err", err)
clientErrorMeter.Mark(1)
2016-10-14 03:51:29 +00:00
return err
}
les, les/flowcontrol: improved request serving and flow control (#18230) This change - implements concurrent LES request serving even for a single peer. - replaces the request cost estimation method with a cost table based on benchmarks which gives much more consistent results. Until now the allowed number of light peers was just a guess which probably contributed a lot to the fluctuating quality of available service. Everything related to request cost is implemented in a single object, the 'cost tracker'. It uses a fixed cost table with a global 'correction factor'. Benchmark code is included and can be run at any time to adapt costs to low-level implementation changes. - reimplements flowcontrol.ClientManager in a cleaner and more efficient way, with added capabilities: There is now control over bandwidth, which allows using the flow control parameters for client prioritization. Target utilization over 100 percent is now supported to model concurrent request processing. Total serving bandwidth is reduced during block processing to prevent database contention. - implements an RPC API for the LES servers allowing server operators to assign priority bandwidth to certain clients and change prioritized status even while the client is connected. The new API is meant for cases where server operators charge for LES using an off-protocol mechanism. - adds a unit test for the new client manager. - adds an end-to-end test using the network simulator that tests bandwidth control functions through the new API.
2019-02-26 11:32:48 +00:00
if p.fcClient != nil {
defer p.fcClient.Disconnect()
}
2016-10-14 03:51:29 +00:00
if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
rw.Init(p.version)
}
2016-10-14 03:51:29 +00:00
// Register the peer locally
if err := pm.peers.Register(p); err != nil {
clientErrorMeter.Mark(1)
p.Log().Error("Light Ethereum peer registration failed", "err", err)
2016-10-14 03:51:29 +00:00
return err
}
connectedAt := time.Now()
2016-10-14 03:51:29 +00:00
defer func() {
pm.removePeer(p.id)
connectionTimer.UpdateSince(connectedAt)
2016-10-14 03:51:29 +00:00
}()
2016-10-14 03:51:29 +00:00
// Register the peer in the downloader. If the downloader considers it banned, we disconnect
if pm.client {
p.lock.Lock()
head := p.headInfo
p.lock.Unlock()
if pm.fetcher != nil {
pm.fetcher.announce(p, head)
}
2016-11-17 14:54:24 +00:00
if p.poolEntry != nil {
pm.serverPool.registered(p.poolEntry)
}
2016-10-14 03:51:29 +00:00
}
// main loop. handle incoming messages.
for {
if err := pm.handleMsg(p); err != nil {
p.Log().Debug("Light Ethereum message handling failed", "err", err)
les, les/flowcontrol: improved request serving and flow control (#18230) This change - implements concurrent LES request serving even for a single peer. - replaces the request cost estimation method with a cost table based on benchmarks which gives much more consistent results. Until now the allowed number of light peers was just a guess which probably contributed a lot to the fluctuating quality of available service. Everything related to request cost is implemented in a single object, the 'cost tracker'. It uses a fixed cost table with a global 'correction factor'. Benchmark code is included and can be run at any time to adapt costs to low-level implementation changes. - reimplements flowcontrol.ClientManager in a cleaner and more efficient way, with added capabilities: There is now control over bandwidth, which allows using the flow control parameters for client prioritization. Target utilization over 100 percent is now supported to model concurrent request processing. Total serving bandwidth is reduced during block processing to prevent database contention. - implements an RPC API for the LES servers allowing server operators to assign priority bandwidth to certain clients and change prioritized status even while the client is connected. The new API is meant for cases where server operators charge for LES using an off-protocol mechanism. - adds a unit test for the new client manager. - adds an end-to-end test using the network simulator that tests bandwidth control functions through the new API.
2019-02-26 11:32:48 +00:00
if p.fcServer != nil {
p.fcServer.DumpLogs()
}
2016-10-14 03:51:29 +00:00
return err
}
}
}
// handleMsg is invoked whenever an inbound message is received from a remote
// peer. The remote connection is torn down upon returning any error.
func (pm *ProtocolManager) handleMsg(p *peer) error {
select {
case err := <-p.errCh:
return err
default:
}
2016-10-14 03:51:29 +00:00
// Read the next message from the remote peer, and ensure it's fully consumed
msg, err := p.rw.ReadMsg()
if err != nil {
return err
}
p.Log().Trace("Light Ethereum message arrived", "code", msg.Code, "bytes", msg.Size)
les, les/flowcontrol: improved request serving and flow control (#18230) This change - implements concurrent LES request serving even for a single peer. - replaces the request cost estimation method with a cost table based on benchmarks which gives much more consistent results. Until now the allowed number of light peers was just a guess which probably contributed a lot to the fluctuating quality of available service. Everything related to request cost is implemented in a single object, the 'cost tracker'. It uses a fixed cost table with a global 'correction factor'. Benchmark code is included and can be run at any time to adapt costs to low-level implementation changes. - reimplements flowcontrol.ClientManager in a cleaner and more efficient way, with added capabilities: There is now control over bandwidth, which allows using the flow control parameters for client prioritization. Target utilization over 100 percent is now supported to model concurrent request processing. Total serving bandwidth is reduced during block processing to prevent database contention. - implements an RPC API for the LES servers allowing server operators to assign priority bandwidth to certain clients and change prioritized status even while the client is connected. The new API is meant for cases where server operators charge for LES using an off-protocol mechanism. - adds a unit test for the new client manager. - adds an end-to-end test using the network simulator that tests bandwidth control functions through the new API.
2019-02-26 11:32:48 +00:00
p.responseCount++
responseCount := p.responseCount
var (
maxCost uint64
task *servingTask
)
accept := func(reqID, reqCnt, maxCnt uint64) bool {
inSizeCost := func() uint64 {
if pm.server.costTracker != nil {
return pm.server.costTracker.realCost(0, msg.Size, 0)
}
return 0
2016-10-14 03:51:29 +00:00
}
if p.isFrozen() || reqCnt == 0 || p.fcClient == nil || reqCnt > maxCnt {
p.fcClient.OneTimeCost(inSizeCost())
les, les/flowcontrol: improved request serving and flow control (#18230) This change - implements concurrent LES request serving even for a single peer. - replaces the request cost estimation method with a cost table based on benchmarks which gives much more consistent results. Until now the allowed number of light peers was just a guess which probably contributed a lot to the fluctuating quality of available service. Everything related to request cost is implemented in a single object, the 'cost tracker'. It uses a fixed cost table with a global 'correction factor'. Benchmark code is included and can be run at any time to adapt costs to low-level implementation changes. - reimplements flowcontrol.ClientManager in a cleaner and more efficient way, with added capabilities: There is now control over bandwidth, which allows using the flow control parameters for client prioritization. Target utilization over 100 percent is now supported to model concurrent request processing. Total serving bandwidth is reduced during block processing to prevent database contention. - implements an RPC API for the LES servers allowing server operators to assign priority bandwidth to certain clients and change prioritized status even while the client is connected. The new API is meant for cases where server operators charge for LES using an off-protocol mechanism. - adds a unit test for the new client manager. - adds an end-to-end test using the network simulator that tests bandwidth control functions through the new API.
2019-02-26 11:32:48 +00:00
return false
2016-10-14 03:51:29 +00:00
}
maxCost = p.fcCosts.getMaxCost(msg.Code, reqCnt)
gf := float64(1)
if pm.server.costTracker != nil {
gf = pm.server.costTracker.globalFactor()
if gf < 0.001 {
p.Log().Error("Invalid global cost factor", "globalFactor", gf)
gf = 1
}
}
maxTime := uint64(float64(maxCost) / gf)
les, les/flowcontrol: improved request serving and flow control (#18230) This change - implements concurrent LES request serving even for a single peer. - replaces the request cost estimation method with a cost table based on benchmarks which gives much more consistent results. Until now the allowed number of light peers was just a guess which probably contributed a lot to the fluctuating quality of available service. Everything related to request cost is implemented in a single object, the 'cost tracker'. It uses a fixed cost table with a global 'correction factor'. Benchmark code is included and can be run at any time to adapt costs to low-level implementation changes. - reimplements flowcontrol.ClientManager in a cleaner and more efficient way, with added capabilities: There is now control over bandwidth, which allows using the flow control parameters for client prioritization. Target utilization over 100 percent is now supported to model concurrent request processing. Total serving bandwidth is reduced during block processing to prevent database contention. - implements an RPC API for the LES servers allowing server operators to assign priority bandwidth to certain clients and change prioritized status even while the client is connected. The new API is meant for cases where server operators charge for LES using an off-protocol mechanism. - adds a unit test for the new client manager. - adds an end-to-end test using the network simulator that tests bandwidth control functions through the new API.
2019-02-26 11:32:48 +00:00
if accepted, bufShort, servingPriority := p.fcClient.AcceptRequest(reqID, responseCount, maxCost); !accepted {
p.freezeClient()
p.Log().Warn("Request came too early", "remaining", common.PrettyDuration(time.Duration(bufShort*1000000/p.fcParams.MinRecharge)))
p.fcClient.OneTimeCost(inSizeCost())
les, les/flowcontrol: improved request serving and flow control (#18230) This change - implements concurrent LES request serving even for a single peer. - replaces the request cost estimation method with a cost table based on benchmarks which gives much more consistent results. Until now the allowed number of light peers was just a guess which probably contributed a lot to the fluctuating quality of available service. Everything related to request cost is implemented in a single object, the 'cost tracker'. It uses a fixed cost table with a global 'correction factor'. Benchmark code is included and can be run at any time to adapt costs to low-level implementation changes. - reimplements flowcontrol.ClientManager in a cleaner and more efficient way, with added capabilities: There is now control over bandwidth, which allows using the flow control parameters for client prioritization. Target utilization over 100 percent is now supported to model concurrent request processing. Total serving bandwidth is reduced during block processing to prevent database contention. - implements an RPC API for the LES servers allowing server operators to assign priority bandwidth to certain clients and change prioritized status even while the client is connected. The new API is meant for cases where server operators charge for LES using an off-protocol mechanism. - adds a unit test for the new client manager. - adds an end-to-end test using the network simulator that tests bandwidth control functions through the new API.
2019-02-26 11:32:48 +00:00
return false
} else {
task = pm.servingQueue.newTask(p, maxTime, servingPriority)
}
if task.start() {
return true
2016-10-14 03:51:29 +00:00
}
p.fcClient.RequestProcessed(reqID, responseCount, maxCost, inSizeCost())
return false
2016-10-14 03:51:29 +00:00
}
if msg.Size > ProtocolMaxMsgSize {
return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
}
defer msg.Discard()
var deliverMsg *Msg
les, les/flowcontrol: improved request serving and flow control (#18230) This change - implements concurrent LES request serving even for a single peer. - replaces the request cost estimation method with a cost table based on benchmarks which gives much more consistent results. Until now the allowed number of light peers was just a guess which probably contributed a lot to the fluctuating quality of available service. Everything related to request cost is implemented in a single object, the 'cost tracker'. It uses a fixed cost table with a global 'correction factor'. Benchmark code is included and can be run at any time to adapt costs to low-level implementation changes. - reimplements flowcontrol.ClientManager in a cleaner and more efficient way, with added capabilities: There is now control over bandwidth, which allows using the flow control parameters for client prioritization. Target utilization over 100 percent is now supported to model concurrent request processing. Total serving bandwidth is reduced during block processing to prevent database contention. - implements an RPC API for the LES servers allowing server operators to assign priority bandwidth to certain clients and change prioritized status even while the client is connected. The new API is meant for cases where server operators charge for LES using an off-protocol mechanism. - adds a unit test for the new client manager. - adds an end-to-end test using the network simulator that tests bandwidth control functions through the new API.
2019-02-26 11:32:48 +00:00
sendResponse := func(reqID, amount uint64, reply *reply, servingTime uint64) {
p.responseLock.Lock()
defer p.responseLock.Unlock()
if p.isFrozen() {
amount = 0
reply = nil
}
les, les/flowcontrol: improved request serving and flow control (#18230) This change - implements concurrent LES request serving even for a single peer. - replaces the request cost estimation method with a cost table based on benchmarks which gives much more consistent results. Until now the allowed number of light peers was just a guess which probably contributed a lot to the fluctuating quality of available service. Everything related to request cost is implemented in a single object, the 'cost tracker'. It uses a fixed cost table with a global 'correction factor'. Benchmark code is included and can be run at any time to adapt costs to low-level implementation changes. - reimplements flowcontrol.ClientManager in a cleaner and more efficient way, with added capabilities: There is now control over bandwidth, which allows using the flow control parameters for client prioritization. Target utilization over 100 percent is now supported to model concurrent request processing. Total serving bandwidth is reduced during block processing to prevent database contention. - implements an RPC API for the LES servers allowing server operators to assign priority bandwidth to certain clients and change prioritized status even while the client is connected. The new API is meant for cases where server operators charge for LES using an off-protocol mechanism. - adds a unit test for the new client manager. - adds an end-to-end test using the network simulator that tests bandwidth control functions through the new API.
2019-02-26 11:32:48 +00:00
var replySize uint32
if reply != nil {
replySize = reply.size()
}
var realCost uint64
if pm.server.costTracker != nil {
realCost = pm.server.costTracker.realCost(servingTime, msg.Size, replySize)
if amount != 0 {
pm.server.costTracker.updateStats(msg.Code, amount, servingTime, realCost)
}
les, les/flowcontrol: improved request serving and flow control (#18230) This change - implements concurrent LES request serving even for a single peer. - replaces the request cost estimation method with a cost table based on benchmarks which gives much more consistent results. Until now the allowed number of light peers was just a guess which probably contributed a lot to the fluctuating quality of available service. Everything related to request cost is implemented in a single object, the 'cost tracker'. It uses a fixed cost table with a global 'correction factor'. Benchmark code is included and can be run at any time to adapt costs to low-level implementation changes. - reimplements flowcontrol.ClientManager in a cleaner and more efficient way, with added capabilities: There is now control over bandwidth, which allows using the flow control parameters for client prioritization. Target utilization over 100 percent is now supported to model concurrent request processing. Total serving bandwidth is reduced during block processing to prevent database contention. - implements an RPC API for the LES servers allowing server operators to assign priority bandwidth to certain clients and change prioritized status even while the client is connected. The new API is meant for cases where server operators charge for LES using an off-protocol mechanism. - adds a unit test for the new client manager. - adds an end-to-end test using the network simulator that tests bandwidth control functions through the new API.
2019-02-26 11:32:48 +00:00
} else {
realCost = maxCost
}
bv := p.fcClient.RequestProcessed(reqID, responseCount, maxCost, realCost)
if reply != nil {
p.queueSend(func() {
if err := reply.send(bv); err != nil {
select {
case p.errCh <- err:
default:
}
les, les/flowcontrol: improved request serving and flow control (#18230) This change - implements concurrent LES request serving even for a single peer. - replaces the request cost estimation method with a cost table based on benchmarks which gives much more consistent results. Until now the allowed number of light peers was just a guess which probably contributed a lot to the fluctuating quality of available service. Everything related to request cost is implemented in a single object, the 'cost tracker'. It uses a fixed cost table with a global 'correction factor'. Benchmark code is included and can be run at any time to adapt costs to low-level implementation changes. - reimplements flowcontrol.ClientManager in a cleaner and more efficient way, with added capabilities: There is now control over bandwidth, which allows using the flow control parameters for client prioritization. Target utilization over 100 percent is now supported to model concurrent request processing. Total serving bandwidth is reduced during block processing to prevent database contention. - implements an RPC API for the LES servers allowing server operators to assign priority bandwidth to certain clients and change prioritized status even while the client is connected. The new API is meant for cases where server operators charge for LES using an off-protocol mechanism. - adds a unit test for the new client manager. - adds an end-to-end test using the network simulator that tests bandwidth control functions through the new API.
2019-02-26 11:32:48 +00:00
}
})
}
}
2016-10-14 03:51:29 +00:00
// Handle the message depending on its contents
switch msg.Code {
case StatusMsg:
p.Log().Trace("Received status message")
2016-10-14 03:51:29 +00:00
// Status messages should never arrive after the handshake
return errResp(ErrExtraStatusMsg, "uncontrolled status message")
// Block header query, collect the requested headers and reply
case AnnounceMsg:
p.Log().Trace("Received announce message")
2016-10-14 03:51:29 +00:00
var req announceData
if err := msg.Decode(&req); err != nil {
return errResp(ErrDecode, "%v: %v", msg, err)
}
if err := req.sanityCheck(); err != nil {
return err
}
les, les/flowcontrol: improved request serving and flow control (#18230) This change - implements concurrent LES request serving even for a single peer. - replaces the request cost estimation method with a cost table based on benchmarks which gives much more consistent results. Until now the allowed number of light peers was just a guess which probably contributed a lot to the fluctuating quality of available service. Everything related to request cost is implemented in a single object, the 'cost tracker'. It uses a fixed cost table with a global 'correction factor'. Benchmark code is included and can be run at any time to adapt costs to low-level implementation changes. - reimplements flowcontrol.ClientManager in a cleaner and more efficient way, with added capabilities: There is now control over bandwidth, which allows using the flow control parameters for client prioritization. Target utilization over 100 percent is now supported to model concurrent request processing. Total serving bandwidth is reduced during block processing to prevent database contention. - implements an RPC API for the LES servers allowing server operators to assign priority bandwidth to certain clients and change prioritized status even while the client is connected. The new API is meant for cases where server operators charge for LES using an off-protocol mechanism. - adds a unit test for the new client manager. - adds an end-to-end test using the network simulator that tests bandwidth control functions through the new API.
2019-02-26 11:32:48 +00:00
update, size := req.Update.decode()
if p.rejectUpdate(size) {
return errResp(ErrRequestRejected, "")
}
les, les/flowcontrol: improved request serving and flow control (#18230) This change - implements concurrent LES request serving even for a single peer. - replaces the request cost estimation method with a cost table based on benchmarks which gives much more consistent results. Until now the allowed number of light peers was just a guess which probably contributed a lot to the fluctuating quality of available service. Everything related to request cost is implemented in a single object, the 'cost tracker'. It uses a fixed cost table with a global 'correction factor'. Benchmark code is included and can be run at any time to adapt costs to low-level implementation changes. - reimplements flowcontrol.ClientManager in a cleaner and more efficient way, with added capabilities: There is now control over bandwidth, which allows using the flow control parameters for client prioritization. Target utilization over 100 percent is now supported to model concurrent request processing. Total serving bandwidth is reduced during block processing to prevent database contention. - implements an RPC API for the LES servers allowing server operators to assign priority bandwidth to certain clients and change prioritized status even while the client is connected. The new API is meant for cases where server operators charge for LES using an off-protocol mechanism. - adds a unit test for the new client manager. - adds an end-to-end test using the network simulator that tests bandwidth control functions through the new API.
2019-02-26 11:32:48 +00:00
p.updateFlowControl(update)
les, les/flowcontrol: improved request serving and flow control (#18230) This change - implements concurrent LES request serving even for a single peer. - replaces the request cost estimation method with a cost table based on benchmarks which gives much more consistent results. Until now the allowed number of light peers was just a guess which probably contributed a lot to the fluctuating quality of available service. Everything related to request cost is implemented in a single object, the 'cost tracker'. It uses a fixed cost table with a global 'correction factor'. Benchmark code is included and can be run at any time to adapt costs to low-level implementation changes. - reimplements flowcontrol.ClientManager in a cleaner and more efficient way, with added capabilities: There is now control over bandwidth, which allows using the flow control parameters for client prioritization. Target utilization over 100 percent is now supported to model concurrent request processing. Total serving bandwidth is reduced during block processing to prevent database contention. - implements an RPC API for the LES servers allowing server operators to assign priority bandwidth to certain clients and change prioritized status even while the client is connected. The new API is meant for cases where server operators charge for LES using an off-protocol mechanism. - adds a unit test for the new client manager. - adds an end-to-end test using the network simulator that tests bandwidth control functions through the new API.
2019-02-26 11:32:48 +00:00
if req.Hash != (common.Hash{}) {
if p.announceType == announceTypeNone {
return errResp(ErrUnexpectedResponse, "")
}
if p.announceType == announceTypeSigned {
if err := req.checkSignature(p.ID(), update); err != nil {
p.Log().Trace("Invalid announcement signature", "err", err)
return err
}
p.Log().Trace("Valid announcement signature")
}
p.Log().Trace("Announce message content", "number", req.Number, "hash", req.Hash, "td", req.Td, "reorg", req.ReorgDepth)
if pm.fetcher != nil {
pm.fetcher.announce(p, &req)
}
}
2016-10-14 03:51:29 +00:00
case GetBlockHeadersMsg:
p.Log().Trace("Received block header request")
2016-10-14 03:51:29 +00:00
// Decode the complex header query
var req struct {
ReqID uint64
Query getBlockHeadersData
}
if err := msg.Decode(&req); err != nil {
return errResp(ErrDecode, "%v: %v", msg, err)
}
query := req.Query
if accept(req.ReqID, query.Amount, MaxHeaderFetch) {
go func() {
hashMode := query.Origin.Hash != (common.Hash{})
first := true
maxNonCanonical := uint64(100)
// Gather headers until the fetch or network limits is reached
var (
bytes common.StorageSize
headers []*types.Header
unknown bool
)
for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit {
if !first && !task.waitOrStop() {
sendResponse(req.ReqID, 0, nil, task.servingTime)
return
}
// Retrieve the next header satisfying the query
var origin *types.Header
if hashMode {
if first {
origin = pm.blockchain.GetHeaderByHash(query.Origin.Hash)
if origin != nil {
query.Origin.Number = origin.Number.Uint64()
}
} else {
origin = pm.blockchain.GetHeader(query.Origin.Hash, query.Origin.Number)
les, les/flowcontrol: improved request serving and flow control (#18230) This change - implements concurrent LES request serving even for a single peer. - replaces the request cost estimation method with a cost table based on benchmarks which gives much more consistent results. Until now the allowed number of light peers was just a guess which probably contributed a lot to the fluctuating quality of available service. Everything related to request cost is implemented in a single object, the 'cost tracker'. It uses a fixed cost table with a global 'correction factor'. Benchmark code is included and can be run at any time to adapt costs to low-level implementation changes. - reimplements flowcontrol.ClientManager in a cleaner and more efficient way, with added capabilities: There is now control over bandwidth, which allows using the flow control parameters for client prioritization. Target utilization over 100 percent is now supported to model concurrent request processing. Total serving bandwidth is reduced during block processing to prevent database contention. - implements an RPC API for the LES servers allowing server operators to assign priority bandwidth to certain clients and change prioritized status even while the client is connected. The new API is meant for cases where server operators charge for LES using an off-protocol mechanism. - adds a unit test for the new client manager. - adds an end-to-end test using the network simulator that tests bandwidth control functions through the new API.
2019-02-26 11:32:48 +00:00
}
} else {
origin = pm.blockchain.GetHeaderByNumber(query.Origin.Number)
}
if origin == nil {
atomic.AddUint32(&p.invalidCount, 1)
break
les, les/flowcontrol: improved request serving and flow control (#18230) This change - implements concurrent LES request serving even for a single peer. - replaces the request cost estimation method with a cost table based on benchmarks which gives much more consistent results. Until now the allowed number of light peers was just a guess which probably contributed a lot to the fluctuating quality of available service. Everything related to request cost is implemented in a single object, the 'cost tracker'. It uses a fixed cost table with a global 'correction factor'. Benchmark code is included and can be run at any time to adapt costs to low-level implementation changes. - reimplements flowcontrol.ClientManager in a cleaner and more efficient way, with added capabilities: There is now control over bandwidth, which allows using the flow control parameters for client prioritization. Target utilization over 100 percent is now supported to model concurrent request processing. Total serving bandwidth is reduced during block processing to prevent database contention. - implements an RPC API for the LES servers allowing server operators to assign priority bandwidth to certain clients and change prioritized status even while the client is connected. The new API is meant for cases where server operators charge for LES using an off-protocol mechanism. - adds a unit test for the new client manager. - adds an end-to-end test using the network simulator that tests bandwidth control functions through the new API.
2019-02-26 11:32:48 +00:00
}
headers = append(headers, origin)
bytes += estHeaderRlpSize
// Advance to the next header of the query
switch {
case hashMode && query.Reverse:
// Hash based traversal towards the genesis block
ancestor := query.Skip + 1
if ancestor == 0 {
unknown = true
} else {
query.Origin.Hash, query.Origin.Number = pm.blockchain.GetAncestor(query.Origin.Hash, query.Origin.Number, ancestor, &maxNonCanonical)
unknown = (query.Origin.Hash == common.Hash{})
}
case hashMode && !query.Reverse:
// Hash based traversal towards the leaf block
var (
current = origin.Number.Uint64()
next = current + query.Skip + 1
)
if next <= current {
infos, _ := json.MarshalIndent(p.Peer.Info(), "", " ")
p.Log().Warn("GetBlockHeaders skip overflow attack", "current", current, "skip", query.Skip, "next", next, "attacker", infos)
unknown = true
} else {
if header := pm.blockchain.GetHeaderByNumber(next); header != nil {
nextHash := header.Hash()
expOldHash, _ := pm.blockchain.GetAncestor(nextHash, next, query.Skip+1, &maxNonCanonical)
if expOldHash == query.Origin.Hash {
query.Origin.Hash, query.Origin.Number = nextHash, next
} else {
unknown = true
}
les, les/flowcontrol: improved request serving and flow control (#18230) This change - implements concurrent LES request serving even for a single peer. - replaces the request cost estimation method with a cost table based on benchmarks which gives much more consistent results. Until now the allowed number of light peers was just a guess which probably contributed a lot to the fluctuating quality of available service. Everything related to request cost is implemented in a single object, the 'cost tracker'. It uses a fixed cost table with a global 'correction factor'. Benchmark code is included and can be run at any time to adapt costs to low-level implementation changes. - reimplements flowcontrol.ClientManager in a cleaner and more efficient way, with added capabilities: There is now control over bandwidth, which allows using the flow control parameters for client prioritization. Target utilization over 100 percent is now supported to model concurrent request processing. Total serving bandwidth is reduced during block processing to prevent database contention. - implements an RPC API for the LES servers allowing server operators to assign priority bandwidth to certain clients and change prioritized status even while the client is connected. The new API is meant for cases where server operators charge for LES using an off-protocol mechanism. - adds a unit test for the new client manager. - adds an end-to-end test using the network simulator that tests bandwidth control functions through the new API.
2019-02-26 11:32:48 +00:00
} else {
unknown = true
}
}
case query.Reverse:
// Number based traversal towards the genesis block
if query.Origin.Number >= query.Skip+1 {
query.Origin.Number -= query.Skip + 1
} else {
unknown = true
}
case !query.Reverse:
// Number based traversal towards the leaf block
query.Origin.Number += query.Skip + 1
}
first = false
les, les/flowcontrol: improved request serving and flow control (#18230) This change - implements concurrent LES request serving even for a single peer. - replaces the request cost estimation method with a cost table based on benchmarks which gives much more consistent results. Until now the allowed number of light peers was just a guess which probably contributed a lot to the fluctuating quality of available service. Everything related to request cost is implemented in a single object, the 'cost tracker'. It uses a fixed cost table with a global 'correction factor'. Benchmark code is included and can be run at any time to adapt costs to low-level implementation changes. - reimplements flowcontrol.ClientManager in a cleaner and more efficient way, with added capabilities: There is now control over bandwidth, which allows using the flow control parameters for client prioritization. Target utilization over 100 percent is now supported to model concurrent request processing. Total serving bandwidth is reduced during block processing to prevent database contention. - implements an RPC API for the LES servers allowing server operators to assign priority bandwidth to certain clients and change prioritized status even while the client is connected. The new API is meant for cases where server operators charge for LES using an off-protocol mechanism. - adds a unit test for the new client manager. - adds an end-to-end test using the network simulator that tests bandwidth control functions through the new API.
2019-02-26 11:32:48 +00:00
}
sendResponse(req.ReqID, query.Amount, p.ReplyBlockHeaders(req.ReqID, headers), task.done())
}()
}
2016-10-14 03:51:29 +00:00
case BlockHeadersMsg:
if pm.downloader == nil {
return errResp(ErrUnexpectedResponse, "")
}
p.Log().Trace("Received block header response message")
2016-10-14 03:51:29 +00:00
// A batch of headers arrived to one of our previous requests
var resp struct {
ReqID, BV uint64
Headers []*types.Header
}
if err := msg.Decode(&resp); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
les, les/flowcontrol: improved request serving and flow control (#18230) This change - implements concurrent LES request serving even for a single peer. - replaces the request cost estimation method with a cost table based on benchmarks which gives much more consistent results. Until now the allowed number of light peers was just a guess which probably contributed a lot to the fluctuating quality of available service. Everything related to request cost is implemented in a single object, the 'cost tracker'. It uses a fixed cost table with a global 'correction factor'. Benchmark code is included and can be run at any time to adapt costs to low-level implementation changes. - reimplements flowcontrol.ClientManager in a cleaner and more efficient way, with added capabilities: There is now control over bandwidth, which allows using the flow control parameters for client prioritization. Target utilization over 100 percent is now supported to model concurrent request processing. Total serving bandwidth is reduced during block processing to prevent database contention. - implements an RPC API for the LES servers allowing server operators to assign priority bandwidth to certain clients and change prioritized status even while the client is connected. The new API is meant for cases where server operators charge for LES using an off-protocol mechanism. - adds a unit test for the new client manager. - adds an end-to-end test using the network simulator that tests bandwidth control functions through the new API.
2019-02-26 11:32:48 +00:00
p.fcServer.ReceivedReply(resp.ReqID, resp.BV)
if pm.fetcher != nil && pm.fetcher.requestedID(resp.ReqID) {
pm.fetcher.deliverHeaders(p, resp.ReqID, resp.Headers)
2016-10-14 03:51:29 +00:00
} else {
err := pm.downloader.DeliverHeaders(p.id, resp.Headers)
if err != nil {
log.Debug(fmt.Sprint(err))
2016-10-14 03:51:29 +00:00
}
}
case GetBlockBodiesMsg:
p.Log().Trace("Received block bodies request")
2016-10-14 03:51:29 +00:00
// Decode the retrieval message
var req struct {
ReqID uint64
Hashes []common.Hash
}
if err := msg.Decode(&req); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
// Gather blocks until the fetch or network limits is reached
var (
bytes int
bodies []rlp.RawValue
)
reqCnt := len(req.Hashes)
if accept(req.ReqID, uint64(reqCnt), MaxBodyFetch) {
go func() {
for i, hash := range req.Hashes {
if i != 0 && !task.waitOrStop() {
sendResponse(req.ReqID, 0, nil, task.servingTime)
return
}
// Retrieve the requested block body, stopping if enough was found
if bytes >= softResponseLimit {
break
}
number := rawdb.ReadHeaderNumber(pm.chainDb, hash)
if number == nil {
atomic.AddUint32(&p.invalidCount, 1)
continue
}
if data := rawdb.ReadBodyRLP(pm.chainDb, hash, *number); len(data) != 0 {
bodies = append(bodies, data)
bytes += len(data)
les, les/flowcontrol: improved request serving and flow control (#18230) This change - implements concurrent LES request serving even for a single peer. - replaces the request cost estimation method with a cost table based on benchmarks which gives much more consistent results. Until now the allowed number of light peers was just a guess which probably contributed a lot to the fluctuating quality of available service. Everything related to request cost is implemented in a single object, the 'cost tracker'. It uses a fixed cost table with a global 'correction factor'. Benchmark code is included and can be run at any time to adapt costs to low-level implementation changes. - reimplements flowcontrol.ClientManager in a cleaner and more efficient way, with added capabilities: There is now control over bandwidth, which allows using the flow control parameters for client prioritization. Target utilization over 100 percent is now supported to model concurrent request processing. Total serving bandwidth is reduced during block processing to prevent database contention. - implements an RPC API for the LES servers allowing server operators to assign priority bandwidth to certain clients and change prioritized status even while the client is connected. The new API is meant for cases where server operators charge for LES using an off-protocol mechanism. - adds a unit test for the new client manager. - adds an end-to-end test using the network simulator that tests bandwidth control functions through the new API.
2019-02-26 11:32:48 +00:00
}
}
sendResponse(req.ReqID, uint64(reqCnt), p.ReplyBlockBodiesRLP(req.ReqID, bodies), task.done())
}()
}
2016-10-14 03:51:29 +00:00
case BlockBodiesMsg:
if pm.odr == nil {
return errResp(ErrUnexpectedResponse, "")
}
p.Log().Trace("Received block bodies response")
2016-10-14 03:51:29 +00:00
// A batch of block bodies arrived to one of our previous requests
var resp struct {
ReqID, BV uint64
Data []*types.Body
}
if err := msg.Decode(&resp); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
les, les/flowcontrol: improved request serving and flow control (#18230) This change - implements concurrent LES request serving even for a single peer. - replaces the request cost estimation method with a cost table based on benchmarks which gives much more consistent results. Until now the allowed number of light peers was just a guess which probably contributed a lot to the fluctuating quality of available service. Everything related to request cost is implemented in a single object, the 'cost tracker'. It uses a fixed cost table with a global 'correction factor'. Benchmark code is included and can be run at any time to adapt costs to low-level implementation changes. - reimplements flowcontrol.ClientManager in a cleaner and more efficient way, with added capabilities: There is now control over bandwidth, which allows using the flow control parameters for client prioritization. Target utilization over 100 percent is now supported to model concurrent request processing. Total serving bandwidth is reduced during block processing to prevent database contention. - implements an RPC API for the LES servers allowing server operators to assign priority bandwidth to certain clients and change prioritized status even while the client is connected. The new API is meant for cases where server operators charge for LES using an off-protocol mechanism. - adds a unit test for the new client manager. - adds an end-to-end test using the network simulator that tests bandwidth control functions through the new API.
2019-02-26 11:32:48 +00:00
p.fcServer.ReceivedReply(resp.ReqID, resp.BV)
2016-10-14 03:51:29 +00:00
deliverMsg = &Msg{
MsgType: MsgBlockBodies,
ReqID: resp.ReqID,
Obj: resp.Data,
}
case GetCodeMsg:
p.Log().Trace("Received code request")
2016-10-14 03:51:29 +00:00
// Decode the retrieval message
var req struct {
ReqID uint64
Reqs []CodeReq
}
if err := msg.Decode(&req); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
// Gather state data until the fetch or network limits is reached
var (
bytes int
data [][]byte
)
reqCnt := len(req.Reqs)
if accept(req.ReqID, uint64(reqCnt), MaxCodeFetch) {
go func() {
for i, request := range req.Reqs {
if i != 0 && !task.waitOrStop() {
sendResponse(req.ReqID, 0, nil, task.servingTime)
return
}
// Look up the root hash belonging to the request
number := rawdb.ReadHeaderNumber(pm.chainDb, request.BHash)
if number == nil {
p.Log().Warn("Failed to retrieve block num for code", "hash", request.BHash)
atomic.AddUint32(&p.invalidCount, 1)
continue
}
header := rawdb.ReadHeader(pm.chainDb, request.BHash, *number)
if header == nil {
p.Log().Warn("Failed to retrieve header for code", "block", *number, "hash", request.BHash)
continue
}
// Refuse to search stale state data in the database since looking for
// a non-exist key is kind of expensive.
local := pm.blockchain.CurrentHeader().Number.Uint64()
if !pm.server.archiveMode && header.Number.Uint64()+core.TriesInMemory <= local {
p.Log().Debug("Reject stale code request", "number", header.Number.Uint64(), "head", local)
atomic.AddUint32(&p.invalidCount, 1)
continue
}
triedb := pm.blockchain.StateCache().TrieDB()
account, err := pm.getAccount(triedb, header.Root, common.BytesToHash(request.AccKey))
if err != nil {
p.Log().Warn("Failed to retrieve account for code", "block", header.Number, "hash", header.Hash(), "account", common.BytesToHash(request.AccKey), "err", err)
atomic.AddUint32(&p.invalidCount, 1)
continue
}
code, err := triedb.Node(common.BytesToHash(account.CodeHash))
if err != nil {
p.Log().Warn("Failed to retrieve account code", "block", header.Number, "hash", header.Hash(), "account", common.BytesToHash(request.AccKey), "codehash", common.BytesToHash(account.CodeHash), "err", err)
continue
}
// Accumulate the code and abort if enough data was retrieved
data = append(data, code)
if bytes += len(code); bytes >= softResponseLimit {
break
}
2016-10-14 03:51:29 +00:00
}
sendResponse(req.ReqID, uint64(reqCnt), p.ReplyCode(req.ReqID, data), task.done())
}()
}
2016-10-14 03:51:29 +00:00
case CodeMsg:
if pm.odr == nil {
return errResp(ErrUnexpectedResponse, "")
}
p.Log().Trace("Received code response")
2016-10-14 03:51:29 +00:00
// A batch of node state data arrived to one of our previous requests
var resp struct {
ReqID, BV uint64
Data [][]byte
}
if err := msg.Decode(&resp); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
les, les/flowcontrol: improved request serving and flow control (#18230) This change - implements concurrent LES request serving even for a single peer. - replaces the request cost estimation method with a cost table based on benchmarks which gives much more consistent results. Until now the allowed number of light peers was just a guess which probably contributed a lot to the fluctuating quality of available service. Everything related to request cost is implemented in a single object, the 'cost tracker'. It uses a fixed cost table with a global 'correction factor'. Benchmark code is included and can be run at any time to adapt costs to low-level implementation changes. - reimplements flowcontrol.ClientManager in a cleaner and more efficient way, with added capabilities: There is now control over bandwidth, which allows using the flow control parameters for client prioritization. Target utilization over 100 percent is now supported to model concurrent request processing. Total serving bandwidth is reduced during block processing to prevent database contention. - implements an RPC API for the LES servers allowing server operators to assign priority bandwidth to certain clients and change prioritized status even while the client is connected. The new API is meant for cases where server operators charge for LES using an off-protocol mechanism. - adds a unit test for the new client manager. - adds an end-to-end test using the network simulator that tests bandwidth control functions through the new API.
2019-02-26 11:32:48 +00:00
p.fcServer.ReceivedReply(resp.ReqID, resp.BV)
2016-10-14 03:51:29 +00:00
deliverMsg = &Msg{
MsgType: MsgCode,
ReqID: resp.ReqID,
Obj: resp.Data,
}
case GetReceiptsMsg:
p.Log().Trace("Received receipts request")
2016-10-14 03:51:29 +00:00
// Decode the retrieval message
var req struct {
ReqID uint64
Hashes []common.Hash
}
if err := msg.Decode(&req); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
// Gather state data until the fetch or network limits is reached
var (
bytes int
receipts []rlp.RawValue
)
reqCnt := len(req.Hashes)
if accept(req.ReqID, uint64(reqCnt), MaxReceiptFetch) {
go func() {
for i, hash := range req.Hashes {
if i != 0 && !task.waitOrStop() {
sendResponse(req.ReqID, 0, nil, task.servingTime)
return
}
if bytes >= softResponseLimit {
break
}
// Retrieve the requested block's receipts, skipping if unknown to us
var results types.Receipts
number := rawdb.ReadHeaderNumber(pm.chainDb, hash)
if number == nil {
atomic.AddUint32(&p.invalidCount, 1)
continue
}
results = rawdb.ReadRawReceipts(pm.chainDb, hash, *number)
if results == nil {
if header := pm.blockchain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash {
continue
}
}
// If known, encode and queue for response packet
if encoded, err := rlp.EncodeToBytes(results); err != nil {
log.Error("Failed to encode receipt", "err", err)
} else {
receipts = append(receipts, encoded)
bytes += len(encoded)
les, les/flowcontrol: improved request serving and flow control (#18230) This change - implements concurrent LES request serving even for a single peer. - replaces the request cost estimation method with a cost table based on benchmarks which gives much more consistent results. Until now the allowed number of light peers was just a guess which probably contributed a lot to the fluctuating quality of available service. Everything related to request cost is implemented in a single object, the 'cost tracker'. It uses a fixed cost table with a global 'correction factor'. Benchmark code is included and can be run at any time to adapt costs to low-level implementation changes. - reimplements flowcontrol.ClientManager in a cleaner and more efficient way, with added capabilities: There is now control over bandwidth, which allows using the flow control parameters for client prioritization. Target utilization over 100 percent is now supported to model concurrent request processing. Total serving bandwidth is reduced during block processing to prevent database contention. - implements an RPC API for the LES servers allowing server operators to assign priority bandwidth to certain clients and change prioritized status even while the client is connected. The new API is meant for cases where server operators charge for LES using an off-protocol mechanism. - adds a unit test for the new client manager. - adds an end-to-end test using the network simulator that tests bandwidth control functions through the new API.
2019-02-26 11:32:48 +00:00
}
}
sendResponse(req.ReqID, uint64(reqCnt), p.ReplyReceiptsRLP(req.ReqID, receipts), task.done())
}()
}
2016-10-14 03:51:29 +00:00
case ReceiptsMsg:
if pm.odr == nil {
return errResp(ErrUnexpectedResponse, "")
}
p.Log().Trace("Received receipts response")
2016-10-14 03:51:29 +00:00
// A batch of receipts arrived to one of our previous requests
var resp struct {
ReqID, BV uint64
Receipts []types.Receipts
}
if err := msg.Decode(&resp); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
les, les/flowcontrol: improved request serving and flow control (#18230) This change - implements concurrent LES request serving even for a single peer. - replaces the request cost estimation method with a cost table based on benchmarks which gives much more consistent results. Until now the allowed number of light peers was just a guess which probably contributed a lot to the fluctuating quality of available service. Everything related to request cost is implemented in a single object, the 'cost tracker'. It uses a fixed cost table with a global 'correction factor'. Benchmark code is included and can be run at any time to adapt costs to low-level implementation changes. - reimplements flowcontrol.ClientManager in a cleaner and more efficient way, with added capabilities: There is now control over bandwidth, which allows using the flow control parameters for client prioritization. Target utilization over 100 percent is now supported to model concurrent request processing. Total serving bandwidth is reduced during block processing to prevent database contention. - implements an RPC API for the LES servers allowing server operators to assign priority bandwidth to certain clients and change prioritized status even while the client is connected. The new API is meant for cases where server operators charge for LES using an off-protocol mechanism. - adds a unit test for the new client manager. - adds an end-to-end test using the network simulator that tests bandwidth control functions through the new API.
2019-02-26 11:32:48 +00:00
p.fcServer.ReceivedReply(resp.ReqID, resp.BV)
2016-10-14 03:51:29 +00:00
deliverMsg = &Msg{
MsgType: MsgReceipts,
ReqID: resp.ReqID,
Obj: resp.Receipts,
}
case GetProofsV2Msg:
p.Log().Trace("Received les/2 proofs request")
// Decode the retrieval message
var req struct {
ReqID uint64
Reqs []ProofReq
}
if err := msg.Decode(&req); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
// Gather state data until the fetch or network limits is reached
var (
lastBHash common.Hash
root common.Hash
)
reqCnt := len(req.Reqs)
if accept(req.ReqID, uint64(reqCnt), MaxProofsFetch) {
go func() {
nodes := light.NewNodeSet()
for i, request := range req.Reqs {
if i != 0 && !task.waitOrStop() {
sendResponse(req.ReqID, 0, nil, task.servingTime)
return
}
// Look up the root hash belonging to the request
var (
number *uint64
header *types.Header
trie state.Trie
)
if request.BHash != lastBHash {
root, lastBHash = common.Hash{}, request.BHash
if number = rawdb.ReadHeaderNumber(pm.chainDb, request.BHash); number == nil {
p.Log().Warn("Failed to retrieve block num for proof", "hash", request.BHash)
atomic.AddUint32(&p.invalidCount, 1)
continue
}
if header = rawdb.ReadHeader(pm.chainDb, request.BHash, *number); header == nil {
p.Log().Warn("Failed to retrieve header for proof", "block", *number, "hash", request.BHash)
continue
}
// Refuse to search stale state data in the database since looking for
// a non-exist key is kind of expensive.
local := pm.blockchain.CurrentHeader().Number.Uint64()
if !pm.server.archiveMode && header.Number.Uint64()+core.TriesInMemory <= local {
p.Log().Debug("Reject stale trie request", "number", header.Number.Uint64(), "head", local)
atomic.AddUint32(&p.invalidCount, 1)
continue
}
root = header.Root
}
// If a header lookup failed (non existent), ignore subsequent requests for the same header
if root == (common.Hash{}) {
atomic.AddUint32(&p.invalidCount, 1)
continue
}
// Open the account or storage trie for the request
statedb := pm.blockchain.StateCache()
switch len(request.AccKey) {
case 0:
// No account key specified, open an account trie
trie, err = statedb.OpenTrie(root)
if trie == nil || err != nil {
p.Log().Warn("Failed to open storage trie for proof", "block", header.Number, "hash", header.Hash(), "root", root, "err", err)
continue
}
default:
// Account key specified, open a storage trie
account, err := pm.getAccount(statedb.TrieDB(), root, common.BytesToHash(request.AccKey))
if err != nil {
p.Log().Warn("Failed to retrieve account for proof", "block", header.Number, "hash", header.Hash(), "account", common.BytesToHash(request.AccKey), "err", err)
atomic.AddUint32(&p.invalidCount, 1)
continue
}
trie, err = statedb.OpenStorageTrie(common.BytesToHash(request.AccKey), account.Root)
if trie == nil || err != nil {
p.Log().Warn("Failed to open storage trie for proof", "block", header.Number, "hash", header.Hash(), "account", common.BytesToHash(request.AccKey), "root", account.Root, "err", err)
continue
}
}
// Prove the user's request from the account or stroage trie
if err := trie.Prove(request.Key, request.FromLevel, nodes); err != nil {
p.Log().Warn("Failed to prove state request", "block", header.Number, "hash", header.Hash(), "err", err)
continue
}
if nodes.DataSize() >= softResponseLimit {
break
les, les/flowcontrol: improved request serving and flow control (#18230) This change - implements concurrent LES request serving even for a single peer. - replaces the request cost estimation method with a cost table based on benchmarks which gives much more consistent results. Until now the allowed number of light peers was just a guess which probably contributed a lot to the fluctuating quality of available service. Everything related to request cost is implemented in a single object, the 'cost tracker'. It uses a fixed cost table with a global 'correction factor'. Benchmark code is included and can be run at any time to adapt costs to low-level implementation changes. - reimplements flowcontrol.ClientManager in a cleaner and more efficient way, with added capabilities: There is now control over bandwidth, which allows using the flow control parameters for client prioritization. Target utilization over 100 percent is now supported to model concurrent request processing. Total serving bandwidth is reduced during block processing to prevent database contention. - implements an RPC API for the LES servers allowing server operators to assign priority bandwidth to certain clients and change prioritized status even while the client is connected. The new API is meant for cases where server operators charge for LES using an off-protocol mechanism. - adds a unit test for the new client manager. - adds an end-to-end test using the network simulator that tests bandwidth control functions through the new API.
2019-02-26 11:32:48 +00:00
}
}
sendResponse(req.ReqID, uint64(reqCnt), p.ReplyProofsV2(req.ReqID, nodes.NodeList()), task.done())
}()
}
case ProofsV2Msg:
if pm.odr == nil {
return errResp(ErrUnexpectedResponse, "")
}
p.Log().Trace("Received les/2 proofs response")
// A batch of merkle proofs arrived to one of our previous requests
var resp struct {
ReqID, BV uint64
Data light.NodeList
}
if err := msg.Decode(&resp); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
les, les/flowcontrol: improved request serving and flow control (#18230) This change - implements concurrent LES request serving even for a single peer. - replaces the request cost estimation method with a cost table based on benchmarks which gives much more consistent results. Until now the allowed number of light peers was just a guess which probably contributed a lot to the fluctuating quality of available service. Everything related to request cost is implemented in a single object, the 'cost tracker'. It uses a fixed cost table with a global 'correction factor'. Benchmark code is included and can be run at any time to adapt costs to low-level implementation changes. - reimplements flowcontrol.ClientManager in a cleaner and more efficient way, with added capabilities: There is now control over bandwidth, which allows using the flow control parameters for client prioritization. Target utilization over 100 percent is now supported to model concurrent request processing. Total serving bandwidth is reduced during block processing to prevent database contention. - implements an RPC API for the LES servers allowing server operators to assign priority bandwidth to certain clients and change prioritized status even while the client is connected. The new API is meant for cases where server operators charge for LES using an off-protocol mechanism. - adds a unit test for the new client manager. - adds an end-to-end test using the network simulator that tests bandwidth control functions through the new API.
2019-02-26 11:32:48 +00:00
p.fcServer.ReceivedReply(resp.ReqID, resp.BV)
deliverMsg = &Msg{
MsgType: MsgProofsV2,
2016-10-14 03:51:29 +00:00
ReqID: resp.ReqID,
Obj: resp.Data,
}
case GetHelperTrieProofsMsg:
p.Log().Trace("Received helper trie proof request")
// Decode the retrieval message
var req struct {
ReqID uint64
Reqs []HelperTrieReq
}
if err := msg.Decode(&req); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
// Gather state data until the fetch or network limits is reached
var (
auxBytes int
auxData [][]byte
)
reqCnt := len(req.Reqs)
if accept(req.ReqID, uint64(reqCnt), MaxHelperTrieProofsFetch) {
go func() {
les, les/flowcontrol: improved request serving and flow control (#18230) This change - implements concurrent LES request serving even for a single peer. - replaces the request cost estimation method with a cost table based on benchmarks which gives much more consistent results. Until now the allowed number of light peers was just a guess which probably contributed a lot to the fluctuating quality of available service. Everything related to request cost is implemented in a single object, the 'cost tracker'. It uses a fixed cost table with a global 'correction factor'. Benchmark code is included and can be run at any time to adapt costs to low-level implementation changes. - reimplements flowcontrol.ClientManager in a cleaner and more efficient way, with added capabilities: There is now control over bandwidth, which allows using the flow control parameters for client prioritization. Target utilization over 100 percent is now supported to model concurrent request processing. Total serving bandwidth is reduced during block processing to prevent database contention. - implements an RPC API for the LES servers allowing server operators to assign priority bandwidth to certain clients and change prioritized status even while the client is connected. The new API is meant for cases where server operators charge for LES using an off-protocol mechanism. - adds a unit test for the new client manager. - adds an end-to-end test using the network simulator that tests bandwidth control functions through the new API.
2019-02-26 11:32:48 +00:00
var (
lastIdx uint64
lastType uint
root common.Hash
auxTrie *trie.Trie
)
nodes := light.NewNodeSet()
for i, request := range req.Reqs {
if i != 0 && !task.waitOrStop() {
sendResponse(req.ReqID, 0, nil, task.servingTime)
return
les, les/flowcontrol: improved request serving and flow control (#18230) This change - implements concurrent LES request serving even for a single peer. - replaces the request cost estimation method with a cost table based on benchmarks which gives much more consistent results. Until now the allowed number of light peers was just a guess which probably contributed a lot to the fluctuating quality of available service. Everything related to request cost is implemented in a single object, the 'cost tracker'. It uses a fixed cost table with a global 'correction factor'. Benchmark code is included and can be run at any time to adapt costs to low-level implementation changes. - reimplements flowcontrol.ClientManager in a cleaner and more efficient way, with added capabilities: There is now control over bandwidth, which allows using the flow control parameters for client prioritization. Target utilization over 100 percent is now supported to model concurrent request processing. Total serving bandwidth is reduced during block processing to prevent database contention. - implements an RPC API for the LES servers allowing server operators to assign priority bandwidth to certain clients and change prioritized status even while the client is connected. The new API is meant for cases where server operators charge for LES using an off-protocol mechanism. - adds a unit test for the new client manager. - adds an end-to-end test using the network simulator that tests bandwidth control functions through the new API.
2019-02-26 11:32:48 +00:00
}
if auxTrie == nil || request.Type != lastType || request.TrieIdx != lastIdx {
auxTrie, lastType, lastIdx = nil, request.Type, request.TrieIdx
var prefix string
if root, prefix = pm.getHelperTrie(request.Type, request.TrieIdx); root != (common.Hash{}) {
auxTrie, _ = trie.New(root, trie.NewDatabase(rawdb.NewTable(pm.chainDb, prefix)))
}
les, les/flowcontrol: improved request serving and flow control (#18230) This change - implements concurrent LES request serving even for a single peer. - replaces the request cost estimation method with a cost table based on benchmarks which gives much more consistent results. Until now the allowed number of light peers was just a guess which probably contributed a lot to the fluctuating quality of available service. Everything related to request cost is implemented in a single object, the 'cost tracker'. It uses a fixed cost table with a global 'correction factor'. Benchmark code is included and can be run at any time to adapt costs to low-level implementation changes. - reimplements flowcontrol.ClientManager in a cleaner and more efficient way, with added capabilities: There is now control over bandwidth, which allows using the flow control parameters for client prioritization. Target utilization over 100 percent is now supported to model concurrent request processing. Total serving bandwidth is reduced during block processing to prevent database contention. - implements an RPC API for the LES servers allowing server operators to assign priority bandwidth to certain clients and change prioritized status even while the client is connected. The new API is meant for cases where server operators charge for LES using an off-protocol mechanism. - adds a unit test for the new client manager. - adds an end-to-end test using the network simulator that tests bandwidth control functions through the new API.
2019-02-26 11:32:48 +00:00
}
if request.AuxReq == auxRoot {
var data []byte
if root != (common.Hash{}) {
data = root[:]
}
les, les/flowcontrol: improved request serving and flow control (#18230) This change - implements concurrent LES request serving even for a single peer. - replaces the request cost estimation method with a cost table based on benchmarks which gives much more consistent results. Until now the allowed number of light peers was just a guess which probably contributed a lot to the fluctuating quality of available service. Everything related to request cost is implemented in a single object, the 'cost tracker'. It uses a fixed cost table with a global 'correction factor'. Benchmark code is included and can be run at any time to adapt costs to low-level implementation changes. - reimplements flowcontrol.ClientManager in a cleaner and more efficient way, with added capabilities: There is now control over bandwidth, which allows using the flow control parameters for client prioritization. Target utilization over 100 percent is now supported to model concurrent request processing. Total serving bandwidth is reduced during block processing to prevent database contention. - implements an RPC API for the LES servers allowing server operators to assign priority bandwidth to certain clients and change prioritized status even while the client is connected. The new API is meant for cases where server operators charge for LES using an off-protocol mechanism. - adds a unit test for the new client manager. - adds an end-to-end test using the network simulator that tests bandwidth control functions through the new API.
2019-02-26 11:32:48 +00:00
auxData = append(auxData, data)
auxBytes += len(data)
} else {
if auxTrie != nil {
auxTrie.Prove(request.Key, request.FromLevel, nodes)
}
if request.AuxReq != 0 {
data := pm.getHelperTrieAuxData(request)
auxData = append(auxData, data)
auxBytes += len(data)
}
}
if nodes.DataSize()+auxBytes >= softResponseLimit {
break
les, les/flowcontrol: improved request serving and flow control (#18230) This change - implements concurrent LES request serving even for a single peer. - replaces the request cost estimation method with a cost table based on benchmarks which gives much more consistent results. Until now the allowed number of light peers was just a guess which probably contributed a lot to the fluctuating quality of available service. Everything related to request cost is implemented in a single object, the 'cost tracker'. It uses a fixed cost table with a global 'correction factor'. Benchmark code is included and can be run at any time to adapt costs to low-level implementation changes. - reimplements flowcontrol.ClientManager in a cleaner and more efficient way, with added capabilities: There is now control over bandwidth, which allows using the flow control parameters for client prioritization. Target utilization over 100 percent is now supported to model concurrent request processing. Total serving bandwidth is reduced during block processing to prevent database contention. - implements an RPC API for the LES servers allowing server operators to assign priority bandwidth to certain clients and change prioritized status even while the client is connected. The new API is meant for cases where server operators charge for LES using an off-protocol mechanism. - adds a unit test for the new client manager. - adds an end-to-end test using the network simulator that tests bandwidth control functions through the new API.
2019-02-26 11:32:48 +00:00
}
}
sendResponse(req.ReqID, uint64(reqCnt), p.ReplyHelperTrieProofs(req.ReqID, HelperTrieResps{Proofs: nodes.NodeList(), AuxData: auxData}), task.done())
}()
}
case HelperTrieProofsMsg:
if pm.odr == nil {
return errResp(ErrUnexpectedResponse, "")
}
p.Log().Trace("Received helper trie proof response")
var resp struct {
ReqID, BV uint64
Data HelperTrieResps
}
if err := msg.Decode(&resp); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
les, les/flowcontrol: improved request serving and flow control (#18230) This change - implements concurrent LES request serving even for a single peer. - replaces the request cost estimation method with a cost table based on benchmarks which gives much more consistent results. Until now the allowed number of light peers was just a guess which probably contributed a lot to the fluctuating quality of available service. Everything related to request cost is implemented in a single object, the 'cost tracker'. It uses a fixed cost table with a global 'correction factor'. Benchmark code is included and can be run at any time to adapt costs to low-level implementation changes. - reimplements flowcontrol.ClientManager in a cleaner and more efficient way, with added capabilities: There is now control over bandwidth, which allows using the flow control parameters for client prioritization. Target utilization over 100 percent is now supported to model concurrent request processing. Total serving bandwidth is reduced during block processing to prevent database contention. - implements an RPC API for the LES servers allowing server operators to assign priority bandwidth to certain clients and change prioritized status even while the client is connected. The new API is meant for cases where server operators charge for LES using an off-protocol mechanism. - adds a unit test for the new client manager. - adds an end-to-end test using the network simulator that tests bandwidth control functions through the new API.
2019-02-26 11:32:48 +00:00
p.fcServer.ReceivedReply(resp.ReqID, resp.BV)
deliverMsg = &Msg{
MsgType: MsgHelperTrieProofs,
ReqID: resp.ReqID,
Obj: resp.Data,
}
case SendTxV2Msg:
if pm.txpool == nil {
return errResp(ErrRequestRejected, "")
}
// Transactions arrived, parse all of them and deliver to the pool
var req struct {
ReqID uint64
Txs []*types.Transaction
}
if err := msg.Decode(&req); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
reqCnt := len(req.Txs)
if accept(req.ReqID, uint64(reqCnt), MaxTxSend) {
go func() {
stats := make([]light.TxStatus, len(req.Txs))
for i, tx := range req.Txs {
if i != 0 && !task.waitOrStop() {
sendResponse(req.ReqID, 0, nil, task.servingTime)
return
les, les/flowcontrol: improved request serving and flow control (#18230) This change - implements concurrent LES request serving even for a single peer. - replaces the request cost estimation method with a cost table based on benchmarks which gives much more consistent results. Until now the allowed number of light peers was just a guess which probably contributed a lot to the fluctuating quality of available service. Everything related to request cost is implemented in a single object, the 'cost tracker'. It uses a fixed cost table with a global 'correction factor'. Benchmark code is included and can be run at any time to adapt costs to low-level implementation changes. - reimplements flowcontrol.ClientManager in a cleaner and more efficient way, with added capabilities: There is now control over bandwidth, which allows using the flow control parameters for client prioritization. Target utilization over 100 percent is now supported to model concurrent request processing. Total serving bandwidth is reduced during block processing to prevent database contention. - implements an RPC API for the LES servers allowing server operators to assign priority bandwidth to certain clients and change prioritized status even while the client is connected. The new API is meant for cases where server operators charge for LES using an off-protocol mechanism. - adds a unit test for the new client manager. - adds an end-to-end test using the network simulator that tests bandwidth control functions through the new API.
2019-02-26 11:32:48 +00:00
}
hash := tx.Hash()
les, les/flowcontrol: improved request serving and flow control (#18230) This change - implements concurrent LES request serving even for a single peer. - replaces the request cost estimation method with a cost table based on benchmarks which gives much more consistent results. Until now the allowed number of light peers was just a guess which probably contributed a lot to the fluctuating quality of available service. Everything related to request cost is implemented in a single object, the 'cost tracker'. It uses a fixed cost table with a global 'correction factor'. Benchmark code is included and can be run at any time to adapt costs to low-level implementation changes. - reimplements flowcontrol.ClientManager in a cleaner and more efficient way, with added capabilities: There is now control over bandwidth, which allows using the flow control parameters for client prioritization. Target utilization over 100 percent is now supported to model concurrent request processing. Total serving bandwidth is reduced during block processing to prevent database contention. - implements an RPC API for the LES servers allowing server operators to assign priority bandwidth to certain clients and change prioritized status even while the client is connected. The new API is meant for cases where server operators charge for LES using an off-protocol mechanism. - adds a unit test for the new client manager. - adds an end-to-end test using the network simulator that tests bandwidth control functions through the new API.
2019-02-26 11:32:48 +00:00
stats[i] = pm.txStatus(hash)
if stats[i].Status == core.TxStatusUnknown {
2019-07-22 12:45:40 +00:00
addFn := pm.txpool.AddRemotes
// Add txs synchronously for testing purpose
if pm.addTxsSync {
addFn = pm.txpool.AddRemotesSync
}
if errs := addFn([]*types.Transaction{tx}); errs[0] != nil {
stats[i].Error = errs[0].Error()
continue
}
stats[i] = pm.txStatus(hash)
}
}
sendResponse(req.ReqID, uint64(reqCnt), p.ReplyTxStatus(req.ReqID, stats), task.done())
}()
}
case GetTxStatusMsg:
if pm.txpool == nil {
return errResp(ErrUnexpectedResponse, "")
}
// Transactions arrived, parse all of them and deliver to the pool
var req struct {
ReqID uint64
Hashes []common.Hash
}
if err := msg.Decode(&req); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
reqCnt := len(req.Hashes)
if accept(req.ReqID, uint64(reqCnt), MaxTxStatus) {
go func() {
stats := make([]light.TxStatus, len(req.Hashes))
for i, hash := range req.Hashes {
if i != 0 && !task.waitOrStop() {
sendResponse(req.ReqID, 0, nil, task.servingTime)
return
}
stats[i] = pm.txStatus(hash)
les, les/flowcontrol: improved request serving and flow control (#18230) This change - implements concurrent LES request serving even for a single peer. - replaces the request cost estimation method with a cost table based on benchmarks which gives much more consistent results. Until now the allowed number of light peers was just a guess which probably contributed a lot to the fluctuating quality of available service. Everything related to request cost is implemented in a single object, the 'cost tracker'. It uses a fixed cost table with a global 'correction factor'. Benchmark code is included and can be run at any time to adapt costs to low-level implementation changes. - reimplements flowcontrol.ClientManager in a cleaner and more efficient way, with added capabilities: There is now control over bandwidth, which allows using the flow control parameters for client prioritization. Target utilization over 100 percent is now supported to model concurrent request processing. Total serving bandwidth is reduced during block processing to prevent database contention. - implements an RPC API for the LES servers allowing server operators to assign priority bandwidth to certain clients and change prioritized status even while the client is connected. The new API is meant for cases where server operators charge for LES using an off-protocol mechanism. - adds a unit test for the new client manager. - adds an end-to-end test using the network simulator that tests bandwidth control functions through the new API.
2019-02-26 11:32:48 +00:00
}
sendResponse(req.ReqID, uint64(reqCnt), p.ReplyTxStatus(req.ReqID, stats), task.done())
}()
}
case TxStatusMsg:
if pm.odr == nil {
return errResp(ErrUnexpectedResponse, "")
}
p.Log().Trace("Received tx status response")
var resp struct {
ReqID, BV uint64
Status []light.TxStatus
}
if err := msg.Decode(&resp); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
les, les/flowcontrol: improved request serving and flow control (#18230) This change - implements concurrent LES request serving even for a single peer. - replaces the request cost estimation method with a cost table based on benchmarks which gives much more consistent results. Until now the allowed number of light peers was just a guess which probably contributed a lot to the fluctuating quality of available service. Everything related to request cost is implemented in a single object, the 'cost tracker'. It uses a fixed cost table with a global 'correction factor'. Benchmark code is included and can be run at any time to adapt costs to low-level implementation changes. - reimplements flowcontrol.ClientManager in a cleaner and more efficient way, with added capabilities: There is now control over bandwidth, which allows using the flow control parameters for client prioritization. Target utilization over 100 percent is now supported to model concurrent request processing. Total serving bandwidth is reduced during block processing to prevent database contention. - implements an RPC API for the LES servers allowing server operators to assign priority bandwidth to certain clients and change prioritized status even while the client is connected. The new API is meant for cases where server operators charge for LES using an off-protocol mechanism. - adds a unit test for the new client manager. - adds an end-to-end test using the network simulator that tests bandwidth control functions through the new API.
2019-02-26 11:32:48 +00:00
p.fcServer.ReceivedReply(resp.ReqID, resp.BV)
p.Log().Trace("Received helper trie proof response")
deliverMsg = &Msg{
MsgType: MsgTxStatus,
ReqID: resp.ReqID,
Obj: resp.Status,
}
case StopMsg:
if pm.odr == nil {
return errResp(ErrUnexpectedResponse, "")
}
p.freezeServer(true)
pm.retriever.frozen(p)
p.Log().Warn("Service stopped")
case ResumeMsg:
if pm.odr == nil {
return errResp(ErrUnexpectedResponse, "")
}
var bv uint64
if err := msg.Decode(&bv); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
p.fcServer.ResumeFreeze(bv)
p.freezeServer(false)
p.Log().Warn("Service resumed")
2016-10-14 03:51:29 +00:00
default:
p.Log().Trace("Received unknown message", "code", msg.Code)
2016-10-14 03:51:29 +00:00
return errResp(ErrInvalidMsgCode, "%v", msg.Code)
}
if deliverMsg != nil {
err := pm.retriever.deliver(p, deliverMsg)
if err != nil {
p.responseErrors++
if p.responseErrors > maxResponseErrors {
return err
}
}
2016-10-14 03:51:29 +00:00
}
// If the client has made too much invalid request(e.g. request a non-exist data),
// reject them to prevent SPAM attack.
if atomic.LoadUint32(&p.invalidCount) > maxRequestErrors {
return errTooManyInvalidRequest
}
2016-10-14 03:51:29 +00:00
return nil
}
// getAccount retrieves an account from the state based at root.
func (pm *ProtocolManager) getAccount(triedb *trie.Database, root, hash common.Hash) (state.Account, error) {
trie, err := trie.New(root, triedb)
if err != nil {
return state.Account{}, err
}
blob, err := trie.TryGet(hash[:])
if err != nil {
return state.Account{}, err
}
var account state.Account
if err = rlp.DecodeBytes(blob, &account); err != nil {
return state.Account{}, err
}
return account, nil
}
// getHelperTrie returns the post-processed trie root for the given trie ID and section index
func (pm *ProtocolManager) getHelperTrie(id uint, idx uint64) (common.Hash, string) {
switch id {
case htCanonical:
sectionHead := rawdb.ReadCanonicalHash(pm.chainDb, (idx+1)*pm.iConfig.ChtSize-1)
return light.GetChtRoot(pm.chainDb, idx, sectionHead), light.ChtTablePrefix
case htBloomBits:
sectionHead := rawdb.ReadCanonicalHash(pm.chainDb, (idx+1)*pm.iConfig.BloomTrieSize-1)
return light.GetBloomTrieRoot(pm.chainDb, idx, sectionHead), light.BloomTrieTablePrefix
}
return common.Hash{}, ""
}
// getHelperTrieAuxData returns requested auxiliary data for the given HelperTrie request
func (pm *ProtocolManager) getHelperTrieAuxData(req HelperTrieReq) []byte {
if req.Type == htCanonical && req.AuxReq == auxHeader && len(req.Key) == 8 {
blockNum := binary.BigEndian.Uint64(req.Key)
hash := rawdb.ReadCanonicalHash(pm.chainDb, blockNum)
return rawdb.ReadHeaderRLP(pm.chainDb, hash, blockNum)
}
return nil
}
func (pm *ProtocolManager) txStatus(hash common.Hash) light.TxStatus {
var stat light.TxStatus
les, les/flowcontrol: improved request serving and flow control (#18230) This change - implements concurrent LES request serving even for a single peer. - replaces the request cost estimation method with a cost table based on benchmarks which gives much more consistent results. Until now the allowed number of light peers was just a guess which probably contributed a lot to the fluctuating quality of available service. Everything related to request cost is implemented in a single object, the 'cost tracker'. It uses a fixed cost table with a global 'correction factor'. Benchmark code is included and can be run at any time to adapt costs to low-level implementation changes. - reimplements flowcontrol.ClientManager in a cleaner and more efficient way, with added capabilities: There is now control over bandwidth, which allows using the flow control parameters for client prioritization. Target utilization over 100 percent is now supported to model concurrent request processing. Total serving bandwidth is reduced during block processing to prevent database contention. - implements an RPC API for the LES servers allowing server operators to assign priority bandwidth to certain clients and change prioritized status even while the client is connected. The new API is meant for cases where server operators charge for LES using an off-protocol mechanism. - adds a unit test for the new client manager. - adds an end-to-end test using the network simulator that tests bandwidth control functions through the new API.
2019-02-26 11:32:48 +00:00
stat.Status = pm.txpool.Status([]common.Hash{hash})[0]
// If the transaction is unknown to the pool, try looking it up locally
if stat.Status == core.TxStatusUnknown {
if tx, blockHash, blockNumber, txIndex := rawdb.ReadTransaction(pm.chainDb, hash); tx != nil {
stat.Status = core.TxStatusIncluded
stat.Lookup = &rawdb.LegacyTxLookupEntry{BlockHash: blockHash, BlockIndex: blockNumber, Index: txIndex}
}
}
les, les/flowcontrol: improved request serving and flow control (#18230) This change - implements concurrent LES request serving even for a single peer. - replaces the request cost estimation method with a cost table based on benchmarks which gives much more consistent results. Until now the allowed number of light peers was just a guess which probably contributed a lot to the fluctuating quality of available service. Everything related to request cost is implemented in a single object, the 'cost tracker'. It uses a fixed cost table with a global 'correction factor'. Benchmark code is included and can be run at any time to adapt costs to low-level implementation changes. - reimplements flowcontrol.ClientManager in a cleaner and more efficient way, with added capabilities: There is now control over bandwidth, which allows using the flow control parameters for client prioritization. Target utilization over 100 percent is now supported to model concurrent request processing. Total serving bandwidth is reduced during block processing to prevent database contention. - implements an RPC API for the LES servers allowing server operators to assign priority bandwidth to certain clients and change prioritized status even while the client is connected. The new API is meant for cases where server operators charge for LES using an off-protocol mechanism. - adds a unit test for the new client manager. - adds an end-to-end test using the network simulator that tests bandwidth control functions through the new API.
2019-02-26 11:32:48 +00:00
return stat
}
// downloaderPeerNotify implements peerSetNotify
type downloaderPeerNotify ProtocolManager
type peerConnection struct {
manager *ProtocolManager
peer *peer
}
func (pc *peerConnection) Head() (common.Hash, *big.Int) {
return pc.peer.HeadAndTd()
}
func (pc *peerConnection) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool) error {
reqID := genReqID()
rq := &distReq{
getCost: func(dp distPeer) uint64 {
peer := dp.(*peer)
return peer.GetRequestCost(GetBlockHeadersMsg, amount)
},
canSend: func(dp distPeer) bool {
return dp.(*peer) == pc.peer
},
request: func(dp distPeer) func() {
peer := dp.(*peer)
cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
les, les/flowcontrol: improved request serving and flow control (#18230) This change - implements concurrent LES request serving even for a single peer. - replaces the request cost estimation method with a cost table based on benchmarks which gives much more consistent results. Until now the allowed number of light peers was just a guess which probably contributed a lot to the fluctuating quality of available service. Everything related to request cost is implemented in a single object, the 'cost tracker'. It uses a fixed cost table with a global 'correction factor'. Benchmark code is included and can be run at any time to adapt costs to low-level implementation changes. - reimplements flowcontrol.ClientManager in a cleaner and more efficient way, with added capabilities: There is now control over bandwidth, which allows using the flow control parameters for client prioritization. Target utilization over 100 percent is now supported to model concurrent request processing. Total serving bandwidth is reduced during block processing to prevent database contention. - implements an RPC API for the LES servers allowing server operators to assign priority bandwidth to certain clients and change prioritized status even while the client is connected. The new API is meant for cases where server operators charge for LES using an off-protocol mechanism. - adds a unit test for the new client manager. - adds an end-to-end test using the network simulator that tests bandwidth control functions through the new API.
2019-02-26 11:32:48 +00:00
peer.fcServer.QueuedRequest(reqID, cost)
return func() { peer.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse) }
},
}
_, ok := <-pc.manager.reqDist.queue(rq)
if !ok {
return light.ErrNoPeers
}
return nil
}
func (pc *peerConnection) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error {
reqID := genReqID()
rq := &distReq{
getCost: func(dp distPeer) uint64 {
peer := dp.(*peer)
return peer.GetRequestCost(GetBlockHeadersMsg, amount)
},
canSend: func(dp distPeer) bool {
return dp.(*peer) == pc.peer
},
request: func(dp distPeer) func() {
peer := dp.(*peer)
cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
les, les/flowcontrol: improved request serving and flow control (#18230) This change - implements concurrent LES request serving even for a single peer. - replaces the request cost estimation method with a cost table based on benchmarks which gives much more consistent results. Until now the allowed number of light peers was just a guess which probably contributed a lot to the fluctuating quality of available service. Everything related to request cost is implemented in a single object, the 'cost tracker'. It uses a fixed cost table with a global 'correction factor'. Benchmark code is included and can be run at any time to adapt costs to low-level implementation changes. - reimplements flowcontrol.ClientManager in a cleaner and more efficient way, with added capabilities: There is now control over bandwidth, which allows using the flow control parameters for client prioritization. Target utilization over 100 percent is now supported to model concurrent request processing. Total serving bandwidth is reduced during block processing to prevent database contention. - implements an RPC API for the LES servers allowing server operators to assign priority bandwidth to certain clients and change prioritized status even while the client is connected. The new API is meant for cases where server operators charge for LES using an off-protocol mechanism. - adds a unit test for the new client manager. - adds an end-to-end test using the network simulator that tests bandwidth control functions through the new API.
2019-02-26 11:32:48 +00:00
peer.fcServer.QueuedRequest(reqID, cost)
return func() { peer.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse) }
},
}
_, ok := <-pc.manager.reqDist.queue(rq)
if !ok {
return light.ErrNoPeers
}
return nil
}
func (d *downloaderPeerNotify) registerPeer(p *peer) {
pm := (*ProtocolManager)(d)
pc := &peerConnection{
manager: pm,
peer: p,
}
pm.downloader.RegisterLightPeer(p.id, ethVersion, pc)
}
func (d *downloaderPeerNotify) unregisterPeer(p *peer) {
pm := (*ProtocolManager)(d)
pm.downloader.UnregisterPeer(p.id)
}