be3865211c
These accessors were introduced by light client changes, but the only method that is actually used is GetNumberU64. This commit replaces all uses of .GetNumberU64 with .Number.Uint64.
296 lines
7.4 KiB
Go
296 lines
7.4 KiB
Go
// Copyright 2015 The go-ethereum Authors
|
|
// This file is part of the go-ethereum library.
|
|
//
|
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Lesser General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Lesser General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Lesser General Public License
|
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
// Package les implements the Light Ethereum Subprotocol.
|
|
package les
|
|
|
|
import (
|
|
"math/big"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/ethereum/go-ethereum/common"
|
|
"github.com/ethereum/go-ethereum/core"
|
|
"github.com/ethereum/go-ethereum/core/types"
|
|
)
|
|
|
|
type lightFetcher struct {
|
|
pm *ProtocolManager
|
|
odr *LesOdr
|
|
chain BlockChain
|
|
|
|
headAnnouncedMu sync.Mutex
|
|
headAnnouncedBy map[common.Hash][]*peer
|
|
currentTd *big.Int
|
|
deliverChn chan fetchResponse
|
|
reqMu sync.RWMutex
|
|
requested map[uint64]fetchRequest
|
|
timeoutChn chan uint64
|
|
notifyChn chan bool // true if initiated from outside
|
|
syncing bool
|
|
syncDone chan struct{}
|
|
}
|
|
|
|
type fetchRequest struct {
|
|
hash common.Hash
|
|
amount uint64
|
|
peer *peer
|
|
}
|
|
|
|
type fetchResponse struct {
|
|
reqID uint64
|
|
headers []*types.Header
|
|
}
|
|
|
|
func newLightFetcher(pm *ProtocolManager) *lightFetcher {
|
|
f := &lightFetcher{
|
|
pm: pm,
|
|
chain: pm.blockchain,
|
|
odr: pm.odr,
|
|
headAnnouncedBy: make(map[common.Hash][]*peer),
|
|
deliverChn: make(chan fetchResponse, 100),
|
|
requested: make(map[uint64]fetchRequest),
|
|
timeoutChn: make(chan uint64),
|
|
notifyChn: make(chan bool, 100),
|
|
syncDone: make(chan struct{}),
|
|
currentTd: big.NewInt(0),
|
|
}
|
|
go f.syncLoop()
|
|
return f
|
|
}
|
|
|
|
func (f *lightFetcher) notify(p *peer, head *announceData) {
|
|
var headHash common.Hash
|
|
if head == nil {
|
|
// initial notify
|
|
headHash = p.Head()
|
|
} else {
|
|
if core.GetTd(f.pm.chainDb, head.Hash, head.Number) != nil {
|
|
head.haveHeaders = head.Number
|
|
}
|
|
//fmt.Println("notify", p.id, head.Number, head.ReorgDepth, head.haveHeaders)
|
|
if !p.addNotify(head) {
|
|
//fmt.Println("addNotify fail")
|
|
f.pm.removePeer(p.id)
|
|
}
|
|
headHash = head.Hash
|
|
}
|
|
f.headAnnouncedMu.Lock()
|
|
f.headAnnouncedBy[headHash] = append(f.headAnnouncedBy[headHash], p)
|
|
f.headAnnouncedMu.Unlock()
|
|
f.notifyChn <- true
|
|
}
|
|
|
|
func (f *lightFetcher) gotHeader(header *types.Header) {
|
|
f.headAnnouncedMu.Lock()
|
|
defer f.headAnnouncedMu.Unlock()
|
|
|
|
hash := header.Hash()
|
|
peerList := f.headAnnouncedBy[hash]
|
|
if peerList == nil {
|
|
return
|
|
}
|
|
number := header.Number.Uint64()
|
|
td := core.GetTd(f.pm.chainDb, hash, number)
|
|
for _, peer := range peerList {
|
|
peer.lock.Lock()
|
|
ok := peer.gotHeader(hash, number, td)
|
|
peer.lock.Unlock()
|
|
if !ok {
|
|
//fmt.Println("gotHeader fail")
|
|
f.pm.removePeer(peer.id)
|
|
}
|
|
}
|
|
delete(f.headAnnouncedBy, hash)
|
|
}
|
|
|
|
func (f *lightFetcher) nextRequest() (*peer, *announceData) {
|
|
var bestPeer *peer
|
|
bestTd := f.currentTd
|
|
for _, peer := range f.pm.peers.AllPeers() {
|
|
peer.lock.RLock()
|
|
if !peer.headInfo.requested && (peer.headInfo.Td.Cmp(bestTd) > 0 ||
|
|
(bestPeer != nil && peer.headInfo.Td.Cmp(bestTd) == 0 && peer.headInfo.haveHeaders > bestPeer.headInfo.haveHeaders)) {
|
|
bestPeer = peer
|
|
bestTd = peer.headInfo.Td
|
|
}
|
|
peer.lock.RUnlock()
|
|
}
|
|
if bestPeer == nil {
|
|
return nil, nil
|
|
}
|
|
bestPeer.lock.Lock()
|
|
res := bestPeer.headInfo
|
|
res.requested = true
|
|
bestPeer.lock.Unlock()
|
|
for _, peer := range f.pm.peers.AllPeers() {
|
|
if peer != bestPeer {
|
|
peer.lock.Lock()
|
|
if peer.headInfo.Hash == bestPeer.headInfo.Hash && peer.headInfo.haveHeaders == bestPeer.headInfo.haveHeaders {
|
|
peer.headInfo.requested = true
|
|
}
|
|
peer.lock.Unlock()
|
|
}
|
|
}
|
|
return bestPeer, res
|
|
}
|
|
|
|
func (f *lightFetcher) deliverHeaders(reqID uint64, headers []*types.Header) {
|
|
f.deliverChn <- fetchResponse{reqID: reqID, headers: headers}
|
|
}
|
|
|
|
func (f *lightFetcher) requestedID(reqID uint64) bool {
|
|
f.reqMu.RLock()
|
|
_, ok := f.requested[reqID]
|
|
f.reqMu.RUnlock()
|
|
return ok
|
|
}
|
|
|
|
func (f *lightFetcher) request(p *peer, block *announceData) {
|
|
//fmt.Println("request", p.id, block.Number, block.haveHeaders)
|
|
amount := block.Number - block.haveHeaders
|
|
if amount == 0 {
|
|
return
|
|
}
|
|
if amount > 100 {
|
|
f.syncing = true
|
|
go func() {
|
|
//fmt.Println("f.pm.synchronise(p)")
|
|
f.pm.synchronise(p)
|
|
//fmt.Println("sync done")
|
|
f.syncDone <- struct{}{}
|
|
}()
|
|
return
|
|
}
|
|
|
|
reqID := f.odr.getNextReqID()
|
|
f.reqMu.Lock()
|
|
f.requested[reqID] = fetchRequest{hash: block.Hash, amount: amount, peer: p}
|
|
f.reqMu.Unlock()
|
|
cost := p.GetRequestCost(GetBlockHeadersMsg, int(amount))
|
|
p.fcServer.SendRequest(reqID, cost)
|
|
go p.RequestHeadersByHash(reqID, cost, block.Hash, int(amount), 0, true)
|
|
go func() {
|
|
time.Sleep(hardRequestTimeout)
|
|
f.timeoutChn <- reqID
|
|
}()
|
|
}
|
|
|
|
func (f *lightFetcher) processResponse(req fetchRequest, resp fetchResponse) bool {
|
|
if uint64(len(resp.headers)) != req.amount || resp.headers[0].Hash() != req.hash {
|
|
return false
|
|
}
|
|
headers := make([]*types.Header, req.amount)
|
|
for i, header := range resp.headers {
|
|
headers[int(req.amount)-1-i] = header
|
|
}
|
|
if _, err := f.chain.InsertHeaderChain(headers, 1); err != nil {
|
|
return false
|
|
}
|
|
for _, header := range headers {
|
|
td := core.GetTd(f.pm.chainDb, header.Hash(), header.Number.Uint64())
|
|
if td == nil {
|
|
return false
|
|
}
|
|
if td.Cmp(f.currentTd) > 0 {
|
|
f.currentTd = td
|
|
}
|
|
f.gotHeader(header)
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (f *lightFetcher) checkSyncedHeaders() {
|
|
//fmt.Println("checkSyncedHeaders()")
|
|
for _, peer := range f.pm.peers.AllPeers() {
|
|
peer.lock.Lock()
|
|
h := peer.firstHeadInfo
|
|
remove := false
|
|
loop:
|
|
for h != nil {
|
|
if td := core.GetTd(f.pm.chainDb, h.Hash, h.Number); td != nil {
|
|
//fmt.Println(" found", h.Number)
|
|
ok := peer.gotHeader(h.Hash, h.Number, td)
|
|
if !ok {
|
|
remove = true
|
|
break loop
|
|
}
|
|
if td.Cmp(f.currentTd) > 0 {
|
|
f.currentTd = td
|
|
}
|
|
}
|
|
h = h.next
|
|
}
|
|
peer.lock.Unlock()
|
|
if remove {
|
|
//fmt.Println("checkSync fail")
|
|
f.pm.removePeer(peer.id)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (f *lightFetcher) syncLoop() {
|
|
f.pm.wg.Add(1)
|
|
defer f.pm.wg.Done()
|
|
|
|
srtoNotify := false
|
|
for {
|
|
select {
|
|
case <-f.pm.quitSync:
|
|
return
|
|
case ext := <-f.notifyChn:
|
|
//fmt.Println("<-f.notifyChn", f.syncing, ext, srtoNotify)
|
|
s := srtoNotify
|
|
srtoNotify = false
|
|
if !f.syncing && !(ext && s) {
|
|
if p, r := f.nextRequest(); r != nil {
|
|
srtoNotify = true
|
|
go func() {
|
|
time.Sleep(softRequestTimeout)
|
|
f.notifyChn <- false
|
|
}()
|
|
f.request(p, r)
|
|
}
|
|
}
|
|
case reqID := <-f.timeoutChn:
|
|
f.reqMu.Lock()
|
|
req, ok := f.requested[reqID]
|
|
if ok {
|
|
delete(f.requested, reqID)
|
|
}
|
|
f.reqMu.Unlock()
|
|
if ok {
|
|
//fmt.Println("hard timeout")
|
|
f.pm.removePeer(req.peer.id)
|
|
}
|
|
case resp := <-f.deliverChn:
|
|
//fmt.Println("<-f.deliverChn", f.syncing)
|
|
f.reqMu.Lock()
|
|
req, ok := f.requested[resp.reqID]
|
|
delete(f.requested, resp.reqID)
|
|
f.reqMu.Unlock()
|
|
if !ok || !(f.syncing || f.processResponse(req, resp)) {
|
|
//fmt.Println("processResponse fail")
|
|
f.pm.removePeer(req.peer.id)
|
|
}
|
|
case <-f.syncDone:
|
|
//fmt.Println("<-f.syncDone", f.syncing)
|
|
f.checkSyncedHeaders()
|
|
f.syncing = false
|
|
}
|
|
}
|
|
}
|