forked from cerc-io/plugeth
56ed6152a1
Shutting down geth prints hundreds of annoying error messages in some cases. The errors appear because the Stop method of eth.ProtocolManager, miner.Miner and core.TxPool is asynchronous. Left over peer sessions generate events which are processed after Stop even though the database has already been closed. The fix is to make Stop synchronous using sync.WaitGroup. For eth.ProtocolManager, in order to make use of WaitGroup safe, we need a way to stop new peer sessions from being added while waiting on the WaitGroup. The eth protocol Run function now selects on a signaling channel and adds to the WaitGroup only if ProtocolManager is not shutting down. For miner.worker and core.TxPool the number of goroutines is static, WaitGroup can be used in the usual way without additional synchronisation.
787 lines
26 KiB
Go
787 lines
26 KiB
Go
// Copyright 2015 The go-ethereum Authors
|
|
// This file is part of the go-ethereum library.
|
|
//
|
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Lesser General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Lesser General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Lesser General Public License
|
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package eth
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"math"
|
|
"math/big"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/ethereum/go-ethereum/common"
|
|
"github.com/ethereum/go-ethereum/core"
|
|
"github.com/ethereum/go-ethereum/core/types"
|
|
"github.com/ethereum/go-ethereum/eth/downloader"
|
|
"github.com/ethereum/go-ethereum/eth/fetcher"
|
|
"github.com/ethereum/go-ethereum/ethdb"
|
|
"github.com/ethereum/go-ethereum/event"
|
|
"github.com/ethereum/go-ethereum/logger"
|
|
"github.com/ethereum/go-ethereum/logger/glog"
|
|
"github.com/ethereum/go-ethereum/p2p"
|
|
"github.com/ethereum/go-ethereum/p2p/discover"
|
|
"github.com/ethereum/go-ethereum/pow"
|
|
"github.com/ethereum/go-ethereum/rlp"
|
|
)
|
|
|
|
// Sizing hints used while assembling replies to remote data requests.
const (
	// softResponseLimit is the target maximum size of returned blocks,
	// headers or node data (2 MiB).
	softResponseLimit = 2 << 20

	// estHeaderRlpSize is the approximate size of an RLP encoded block header.
	estHeaderRlpSize = 500
)
|
|
|
|
// errIncompatibleConfig is reported when the requested protocols and the
// configuration cannot be reconciled, i.e. the protocol versions on offer are
// too low for what the configuration requires.
var errIncompatibleConfig = errors.New("incompatible configuration")
|
|
|
|
func errResp(code errCode, format string, v ...interface{}) error {
|
|
return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...))
|
|
}
|
|
|
|
type hashFetcherFn func(common.Hash) error
|
|
type blockFetcherFn func([]common.Hash) error
|
|
|
|
type ProtocolManager struct {
|
|
networkId int
|
|
|
|
fastSync bool
|
|
txpool txPool
|
|
blockchain *core.BlockChain
|
|
chaindb ethdb.Database
|
|
|
|
downloader *downloader.Downloader
|
|
fetcher *fetcher.Fetcher
|
|
peers *peerSet
|
|
|
|
SubProtocols []p2p.Protocol
|
|
|
|
eventMux *event.TypeMux
|
|
txSub event.Subscription
|
|
minedBlockSub event.Subscription
|
|
|
|
// channels for fetcher, syncer, txsyncLoop
|
|
newPeerCh chan *peer
|
|
txsyncCh chan *txsync
|
|
quitSync chan struct{}
|
|
noMorePeers chan struct{}
|
|
|
|
// wait group is used for graceful shutdowns during downloading
|
|
// and processing
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
|
|
// with the ethereum network.
|
|
func NewProtocolManager(config *core.ChainConfig, fastSync bool, networkId int, mux *event.TypeMux, txpool txPool, pow pow.PoW, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error) {
|
|
// Figure out whether to allow fast sync or not
|
|
if fastSync && blockchain.CurrentBlock().NumberU64() > 0 {
|
|
glog.V(logger.Info).Infof("blockchain not empty, fast sync disabled")
|
|
fastSync = false
|
|
}
|
|
// Create the protocol manager with the base fields
|
|
manager := &ProtocolManager{
|
|
networkId: networkId,
|
|
fastSync: fastSync,
|
|
eventMux: mux,
|
|
txpool: txpool,
|
|
blockchain: blockchain,
|
|
chaindb: chaindb,
|
|
peers: newPeerSet(),
|
|
newPeerCh: make(chan *peer),
|
|
noMorePeers: make(chan struct{}),
|
|
txsyncCh: make(chan *txsync),
|
|
quitSync: make(chan struct{}),
|
|
}
|
|
// Initiate a sub-protocol for every implemented version we can handle
|
|
manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
|
|
for i, version := range ProtocolVersions {
|
|
// Skip protocol version if incompatible with the mode of operation
|
|
if fastSync && version < eth63 {
|
|
continue
|
|
}
|
|
// Compatible; initialise the sub-protocol
|
|
version := version // Closure for the run
|
|
manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
|
|
Name: ProtocolName,
|
|
Version: version,
|
|
Length: ProtocolLengths[i],
|
|
Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
|
|
peer := manager.newPeer(int(version), p, rw)
|
|
select {
|
|
case manager.newPeerCh <- peer:
|
|
manager.wg.Add(1)
|
|
defer manager.wg.Done()
|
|
return manager.handle(peer)
|
|
case <-manager.quitSync:
|
|
return p2p.DiscQuitting
|
|
}
|
|
},
|
|
NodeInfo: func() interface{} {
|
|
return manager.NodeInfo()
|
|
},
|
|
PeerInfo: func(id discover.NodeID) interface{} {
|
|
if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
|
|
return p.Info()
|
|
}
|
|
return nil
|
|
},
|
|
})
|
|
}
|
|
if len(manager.SubProtocols) == 0 {
|
|
return nil, errIncompatibleConfig
|
|
}
|
|
// Construct the different synchronisation mechanisms
|
|
manager.downloader = downloader.New(chaindb, manager.eventMux, blockchain.HasHeader, blockchain.HasBlockAndState, blockchain.GetHeader,
|
|
blockchain.GetBlock, blockchain.CurrentHeader, blockchain.CurrentBlock, blockchain.CurrentFastBlock, blockchain.FastSyncCommitHead,
|
|
blockchain.GetTd, blockchain.InsertHeaderChain, blockchain.InsertChain, blockchain.InsertReceiptChain, blockchain.Rollback,
|
|
manager.removePeer)
|
|
|
|
validator := func(block *types.Block, parent *types.Block) error {
|
|
return core.ValidateHeader(config, pow, block.Header(), parent.Header(), true, false)
|
|
}
|
|
heighter := func() uint64 {
|
|
return blockchain.CurrentBlock().NumberU64()
|
|
}
|
|
manager.fetcher = fetcher.New(blockchain.GetBlock, validator, manager.BroadcastBlock, heighter, blockchain.InsertChain, manager.removePeer)
|
|
|
|
return manager, nil
|
|
}
|
|
|
|
func (pm *ProtocolManager) removePeer(id string) {
|
|
// Short circuit if the peer was already removed
|
|
peer := pm.peers.Peer(id)
|
|
if peer == nil {
|
|
return
|
|
}
|
|
glog.V(logger.Debug).Infoln("Removing peer", id)
|
|
|
|
// Unregister the peer from the downloader and Ethereum peer set
|
|
pm.downloader.UnregisterPeer(id)
|
|
if err := pm.peers.Unregister(id); err != nil {
|
|
glog.V(logger.Error).Infoln("Removal failed:", err)
|
|
}
|
|
// Hard disconnect at the networking layer
|
|
if peer != nil {
|
|
peer.Peer.Disconnect(p2p.DiscUselessPeer)
|
|
}
|
|
}
|
|
|
|
func (pm *ProtocolManager) Start() {
|
|
// broadcast transactions
|
|
pm.txSub = pm.eventMux.Subscribe(core.TxPreEvent{})
|
|
go pm.txBroadcastLoop()
|
|
// broadcast mined blocks
|
|
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
|
|
go pm.minedBroadcastLoop()
|
|
|
|
// start sync handlers
|
|
go pm.syncer()
|
|
go pm.txsyncLoop()
|
|
}
|
|
|
|
func (pm *ProtocolManager) Stop() {
|
|
glog.V(logger.Info).Infoln("Stopping ethereum protocol handler...")
|
|
|
|
pm.txSub.Unsubscribe() // quits txBroadcastLoop
|
|
pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop
|
|
|
|
// Quit the sync loop.
|
|
// After this send has completed, no new peers will be accepted.
|
|
pm.noMorePeers <- struct{}{}
|
|
|
|
// Quit fetcher, txsyncLoop.
|
|
close(pm.quitSync)
|
|
|
|
// Disconnect existing sessions.
|
|
// This also closes the gate for any new registrations on the peer set.
|
|
// sessions which are already established but not added to pm.peers yet
|
|
// will exit when they try to register.
|
|
pm.peers.Close()
|
|
|
|
// Wait for all peer handler goroutines and the loops to come down.
|
|
pm.wg.Wait()
|
|
|
|
glog.V(logger.Info).Infoln("Ethereum protocol handler stopped")
|
|
}
|
|
|
|
func (pm *ProtocolManager) newPeer(pv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
|
|
return newPeer(pv, p, newMeteredMsgWriter(rw))
|
|
}
|
|
|
|
// handle is the callback invoked to manage the life cycle of an eth peer. When
|
|
// this function terminates, the peer is disconnected.
|
|
func (pm *ProtocolManager) handle(p *peer) error {
|
|
glog.V(logger.Debug).Infof("%v: peer connected [%s]", p, p.Name())
|
|
|
|
// Execute the Ethereum handshake
|
|
td, head, genesis := pm.blockchain.Status()
|
|
if err := p.Handshake(pm.networkId, td, head, genesis); err != nil {
|
|
glog.V(logger.Debug).Infof("%v: handshake failed: %v", p, err)
|
|
return err
|
|
}
|
|
if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
|
|
rw.Init(p.version)
|
|
}
|
|
// Register the peer locally
|
|
glog.V(logger.Detail).Infof("%v: adding peer", p)
|
|
if err := pm.peers.Register(p); err != nil {
|
|
glog.V(logger.Error).Infof("%v: addition failed: %v", p, err)
|
|
return err
|
|
}
|
|
defer pm.removePeer(p.id)
|
|
|
|
// Register the peer in the downloader. If the downloader considers it banned, we disconnect
|
|
if err := pm.downloader.RegisterPeer(p.id, p.version, p.Head(),
|
|
p.RequestHashes, p.RequestHashesFromNumber, p.RequestBlocks, p.RequestHeadersByHash,
|
|
p.RequestHeadersByNumber, p.RequestBodies, p.RequestReceipts, p.RequestNodeData); err != nil {
|
|
return err
|
|
}
|
|
// Propagate existing transactions. new transactions appearing
|
|
// after this will be sent via broadcasts.
|
|
pm.syncTransactions(p)
|
|
|
|
// main loop. handle incoming messages.
|
|
for {
|
|
if err := pm.handleMsg(p); err != nil {
|
|
glog.V(logger.Debug).Infof("%v: message handling failed: %v", p, err)
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
// handleMsg is invoked whenever an inbound message is received from a remote
|
|
// peer. The remote connection is torn down upon returning any error.
|
|
func (pm *ProtocolManager) handleMsg(p *peer) error {
|
|
// Read the next message from the remote peer, and ensure it's fully consumed
|
|
msg, err := p.rw.ReadMsg()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if msg.Size > ProtocolMaxMsgSize {
|
|
return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
|
|
}
|
|
defer msg.Discard()
|
|
|
|
// Handle the message depending on its contents
|
|
switch {
|
|
case msg.Code == StatusMsg:
|
|
// Status messages should never arrive after the handshake
|
|
return errResp(ErrExtraStatusMsg, "uncontrolled status message")
|
|
|
|
case p.version < eth62 && msg.Code == GetBlockHashesMsg:
|
|
// Retrieve the number of hashes to return and from which origin hash
|
|
var request getBlockHashesData
|
|
if err := msg.Decode(&request); err != nil {
|
|
return errResp(ErrDecode, "%v: %v", msg, err)
|
|
}
|
|
if request.Amount > uint64(downloader.MaxHashFetch) {
|
|
request.Amount = uint64(downloader.MaxHashFetch)
|
|
}
|
|
// Retrieve the hashes from the block chain and return them
|
|
hashes := pm.blockchain.GetBlockHashesFromHash(request.Hash, request.Amount)
|
|
if len(hashes) == 0 {
|
|
glog.V(logger.Debug).Infof("invalid block hash %x", request.Hash.Bytes()[:4])
|
|
}
|
|
return p.SendBlockHashes(hashes)
|
|
|
|
case p.version < eth62 && msg.Code == GetBlockHashesFromNumberMsg:
|
|
// Retrieve and decode the number of hashes to return and from which origin number
|
|
var request getBlockHashesFromNumberData
|
|
if err := msg.Decode(&request); err != nil {
|
|
return errResp(ErrDecode, "%v: %v", msg, err)
|
|
}
|
|
if request.Amount > uint64(downloader.MaxHashFetch) {
|
|
request.Amount = uint64(downloader.MaxHashFetch)
|
|
}
|
|
// Calculate the last block that should be retrieved, and short circuit if unavailable
|
|
last := pm.blockchain.GetBlockByNumber(request.Number + request.Amount - 1)
|
|
if last == nil {
|
|
last = pm.blockchain.CurrentBlock()
|
|
request.Amount = last.NumberU64() - request.Number + 1
|
|
}
|
|
if last.NumberU64() < request.Number {
|
|
return p.SendBlockHashes(nil)
|
|
}
|
|
// Retrieve the hashes from the last block backwards, reverse and return
|
|
hashes := []common.Hash{last.Hash()}
|
|
hashes = append(hashes, pm.blockchain.GetBlockHashesFromHash(last.Hash(), request.Amount-1)...)
|
|
|
|
for i := 0; i < len(hashes)/2; i++ {
|
|
hashes[i], hashes[len(hashes)-1-i] = hashes[len(hashes)-1-i], hashes[i]
|
|
}
|
|
return p.SendBlockHashes(hashes)
|
|
|
|
case p.version < eth62 && msg.Code == BlockHashesMsg:
|
|
// A batch of hashes arrived to one of our previous requests
|
|
var hashes []common.Hash
|
|
if err := msg.Decode(&hashes); err != nil {
|
|
break
|
|
}
|
|
// Deliver them all to the downloader for queuing
|
|
err := pm.downloader.DeliverHashes(p.id, hashes)
|
|
if err != nil {
|
|
glog.V(logger.Debug).Infoln(err)
|
|
}
|
|
|
|
case p.version < eth62 && msg.Code == GetBlocksMsg:
|
|
// Decode the retrieval message
|
|
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
|
|
if _, err := msgStream.List(); err != nil {
|
|
return err
|
|
}
|
|
// Gather blocks until the fetch or network limits is reached
|
|
var (
|
|
hash common.Hash
|
|
bytes common.StorageSize
|
|
blocks []*types.Block
|
|
)
|
|
for len(blocks) < downloader.MaxBlockFetch && bytes < softResponseLimit {
|
|
//Retrieve the hash of the next block
|
|
err := msgStream.Decode(&hash)
|
|
if err == rlp.EOL {
|
|
break
|
|
} else if err != nil {
|
|
return errResp(ErrDecode, "msg %v: %v", msg, err)
|
|
}
|
|
// Retrieve the requested block, stopping if enough was found
|
|
if block := pm.blockchain.GetBlock(hash); block != nil {
|
|
blocks = append(blocks, block)
|
|
bytes += block.Size()
|
|
}
|
|
}
|
|
return p.SendBlocks(blocks)
|
|
|
|
case p.version < eth62 && msg.Code == BlocksMsg:
|
|
// Decode the arrived block message
|
|
var blocks []*types.Block
|
|
if err := msg.Decode(&blocks); err != nil {
|
|
glog.V(logger.Detail).Infoln("Decode error", err)
|
|
blocks = nil
|
|
}
|
|
// Update the receive timestamp of each block
|
|
for _, block := range blocks {
|
|
block.ReceivedAt = msg.ReceivedAt
|
|
}
|
|
// Filter out any explicitly requested blocks, deliver the rest to the downloader
|
|
if blocks := pm.fetcher.FilterBlocks(blocks); len(blocks) > 0 {
|
|
pm.downloader.DeliverBlocks(p.id, blocks)
|
|
}
|
|
|
|
// Block header query, collect the requested headers and reply
|
|
case p.version >= eth62 && msg.Code == GetBlockHeadersMsg:
|
|
// Decode the complex header query
|
|
var query getBlockHeadersData
|
|
if err := msg.Decode(&query); err != nil {
|
|
return errResp(ErrDecode, "%v: %v", msg, err)
|
|
}
|
|
hashMode := query.Origin.Hash != (common.Hash{})
|
|
|
|
// Gather headers until the fetch or network limits is reached
|
|
var (
|
|
bytes common.StorageSize
|
|
headers []*types.Header
|
|
unknown bool
|
|
)
|
|
for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit && len(headers) < downloader.MaxHeaderFetch {
|
|
// Retrieve the next header satisfying the query
|
|
var origin *types.Header
|
|
if hashMode {
|
|
origin = pm.blockchain.GetHeader(query.Origin.Hash)
|
|
} else {
|
|
origin = pm.blockchain.GetHeaderByNumber(query.Origin.Number)
|
|
}
|
|
if origin == nil {
|
|
break
|
|
}
|
|
headers = append(headers, origin)
|
|
bytes += estHeaderRlpSize
|
|
|
|
// Advance to the next header of the query
|
|
switch {
|
|
case query.Origin.Hash != (common.Hash{}) && query.Reverse:
|
|
// Hash based traversal towards the genesis block
|
|
for i := 0; i < int(query.Skip)+1; i++ {
|
|
if header := pm.blockchain.GetHeader(query.Origin.Hash); header != nil {
|
|
query.Origin.Hash = header.ParentHash
|
|
} else {
|
|
unknown = true
|
|
break
|
|
}
|
|
}
|
|
case query.Origin.Hash != (common.Hash{}) && !query.Reverse:
|
|
// Hash based traversal towards the leaf block
|
|
if header := pm.blockchain.GetHeaderByNumber(origin.Number.Uint64() + query.Skip + 1); header != nil {
|
|
if pm.blockchain.GetBlockHashesFromHash(header.Hash(), query.Skip+1)[query.Skip] == query.Origin.Hash {
|
|
query.Origin.Hash = header.Hash()
|
|
} else {
|
|
unknown = true
|
|
}
|
|
} else {
|
|
unknown = true
|
|
}
|
|
case query.Reverse:
|
|
// Number based traversal towards the genesis block
|
|
if query.Origin.Number >= query.Skip+1 {
|
|
query.Origin.Number -= (query.Skip + 1)
|
|
} else {
|
|
unknown = true
|
|
}
|
|
|
|
case !query.Reverse:
|
|
// Number based traversal towards the leaf block
|
|
query.Origin.Number += (query.Skip + 1)
|
|
}
|
|
}
|
|
return p.SendBlockHeaders(headers)
|
|
|
|
case p.version >= eth62 && msg.Code == BlockHeadersMsg:
|
|
// A batch of headers arrived to one of our previous requests
|
|
var headers []*types.Header
|
|
if err := msg.Decode(&headers); err != nil {
|
|
return errResp(ErrDecode, "msg %v: %v", msg, err)
|
|
}
|
|
// Filter out any explicitly requested headers, deliver the rest to the downloader
|
|
filter := len(headers) == 1
|
|
if filter {
|
|
headers = pm.fetcher.FilterHeaders(headers, time.Now())
|
|
}
|
|
if len(headers) > 0 || !filter {
|
|
err := pm.downloader.DeliverHeaders(p.id, headers)
|
|
if err != nil {
|
|
glog.V(logger.Debug).Infoln(err)
|
|
}
|
|
}
|
|
|
|
case p.version >= eth62 && msg.Code == GetBlockBodiesMsg:
|
|
// Decode the retrieval message
|
|
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
|
|
if _, err := msgStream.List(); err != nil {
|
|
return err
|
|
}
|
|
// Gather blocks until the fetch or network limits is reached
|
|
var (
|
|
hash common.Hash
|
|
bytes int
|
|
bodies []rlp.RawValue
|
|
)
|
|
for bytes < softResponseLimit && len(bodies) < downloader.MaxBlockFetch {
|
|
// Retrieve the hash of the next block
|
|
if err := msgStream.Decode(&hash); err == rlp.EOL {
|
|
break
|
|
} else if err != nil {
|
|
return errResp(ErrDecode, "msg %v: %v", msg, err)
|
|
}
|
|
// Retrieve the requested block body, stopping if enough was found
|
|
if data := pm.blockchain.GetBodyRLP(hash); len(data) != 0 {
|
|
bodies = append(bodies, data)
|
|
bytes += len(data)
|
|
}
|
|
}
|
|
return p.SendBlockBodiesRLP(bodies)
|
|
|
|
case p.version >= eth62 && msg.Code == BlockBodiesMsg:
|
|
// A batch of block bodies arrived to one of our previous requests
|
|
var request blockBodiesData
|
|
if err := msg.Decode(&request); err != nil {
|
|
return errResp(ErrDecode, "msg %v: %v", msg, err)
|
|
}
|
|
// Deliver them all to the downloader for queuing
|
|
trasactions := make([][]*types.Transaction, len(request))
|
|
uncles := make([][]*types.Header, len(request))
|
|
|
|
for i, body := range request {
|
|
trasactions[i] = body.Transactions
|
|
uncles[i] = body.Uncles
|
|
}
|
|
// Filter out any explicitly requested bodies, deliver the rest to the downloader
|
|
filter := len(trasactions) > 0 || len(uncles) > 0
|
|
if filter {
|
|
trasactions, uncles = pm.fetcher.FilterBodies(trasactions, uncles, time.Now())
|
|
}
|
|
if len(trasactions) > 0 || len(uncles) > 0 || !filter {
|
|
err := pm.downloader.DeliverBodies(p.id, trasactions, uncles)
|
|
if err != nil {
|
|
glog.V(logger.Debug).Infoln(err)
|
|
}
|
|
}
|
|
|
|
case p.version >= eth63 && msg.Code == GetNodeDataMsg:
|
|
// Decode the retrieval message
|
|
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
|
|
if _, err := msgStream.List(); err != nil {
|
|
return err
|
|
}
|
|
// Gather state data until the fetch or network limits is reached
|
|
var (
|
|
hash common.Hash
|
|
bytes int
|
|
data [][]byte
|
|
)
|
|
for bytes < softResponseLimit && len(data) < downloader.MaxStateFetch {
|
|
// Retrieve the hash of the next state entry
|
|
if err := msgStream.Decode(&hash); err == rlp.EOL {
|
|
break
|
|
} else if err != nil {
|
|
return errResp(ErrDecode, "msg %v: %v", msg, err)
|
|
}
|
|
// Retrieve the requested state entry, stopping if enough was found
|
|
if entry, err := pm.chaindb.Get(hash.Bytes()); err == nil {
|
|
data = append(data, entry)
|
|
bytes += len(entry)
|
|
}
|
|
}
|
|
return p.SendNodeData(data)
|
|
|
|
case p.version >= eth63 && msg.Code == NodeDataMsg:
|
|
// A batch of node state data arrived to one of our previous requests
|
|
var data [][]byte
|
|
if err := msg.Decode(&data); err != nil {
|
|
return errResp(ErrDecode, "msg %v: %v", msg, err)
|
|
}
|
|
// Deliver all to the downloader
|
|
if err := pm.downloader.DeliverNodeData(p.id, data); err != nil {
|
|
glog.V(logger.Debug).Infof("failed to deliver node state data: %v", err)
|
|
}
|
|
|
|
case p.version >= eth63 && msg.Code == GetReceiptsMsg:
|
|
// Decode the retrieval message
|
|
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
|
|
if _, err := msgStream.List(); err != nil {
|
|
return err
|
|
}
|
|
// Gather state data until the fetch or network limits is reached
|
|
var (
|
|
hash common.Hash
|
|
bytes int
|
|
receipts []rlp.RawValue
|
|
)
|
|
for bytes < softResponseLimit && len(receipts) < downloader.MaxReceiptFetch {
|
|
// Retrieve the hash of the next block
|
|
if err := msgStream.Decode(&hash); err == rlp.EOL {
|
|
break
|
|
} else if err != nil {
|
|
return errResp(ErrDecode, "msg %v: %v", msg, err)
|
|
}
|
|
// Retrieve the requested block's receipts, skipping if unknown to us
|
|
results := core.GetBlockReceipts(pm.chaindb, hash)
|
|
if results == nil {
|
|
if header := pm.blockchain.GetHeader(hash); header == nil || header.ReceiptHash != types.EmptyRootHash {
|
|
continue
|
|
}
|
|
}
|
|
// If known, encode and queue for response packet
|
|
if encoded, err := rlp.EncodeToBytes(results); err != nil {
|
|
glog.V(logger.Error).Infof("failed to encode receipt: %v", err)
|
|
} else {
|
|
receipts = append(receipts, encoded)
|
|
bytes += len(encoded)
|
|
}
|
|
}
|
|
return p.SendReceiptsRLP(receipts)
|
|
|
|
case p.version >= eth63 && msg.Code == ReceiptsMsg:
|
|
// A batch of receipts arrived to one of our previous requests
|
|
var receipts [][]*types.Receipt
|
|
if err := msg.Decode(&receipts); err != nil {
|
|
return errResp(ErrDecode, "msg %v: %v", msg, err)
|
|
}
|
|
// Deliver all to the downloader
|
|
if err := pm.downloader.DeliverReceipts(p.id, receipts); err != nil {
|
|
glog.V(logger.Debug).Infof("failed to deliver receipts: %v", err)
|
|
}
|
|
|
|
case msg.Code == NewBlockHashesMsg:
|
|
// Retrieve and deserialize the remote new block hashes notification
|
|
type announce struct {
|
|
Hash common.Hash
|
|
Number uint64
|
|
}
|
|
var announces = []announce{}
|
|
|
|
if p.version < eth62 {
|
|
// We're running the old protocol, make block number unknown (0)
|
|
var hashes []common.Hash
|
|
if err := msg.Decode(&hashes); err != nil {
|
|
return errResp(ErrDecode, "%v: %v", msg, err)
|
|
}
|
|
for _, hash := range hashes {
|
|
announces = append(announces, announce{hash, 0})
|
|
}
|
|
} else {
|
|
// Otherwise extract both block hash and number
|
|
var request newBlockHashesData
|
|
if err := msg.Decode(&request); err != nil {
|
|
return errResp(ErrDecode, "%v: %v", msg, err)
|
|
}
|
|
for _, block := range request {
|
|
announces = append(announces, announce{block.Hash, block.Number})
|
|
}
|
|
}
|
|
// Mark the hashes as present at the remote node
|
|
for _, block := range announces {
|
|
p.MarkBlock(block.Hash)
|
|
p.SetHead(block.Hash)
|
|
}
|
|
// Schedule all the unknown hashes for retrieval
|
|
unknown := make([]announce, 0, len(announces))
|
|
for _, block := range announces {
|
|
if !pm.blockchain.HasBlock(block.Hash) {
|
|
unknown = append(unknown, block)
|
|
}
|
|
}
|
|
for _, block := range unknown {
|
|
if p.version < eth62 {
|
|
pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestBlocks, nil, nil)
|
|
} else {
|
|
pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), nil, p.RequestOneHeader, p.RequestBodies)
|
|
}
|
|
}
|
|
|
|
case msg.Code == NewBlockMsg:
|
|
// Retrieve and decode the propagated block
|
|
var request newBlockData
|
|
if err := msg.Decode(&request); err != nil {
|
|
return errResp(ErrDecode, "%v: %v", msg, err)
|
|
}
|
|
if err := request.Block.ValidateFields(); err != nil {
|
|
return errResp(ErrDecode, "block validation %v: %v", msg, err)
|
|
}
|
|
request.Block.ReceivedAt = msg.ReceivedAt
|
|
|
|
// Mark the peer as owning the block and schedule it for import
|
|
p.MarkBlock(request.Block.Hash())
|
|
p.SetHead(request.Block.Hash())
|
|
|
|
pm.fetcher.Enqueue(p.id, request.Block)
|
|
|
|
// Update the peers total difficulty if needed, schedule a download if gapped
|
|
if request.TD.Cmp(p.Td()) > 0 {
|
|
p.SetTd(request.TD)
|
|
td := pm.blockchain.GetTd(pm.blockchain.CurrentBlock().Hash())
|
|
if request.TD.Cmp(new(big.Int).Add(td, request.Block.Difficulty())) > 0 {
|
|
go pm.synchronise(p)
|
|
}
|
|
}
|
|
|
|
case msg.Code == TxMsg:
|
|
// Transactions arrived, parse all of them and deliver to the pool
|
|
var txs []*types.Transaction
|
|
if err := msg.Decode(&txs); err != nil {
|
|
return errResp(ErrDecode, "msg %v: %v", msg, err)
|
|
}
|
|
for i, tx := range txs {
|
|
// Validate and mark the remote transaction
|
|
if tx == nil {
|
|
return errResp(ErrDecode, "transaction %d is nil", i)
|
|
}
|
|
p.MarkTransaction(tx.Hash())
|
|
}
|
|
pm.txpool.AddTransactions(txs)
|
|
|
|
default:
|
|
return errResp(ErrInvalidMsgCode, "%v", msg.Code)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// BroadcastBlock will either propagate a block to a subset of it's peers, or
|
|
// will only announce it's availability (depending what's requested).
|
|
func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
|
|
hash := block.Hash()
|
|
peers := pm.peers.PeersWithoutBlock(hash)
|
|
|
|
// If propagation is requested, send to a subset of the peer
|
|
if propagate {
|
|
// Calculate the TD of the block (it's not imported yet, so block.Td is not valid)
|
|
var td *big.Int
|
|
if parent := pm.blockchain.GetBlock(block.ParentHash()); parent != nil {
|
|
td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash()))
|
|
} else {
|
|
glog.V(logger.Error).Infof("propagating dangling block #%d [%x]", block.NumberU64(), hash[:4])
|
|
return
|
|
}
|
|
// Send the block to a subset of our peers
|
|
transfer := peers[:int(math.Sqrt(float64(len(peers))))]
|
|
for _, peer := range transfer {
|
|
peer.SendNewBlock(block, td)
|
|
}
|
|
glog.V(logger.Detail).Infof("propagated block %x to %d peers in %v", hash[:4], len(transfer), time.Since(block.ReceivedAt))
|
|
}
|
|
// Otherwise if the block is indeed in out own chain, announce it
|
|
if pm.blockchain.HasBlock(hash) {
|
|
for _, peer := range peers {
|
|
if peer.version < eth62 {
|
|
peer.SendNewBlockHashes61([]common.Hash{hash})
|
|
} else {
|
|
peer.SendNewBlockHashes([]common.Hash{hash}, []uint64{block.NumberU64()})
|
|
}
|
|
}
|
|
glog.V(logger.Detail).Infof("announced block %x to %d peers in %v", hash[:4], len(peers), time.Since(block.ReceivedAt))
|
|
}
|
|
}
|
|
|
|
// BroadcastTx will propagate a transaction to all peers which are not known to
|
|
// already have the given transaction.
|
|
func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction) {
|
|
// Broadcast transaction to a batch of peers not knowing about it
|
|
peers := pm.peers.PeersWithoutTx(hash)
|
|
//FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))]
|
|
for _, peer := range peers {
|
|
peer.SendTransactions(types.Transactions{tx})
|
|
}
|
|
glog.V(logger.Detail).Infoln("broadcast tx to", len(peers), "peers")
|
|
}
|
|
|
|
// Mined broadcast loop
|
|
func (self *ProtocolManager) minedBroadcastLoop() {
|
|
// automatically stops if unsubscribe
|
|
for obj := range self.minedBlockSub.Chan() {
|
|
switch ev := obj.Data.(type) {
|
|
case core.NewMinedBlockEvent:
|
|
self.BroadcastBlock(ev.Block, true) // First propagate block to peers
|
|
self.BroadcastBlock(ev.Block, false) // Only then announce to the rest
|
|
}
|
|
}
|
|
}
|
|
|
|
func (self *ProtocolManager) txBroadcastLoop() {
|
|
// automatically stops if unsubscribe
|
|
for obj := range self.txSub.Chan() {
|
|
event := obj.Data.(core.TxPreEvent)
|
|
self.BroadcastTx(event.Tx.Hash(), event.Tx)
|
|
}
|
|
}
|
|
|
|
// EthNodeInfo represents a short summary of the Ethereum sub-protocol metadata known
|
|
// about the host peer.
|
|
type EthNodeInfo struct {
|
|
Network int `json:"network"` // Ethereum network ID (0=Olympic, 1=Frontier, 2=Morden)
|
|
Difficulty *big.Int `json:"difficulty"` // Total difficulty of the host's blockchain
|
|
Genesis common.Hash `json:"genesis"` // SHA3 hash of the host's genesis block
|
|
Head common.Hash `json:"head"` // SHA3 hash of the host's best owned block
|
|
}
|
|
|
|
// NodeInfo retrieves some protocol metadata about the running host node.
|
|
func (self *ProtocolManager) NodeInfo() *EthNodeInfo {
|
|
return &EthNodeInfo{
|
|
Network: self.networkId,
|
|
Difficulty: self.blockchain.GetTd(self.blockchain.CurrentBlock().Hash()),
|
|
Genesis: self.blockchain.Genesis().Hash(),
|
|
Head: self.blockchain.CurrentBlock().Hash(),
|
|
}
|
|
}
|