diff --git a/eth/backend.go b/eth/backend.go
index d5b767b12..f98c9b724 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -104,6 +104,7 @@ type Config struct {
type LesServer interface {
Start(srvr *p2p.Server)
+ Synced()
Stop()
Protocols() []p2p.Protocol
}
@@ -145,6 +146,7 @@ type Ethereum struct {
func (s *Ethereum) AddLesServer(ls LesServer) {
s.lesServer = ls
+ s.protocolManager.lesServer = ls
}
// New creates a new Ethereum object (including the
diff --git a/eth/handler.go b/eth/handler.go
index 8f05cc5b1..771e69b8d 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -87,6 +87,8 @@ type ProtocolManager struct {
quitSync chan struct{}
noMorePeers chan struct{}
+ lesServer LesServer
+
// wait group is used for graceful shutdowns during downloading
// and processing
wg sync.WaitGroup
@@ -171,7 +173,7 @@ func NewProtocolManager(config *params.ChainConfig, fastSync bool, networkId int
return blockchain.CurrentBlock().NumberU64()
}
inserter := func(blocks types.Blocks) (int, error) {
- atomic.StoreUint32(&manager.synced, 1) // Mark initial sync done on any fetcher import
+ manager.setSynced() // Mark initial sync done on any fetcher import
return manager.insertChain(blocks)
}
manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer)
diff --git a/eth/sync.go b/eth/sync.go
index 373cc2054..234534b4f 100644
--- a/eth/sync.go
+++ b/eth/sync.go
@@ -181,7 +181,7 @@ func (pm *ProtocolManager) synchronise(peer *peer) {
if err := pm.downloader.Synchronise(peer.id, pHead, pTd, mode); err != nil {
return
}
- atomic.StoreUint32(&pm.synced, 1) // Mark initial sync done
+ pm.setSynced() // Mark initial sync done
// If fast sync was enabled, and we synced up, disable it
if atomic.LoadUint32(&pm.fastSync) == 1 {
@@ -192,3 +192,10 @@ func (pm *ProtocolManager) synchronise(peer *peer) {
}
}
}
+
+// setSynced sets the synced flag and notifies the light server if present
+func (pm *ProtocolManager) setSynced() {
+ if atomic.SwapUint32(&pm.synced, 1) == 0 && pm.lesServer != nil {
+ pm.lesServer.Synced()
+ }
+}
diff --git a/les/fetcher.go b/les/fetcher.go
index ae9bf8474..c23af8da3 100644
--- a/les/fetcher.go
+++ b/les/fetcher.go
@@ -23,135 +23,374 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/light"
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
)
+const (
+ blockDelayTimeout = time.Second * 10 // timeout for a peer to announce a head that has already been confirmed by others
+ maxNodeCount = 20 // maximum number of fetcherTreeNode entries remembered for each peer
+)
+
+// lightFetcher
type lightFetcher struct {
pm *ProtocolManager
odr *LesOdr
- chain BlockChain
+ chain *light.LightChain
- headAnnouncedMu sync.Mutex
- headAnnouncedBy map[common.Hash][]*peer
- currentTd *big.Int
- deliverChn chan fetchResponse
- reqMu sync.RWMutex
- requested map[uint64]fetchRequest
- timeoutChn chan uint64
- notifyChn chan bool // true if initiated from outside
- syncing bool
- syncDone chan struct{}
+ maxConfirmedTd *big.Int
+ peers map[*peer]*fetcherPeerInfo
+ lastUpdateStats *updateStatsEntry
+
+ lock sync.Mutex // qwerqwerqwe
+ deliverChn chan fetchResponse
+ reqMu sync.RWMutex
+ requested map[uint64]fetchRequest
+ timeoutChn chan uint64
+ requestChn chan bool // true if initiated from outside
+ syncing bool
+ syncDone chan *peer
}
+// fetcherPeerInfo holds fetcher-specific information about each active peer
+type fetcherPeerInfo struct {
+ root, lastAnnounced *fetcherTreeNode
+ nodeCnt int
+ confirmedTd *big.Int
+ bestConfirmed *fetcherTreeNode
+ nodeByHash map[common.Hash]*fetcherTreeNode
+ firstUpdateStats *updateStatsEntry
+}
+
+// fetcherTreeNode is a node of a tree that holds information about blocks recently
+// announced and confirmed by a certain peer. Each new announce message from a peer
+// adds nodes to the tree, based on the previous announced head and the reorg depth.
+// There are three possible states for a tree node:
+// - announced: not downloaded (known) yet, but we know its head, number and td
+// - intermediate: not known, hash and td are empty, they are filled out when it becomes known
+// - known: both announced by this peer and downloaded (from any peer).
+// This structure makes it possible to always know which peer has a certain block,
+// which is necessary for selecting a suitable peer for ODR requests and also for
+// canonizing new heads. It also helps to always download the minimum necessary
+// amount of headers with a single request.
+type fetcherTreeNode struct {
+ hash common.Hash
+ number uint64
+ td *big.Int
+ known, requested bool
+ parent *fetcherTreeNode
+ children []*fetcherTreeNode
+}
+
+// fetchRequest represents a header download request
type fetchRequest struct {
- hash common.Hash
- amount uint64
- peer *peer
+ hash common.Hash
+ amount uint64
+ peer *peer
+ sent mclock.AbsTime
+ timeout bool
}
+// fetchResponse represents a header download response
type fetchResponse struct {
reqID uint64
headers []*types.Header
+ peer *peer
}
+// newLightFetcher creates a new light fetcher
func newLightFetcher(pm *ProtocolManager) *lightFetcher {
f := &lightFetcher{
- pm: pm,
- chain: pm.blockchain,
- odr: pm.odr,
- headAnnouncedBy: make(map[common.Hash][]*peer),
- deliverChn: make(chan fetchResponse, 100),
- requested: make(map[uint64]fetchRequest),
- timeoutChn: make(chan uint64),
- notifyChn: make(chan bool, 100),
- syncDone: make(chan struct{}),
- currentTd: big.NewInt(0),
+ pm: pm,
+ chain: pm.blockchain.(*light.LightChain),
+ odr: pm.odr,
+ peers: make(map[*peer]*fetcherPeerInfo),
+ deliverChn: make(chan fetchResponse, 100),
+ requested: make(map[uint64]fetchRequest),
+ timeoutChn: make(chan uint64),
+ requestChn: make(chan bool, 100),
+ syncDone: make(chan *peer),
+ maxConfirmedTd: big.NewInt(0),
}
go f.syncLoop()
return f
}
-func (f *lightFetcher) notify(p *peer, head *announceData) {
- var headHash common.Hash
- if head == nil {
- // initial notify
- headHash = p.Head()
- } else {
- if core.GetTd(f.pm.chainDb, head.Hash, head.Number) != nil {
- head.haveHeaders = head.Number
+// syncLoop is the main event loop of the light fetcher
+func (f *lightFetcher) syncLoop() {
+ f.pm.wg.Add(1)
+ defer f.pm.wg.Done()
+
+ requestStarted := false
+ for {
+ select {
+ case <-f.pm.quitSync:
+ return
+ // when a new announce is received, request loop keeps running until
+ // no further requests are necessary or possible
+ case newAnnounce := <-f.requestChn:
+ f.lock.Lock()
+ s := requestStarted
+ requestStarted = false
+ if !f.syncing && !(newAnnounce && s) {
+ if peer, node, amount := f.nextRequest(); node != nil {
+ requestStarted = true
+ reqID, started := f.request(peer, node, amount)
+ if started {
+ go func() {
+ time.Sleep(softRequestTimeout)
+ f.reqMu.Lock()
+ req, ok := f.requested[reqID]
+ if ok {
+ req.timeout = true
+ f.requested[reqID] = req
+ }
+ f.reqMu.Unlock()
+ // keep starting new requests while possible
+ f.requestChn <- false
+ }()
+ }
+ }
+ }
+ f.lock.Unlock()
+ case reqID := <-f.timeoutChn:
+ f.reqMu.Lock()
+ req, ok := f.requested[reqID]
+ if ok {
+ delete(f.requested, reqID)
+ }
+ f.reqMu.Unlock()
+ if ok {
+ f.pm.serverPool.adjustResponseTime(req.peer.poolEntry, time.Duration(mclock.Now()-req.sent), true)
+ glog.V(logger.Debug).Infof("hard timeout by peer %v", req.peer.id)
+ go f.pm.removePeer(req.peer.id)
+ }
+ case resp := <-f.deliverChn:
+ f.reqMu.Lock()
+ req, ok := f.requested[resp.reqID]
+ if ok && req.peer != resp.peer {
+ ok = false
+ }
+ if ok {
+ delete(f.requested, resp.reqID)
+ }
+ f.reqMu.Unlock()
+ if ok {
+ f.pm.serverPool.adjustResponseTime(req.peer.poolEntry, time.Duration(mclock.Now()-req.sent), req.timeout)
+ }
+ f.lock.Lock()
+ if !ok || !(f.syncing || f.processResponse(req, resp)) {
+ glog.V(logger.Debug).Infof("failed processing response by peer %v", resp.peer.id)
+ go f.pm.removePeer(resp.peer.id)
+ }
+ f.lock.Unlock()
+ case p := <-f.syncDone:
+ f.lock.Lock()
+ glog.V(logger.Debug).Infof("done synchronising with peer %v", p.id)
+ f.checkSyncedHeaders(p)
+ f.syncing = false
+ f.lock.Unlock()
}
- //fmt.Println("notify", p.id, head.Number, head.ReorgDepth, head.haveHeaders)
- if !p.addNotify(head) {
- //fmt.Println("addNotify fail")
- f.pm.removePeer(p.id)
- }
- headHash = head.Hash
}
- f.headAnnouncedMu.Lock()
- f.headAnnouncedBy[headHash] = append(f.headAnnouncedBy[headHash], p)
- f.headAnnouncedMu.Unlock()
- f.notifyChn <- true
}
-func (f *lightFetcher) gotHeader(header *types.Header) {
- f.headAnnouncedMu.Lock()
- defer f.headAnnouncedMu.Unlock()
+// addPeer adds a new peer to the fetcher's peer set
+func (f *lightFetcher) addPeer(p *peer) {
+ p.lock.Lock()
+ p.hasBlock = func(hash common.Hash, number uint64) bool {
+ return f.peerHasBlock(p, hash, number)
+ }
+ p.lock.Unlock()
- hash := header.Hash()
- peerList := f.headAnnouncedBy[hash]
- if peerList == nil {
+ f.lock.Lock()
+ defer f.lock.Unlock()
+
+ f.peers[p] = &fetcherPeerInfo{nodeByHash: make(map[common.Hash]*fetcherTreeNode)}
+}
+
+// removePeer removes a new peer from the fetcher's peer set
+func (f *lightFetcher) removePeer(p *peer) {
+ p.lock.Lock()
+ p.hasBlock = nil
+ p.lock.Unlock()
+
+ f.lock.Lock()
+ defer f.lock.Unlock()
+
+ // check for potential timed out block delay statistics
+ f.checkUpdateStats(p, nil)
+ delete(f.peers, p)
+}
+
+// announce processes a new announcement message received from a peer, adding new
+// nodes to the peer's block tree and removing old nodes if necessary
+func (f *lightFetcher) announce(p *peer, head *announceData) {
+ f.lock.Lock()
+ defer f.lock.Unlock()
+ glog.V(logger.Debug).Infof("received announce from peer %v #%d %016x reorg: %d", p.id, head.Number, head.Hash[:8], head.ReorgDepth)
+
+ fp := f.peers[p]
+ if fp == nil {
+ glog.V(logger.Debug).Infof("announce: unknown peer")
return
}
- number := header.Number.Uint64()
- td := core.GetTd(f.pm.chainDb, hash, number)
- for _, peer := range peerList {
- peer.lock.Lock()
- ok := peer.gotHeader(hash, number, td)
- peer.lock.Unlock()
- if !ok {
- //fmt.Println("gotHeader fail")
- f.pm.removePeer(peer.id)
- }
- }
- delete(f.headAnnouncedBy, hash)
-}
-func (f *lightFetcher) nextRequest() (*peer, *announceData) {
- var bestPeer *peer
- bestTd := f.currentTd
- for _, peer := range f.pm.peers.AllPeers() {
- peer.lock.RLock()
- if !peer.headInfo.requested && (peer.headInfo.Td.Cmp(bestTd) > 0 ||
- (bestPeer != nil && peer.headInfo.Td.Cmp(bestTd) == 0 && peer.headInfo.haveHeaders > bestPeer.headInfo.haveHeaders)) {
- bestPeer = peer
- bestTd = peer.headInfo.Td
+ if fp.lastAnnounced != nil && head.Td.Cmp(fp.lastAnnounced.td) <= 0 {
+ // announced tds should be strictly monotonic
+ glog.V(logger.Debug).Infof("non-monotonic Td from peer %v", p.id)
+ go f.pm.removePeer(p.id)
+ return
+ }
+
+ n := fp.lastAnnounced
+ for i := uint64(0); i < head.ReorgDepth; i++ {
+ if n == nil {
+ break
}
- peer.lock.RUnlock()
+ n = n.parent
}
- if bestPeer == nil {
- return nil, nil
- }
- bestPeer.lock.Lock()
- res := bestPeer.headInfo
- res.requested = true
- bestPeer.lock.Unlock()
- for _, peer := range f.pm.peers.AllPeers() {
- if peer != bestPeer {
- peer.lock.Lock()
- if peer.headInfo.Hash == bestPeer.headInfo.Hash && peer.headInfo.haveHeaders == bestPeer.headInfo.haveHeaders {
- peer.headInfo.requested = true
+ if n != nil {
+ // n is now the reorg common ancestor, add a new branch of nodes
+ // check if the node count is too high to add new nodes
+ locked := false
+ for uint64(fp.nodeCnt)+head.Number-n.number > maxNodeCount && fp.root != nil {
+ if !locked {
+ f.chain.LockChain()
+ defer f.chain.UnlockChain()
+ locked = true
}
- peer.lock.Unlock()
+ // if one of root's children is canonical, keep it, delete other branches and root itself
+ var newRoot *fetcherTreeNode
+ for i, nn := range fp.root.children {
+ if core.GetCanonicalHash(f.pm.chainDb, nn.number) == nn.hash {
+ fp.root.children = append(fp.root.children[:i], fp.root.children[i+1:]...)
+ nn.parent = nil
+ newRoot = nn
+ break
+ }
+ }
+ fp.deleteNode(fp.root)
+ if n == fp.root {
+ n = newRoot
+ }
+ fp.root = newRoot
+ if newRoot == nil || !f.checkKnownNode(p, newRoot) {
+ fp.bestConfirmed = nil
+ fp.confirmedTd = nil
+ }
+
+ if n == nil {
+ break
+ }
+ }
+ if n != nil {
+ for n.number < head.Number {
+ nn := &fetcherTreeNode{number: n.number + 1, parent: n}
+ n.children = append(n.children, nn)
+ n = nn
+ fp.nodeCnt++
+ }
+ n.hash = head.Hash
+ n.td = head.Td
+ fp.nodeByHash[n.hash] = n
}
}
- return bestPeer, res
+ if n == nil {
+ // could not find reorg common ancestor or had to delete entire tree, a new root and a resync is needed
+ if fp.root != nil {
+ fp.deleteNode(fp.root)
+ }
+ n = &fetcherTreeNode{hash: head.Hash, number: head.Number, td: head.Td}
+ fp.root = n
+ fp.nodeCnt++
+ fp.nodeByHash[n.hash] = n
+ fp.bestConfirmed = nil
+ fp.confirmedTd = nil
+ }
+
+ f.checkKnownNode(p, n)
+ p.lock.Lock()
+ p.headInfo = head
+ fp.lastAnnounced = n
+ p.lock.Unlock()
+ f.checkUpdateStats(p, nil)
+ f.requestChn <- true
}
-func (f *lightFetcher) deliverHeaders(reqID uint64, headers []*types.Header) {
- f.deliverChn <- fetchResponse{reqID: reqID, headers: headers}
+// peerHasBlock returns true if we can assume the peer knows the given block
+// based on its announcements
+func (f *lightFetcher) peerHasBlock(p *peer, hash common.Hash, number uint64) bool {
+ f.lock.Lock()
+ defer f.lock.Unlock()
+
+ fp := f.peers[p]
+ if fp == nil || fp.root == nil {
+ return false
+ }
+
+ if number >= fp.root.number {
+ // it is recent enough that if it is known, is should be in the peer's block tree
+ return fp.nodeByHash[hash] != nil
+ }
+ f.chain.LockChain()
+ defer f.chain.UnlockChain()
+ // if it's older than the peer's block tree root but it's in the same canonical chain
+ // than the root, we can still be sure the peer knows it
+ return core.GetCanonicalHash(f.pm.chainDb, fp.root.number) == fp.root.hash && core.GetCanonicalHash(f.pm.chainDb, number) == hash
}
+// request initiates a header download request from a certain peer
+func (f *lightFetcher) request(p *peer, n *fetcherTreeNode, amount uint64) (uint64, bool) {
+ fp := f.peers[p]
+ if fp == nil {
+ glog.V(logger.Debug).Infof("request: unknown peer")
+ return 0, false
+ }
+ if fp.bestConfirmed == nil || fp.root == nil || !f.checkKnownNode(p, fp.root) {
+ f.syncing = true
+ go func() {
+ glog.V(logger.Debug).Infof("synchronising with peer %v", p.id)
+ f.pm.synchronise(p)
+ f.syncDone <- p
+ }()
+ return 0, false
+ }
+
+ reqID := getNextReqID()
+ n.requested = true
+ cost := p.GetRequestCost(GetBlockHeadersMsg, int(amount))
+ p.fcServer.SendRequest(reqID, cost)
+ f.reqMu.Lock()
+ f.requested[reqID] = fetchRequest{hash: n.hash, amount: amount, peer: p, sent: mclock.Now()}
+ f.reqMu.Unlock()
+ go p.RequestHeadersByHash(reqID, cost, n.hash, int(amount), 0, true)
+ go func() {
+ time.Sleep(hardRequestTimeout)
+ f.timeoutChn <- reqID
+ }()
+ return reqID, true
+}
+
+// requestAmount calculates the amount of headers to be downloaded starting
+// from a certain head backwards
+func (f *lightFetcher) requestAmount(p *peer, n *fetcherTreeNode) uint64 {
+ amount := uint64(0)
+ nn := n
+ for nn != nil && !f.checkKnownNode(p, nn) {
+ nn = nn.parent
+ amount++
+ }
+ if nn == nil {
+ amount = n.number
+ }
+ return amount
+}
+
+// requestedID tells if a certain reqID has been requested by the fetcher
func (f *lightFetcher) requestedID(reqID uint64) bool {
f.reqMu.RLock()
_, ok := f.requested[reqID]
@@ -159,36 +398,51 @@ func (f *lightFetcher) requestedID(reqID uint64) bool {
return ok
}
-func (f *lightFetcher) request(p *peer, block *announceData) {
- //fmt.Println("request", p.id, block.Number, block.haveHeaders)
- amount := block.Number - block.haveHeaders
- if amount == 0 {
- return
+// nextRequest selects the peer and announced head to be requested next, amount
+// to be downloaded starting from the head backwards is also returned
+func (f *lightFetcher) nextRequest() (*peer, *fetcherTreeNode, uint64) {
+ var (
+ bestHash common.Hash
+ bestAmount uint64
+ )
+ bestTd := f.maxConfirmedTd
+
+ for p, fp := range f.peers {
+ for hash, n := range fp.nodeByHash {
+ if !f.checkKnownNode(p, n) && !n.requested && (bestTd == nil || n.td.Cmp(bestTd) >= 0) {
+ amount := f.requestAmount(p, n)
+ if bestTd == nil || n.td.Cmp(bestTd) > 0 || amount < bestAmount {
+ bestHash = hash
+ bestAmount = amount
+ bestTd = n.td
+ }
+ }
+ }
}
- if amount > 100 {
- f.syncing = true
- go func() {
- //fmt.Println("f.pm.synchronise(p)")
- f.pm.synchronise(p)
- //fmt.Println("sync done")
- f.syncDone <- struct{}{}
- }()
- return
+ if bestTd == f.maxConfirmedTd {
+ return nil, nil, 0
}
- reqID := f.odr.getNextReqID()
- f.reqMu.Lock()
- f.requested[reqID] = fetchRequest{hash: block.Hash, amount: amount, peer: p}
- f.reqMu.Unlock()
- cost := p.GetRequestCost(GetBlockHeadersMsg, int(amount))
- p.fcServer.SendRequest(reqID, cost)
- go p.RequestHeadersByHash(reqID, cost, block.Hash, int(amount), 0, true)
- go func() {
- time.Sleep(hardRequestTimeout)
- f.timeoutChn <- reqID
- }()
+ peer := f.pm.serverPool.selectPeer(func(p *peer) (bool, uint64) {
+ fp := f.peers[p]
+ if fp == nil || fp.nodeByHash[bestHash] == nil {
+ return false, 0
+ }
+ return true, p.fcServer.CanSend(p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount)))
+ })
+ var node *fetcherTreeNode
+ if peer != nil {
+ node = f.peers[peer].nodeByHash[bestHash]
+ }
+ return peer, node, bestAmount
}
+// deliverHeaders delivers header download request responses for processing
+func (f *lightFetcher) deliverHeaders(peer *peer, reqID uint64, headers []*types.Header) {
+ f.deliverChn <- fetchResponse{reqID: reqID, headers: headers, peer: peer}
+}
+
+// processResponse processes header download request responses
func (f *lightFetcher) processResponse(req fetchRequest, resp fetchResponse) bool {
if uint64(len(resp.headers)) != req.amount || resp.headers[0].Hash() != req.hash {
return false
@@ -200,96 +454,248 @@ func (f *lightFetcher) processResponse(req fetchRequest, resp fetchResponse) boo
if _, err := f.chain.InsertHeaderChain(headers, 1); err != nil {
return false
}
- for _, header := range headers {
- td := core.GetTd(f.pm.chainDb, header.Hash(), header.Number.Uint64())
+ tds := make([]*big.Int, len(headers))
+ for i, header := range headers {
+ td := f.chain.GetTd(header.Hash(), header.Number.Uint64())
if td == nil {
return false
}
- if td.Cmp(f.currentTd) > 0 {
- f.currentTd = td
- }
- f.gotHeader(header)
+ tds[i] = td
}
+ f.newHeaders(headers, tds)
return true
}
-func (f *lightFetcher) checkSyncedHeaders() {
- //fmt.Println("checkSyncedHeaders()")
- for _, peer := range f.pm.peers.AllPeers() {
- peer.lock.Lock()
- h := peer.firstHeadInfo
- remove := false
- loop:
- for h != nil {
- if td := core.GetTd(f.pm.chainDb, h.Hash, h.Number); td != nil {
- //fmt.Println(" found", h.Number)
- ok := peer.gotHeader(h.Hash, h.Number, td)
- if !ok {
- remove = true
- break loop
- }
- if td.Cmp(f.currentTd) > 0 {
- f.currentTd = td
+// newHeaders updates the block trees of all active peers according to a newly
+// downloaded and validated batch or headers
+func (f *lightFetcher) newHeaders(headers []*types.Header, tds []*big.Int) {
+ var maxTd *big.Int
+ for p, fp := range f.peers {
+ if !f.checkAnnouncedHeaders(fp, headers, tds) {
+ glog.V(logger.Debug).Infof("announce inconsistency by peer %v", p.id)
+ go f.pm.removePeer(p.id)
+ }
+ if fp.confirmedTd != nil && (maxTd == nil || maxTd.Cmp(fp.confirmedTd) > 0) {
+ maxTd = fp.confirmedTd
+ }
+ }
+ if maxTd != nil {
+ f.updateMaxConfirmedTd(maxTd)
+ }
+}
+
+// checkAnnouncedHeaders updates peer's block tree if necessary after validating
+// a batch of headers. It searches for the latest header in the batch that has a
+// matching tree node (if any), and if it has not been marked as known already,
+// sets it and its parents to known (even those which are older than the currently
+// validated ones). Return value shows if all hashes, numbers and Tds matched
+// correctly to the announced values (otherwise the peer should be dropped).
+func (f *lightFetcher) checkAnnouncedHeaders(fp *fetcherPeerInfo, headers []*types.Header, tds []*big.Int) bool {
+ var (
+ n *fetcherTreeNode
+ header *types.Header
+ td *big.Int
+ )
+
+ for i := len(headers) - 1; ; i-- {
+ if i < 0 {
+ if n == nil {
+ // no more headers and nothing to match
+ return true
+ }
+ // we ran out of recently delivered headers but have not reached a node known by this peer yet, continue matching
+ td = f.chain.GetTd(header.ParentHash, header.Number.Uint64()-1)
+ header = f.chain.GetHeader(header.ParentHash, header.Number.Uint64()-1)
+ } else {
+ header = headers[i]
+ td = tds[i]
+ }
+ hash := header.Hash()
+ number := header.Number.Uint64()
+ if n == nil {
+ n = fp.nodeByHash[hash]
+ }
+ if n != nil {
+ if n.td == nil {
+ // node was unannounced
+ if nn := fp.nodeByHash[hash]; nn != nil {
+ // if there was already a node with the same hash, continue there and drop this one
+ nn.children = append(nn.children, n.children...)
+ n.children = nil
+ fp.deleteNode(n)
+ n = nn
+ } else {
+ n.hash = hash
+ n.td = td
+ fp.nodeByHash[hash] = n
}
}
- h = h.next
- }
- peer.lock.Unlock()
- if remove {
- //fmt.Println("checkSync fail")
- f.pm.removePeer(peer.id)
+ // check if it matches the header
+ if n.hash != hash || n.number != number || n.td.Cmp(td) != 0 {
+ // peer has previously made an invalid announcement
+ return false
+ }
+ if n.known {
+ // we reached a known node that matched our expectations, return with success
+ return true
+ }
+ n.known = true
+ if fp.confirmedTd == nil || td.Cmp(fp.confirmedTd) > 0 {
+ fp.confirmedTd = td
+ fp.bestConfirmed = n
+ }
+ n = n.parent
+ if n == nil {
+ return true
+ }
}
}
}
-func (f *lightFetcher) syncLoop() {
- f.pm.wg.Add(1)
- defer f.pm.wg.Done()
+// checkSyncedHeaders updates peer's block tree after synchronisation by marking
+// downloaded headers as known. If none of the announced headers are found after
+// syncing, the peer is dropped.
+func (f *lightFetcher) checkSyncedHeaders(p *peer) {
+ fp := f.peers[p]
+ if fp == nil {
+ glog.V(logger.Debug).Infof("checkSyncedHeaders: unknown peer")
+ return
+ }
+ n := fp.lastAnnounced
+ var td *big.Int
+ for n != nil {
+ if td = f.chain.GetTd(n.hash, n.number); td != nil {
+ break
+ }
+ n = n.parent
+ }
+ // now n is the latest downloaded header after syncing
+ if n == nil {
+ glog.V(logger.Debug).Infof("synchronisation failed with peer %v", p.id)
+ go f.pm.removePeer(p.id)
+ } else {
+ header := f.chain.GetHeader(n.hash, n.number)
+ f.newHeaders([]*types.Header{header}, []*big.Int{td})
+ }
+}
- srtoNotify := false
+// checkKnownNode checks if a block tree node is known (downloaded and validated)
+// If it was not known previously but found in the database, sets its known flag
+func (f *lightFetcher) checkKnownNode(p *peer, n *fetcherTreeNode) bool {
+ if n.known {
+ return true
+ }
+ td := f.chain.GetTd(n.hash, n.number)
+ if td == nil {
+ return false
+ }
+
+ fp := f.peers[p]
+ if fp == nil {
+ glog.V(logger.Debug).Infof("checkKnownNode: unknown peer")
+ return false
+ }
+ header := f.chain.GetHeader(n.hash, n.number)
+ if !f.checkAnnouncedHeaders(fp, []*types.Header{header}, []*big.Int{td}) {
+ glog.V(logger.Debug).Infof("announce inconsistency by peer %v", p.id)
+ go f.pm.removePeer(p.id)
+ }
+ if fp.confirmedTd != nil {
+ f.updateMaxConfirmedTd(fp.confirmedTd)
+ }
+ return n.known
+}
+
+// deleteNode deletes a node and its child subtrees from a peer's block tree
+func (fp *fetcherPeerInfo) deleteNode(n *fetcherTreeNode) {
+ if n.parent != nil {
+ for i, nn := range n.parent.children {
+ if nn == n {
+ n.parent.children = append(n.parent.children[:i], n.parent.children[i+1:]...)
+ break
+ }
+ }
+ }
for {
- select {
- case <-f.pm.quitSync:
+ if n.td != nil {
+ delete(fp.nodeByHash, n.hash)
+ }
+ fp.nodeCnt--
+ if len(n.children) == 0 {
return
- case ext := <-f.notifyChn:
- //fmt.Println("<-f.notifyChn", f.syncing, ext, srtoNotify)
- s := srtoNotify
- srtoNotify = false
- if !f.syncing && !(ext && s) {
- if p, r := f.nextRequest(); r != nil {
- srtoNotify = true
- go func() {
- time.Sleep(softRequestTimeout)
- f.notifyChn <- false
- }()
- f.request(p, r)
- }
+ }
+ for i, nn := range n.children {
+ if i == 0 {
+ n = nn
+ } else {
+ fp.deleteNode(nn)
}
- case reqID := <-f.timeoutChn:
- f.reqMu.Lock()
- req, ok := f.requested[reqID]
- if ok {
- delete(f.requested, reqID)
- }
- f.reqMu.Unlock()
- if ok {
- //fmt.Println("hard timeout")
- f.pm.removePeer(req.peer.id)
- }
- case resp := <-f.deliverChn:
- //fmt.Println("<-f.deliverChn", f.syncing)
- f.reqMu.Lock()
- req, ok := f.requested[resp.reqID]
- delete(f.requested, resp.reqID)
- f.reqMu.Unlock()
- if !ok || !(f.syncing || f.processResponse(req, resp)) {
- //fmt.Println("processResponse fail")
- f.pm.removePeer(req.peer.id)
- }
- case <-f.syncDone:
- //fmt.Println("<-f.syncDone", f.syncing)
- f.checkSyncedHeaders()
- f.syncing = false
+ }
+ }
+}
+
+// updateStatsEntry items form a linked list that is expanded with a new item every time a new head with a higher Td
+// than the previous one has been downloaded and validated. The list contains a series of maximum confirmed Td values
+// and the time these values have been confirmed, both increasing monotonically. A maximum confirmed Td is calculated
+// both globally for all peers and also for each individual peer (meaning that the given peer has announced the head
+// and it has also been downloaded from any peer, either before or after the given announcement).
+// The linked list has a global tail where new confirmed Td entries are added and a separate head for each peer,
+// pointing to the next Td entry that is higher than the peer's max confirmed Td (nil if it has already confirmed
+// the current global head).
+type updateStatsEntry struct {
+ time mclock.AbsTime
+ td *big.Int
+ next *updateStatsEntry
+}
+
+// updateMaxConfirmedTd updates the block delay statistics of active peers. Whenever a new highest Td is confirmed,
+// adds it to the end of a linked list together with the time it has been confirmed. Then checks which peers have
+// already confirmed a head with the same or higher Td (which counts as zero block delay) and updates their statistics.
+// Those who have not confirmed such a head by now will be updated by a subsequent checkUpdateStats call with a
+// positive block delay value.
+func (f *lightFetcher) updateMaxConfirmedTd(td *big.Int) {
+ if f.maxConfirmedTd == nil || td.Cmp(f.maxConfirmedTd) > 0 {
+ f.maxConfirmedTd = td
+ newEntry := &updateStatsEntry{
+ time: mclock.Now(),
+ td: td,
+ }
+ if f.lastUpdateStats != nil {
+ f.lastUpdateStats.next = newEntry
+ }
+ f.lastUpdateStats = newEntry
+ for p, _ := range f.peers {
+ f.checkUpdateStats(p, newEntry)
+ }
+ }
+}
+
+// checkUpdateStats checks those peers who have not confirmed a certain highest Td (or a larger one) by the time it
+// has been confirmed by another peer. If they have confirmed such a head by now, their stats are updated with the
+// block delay which is (this peer's confirmation time)-(first confirmation time). After blockDelayTimeout has passed,
+// the stats are updated with blockDelayTimeout value. In either case, the confirmed or timed out updateStatsEntry
+// items are removed from the head of the linked list.
+// If a new entry has been added to the global tail, it is passed as a parameter here even though this function
+// assumes that it has already been added, so that if the peer's list is empty (all heads confirmed, head is nil),
+// it can set the new head to newEntry.
+func (f *lightFetcher) checkUpdateStats(p *peer, newEntry *updateStatsEntry) {
+ now := mclock.Now()
+ fp := f.peers[p]
+ if fp == nil {
+ glog.V(logger.Debug).Infof("checkUpdateStats: unknown peer")
+ return
+ }
+ if newEntry != nil && fp.firstUpdateStats == nil {
+ fp.firstUpdateStats = newEntry
+ }
+ for fp.firstUpdateStats != nil && fp.firstUpdateStats.time <= now-mclock.AbsTime(blockDelayTimeout) {
+ f.pm.serverPool.adjustBlockDelay(p.poolEntry, blockDelayTimeout)
+ fp.firstUpdateStats = fp.firstUpdateStats.next
+ }
+ if fp.confirmedTd != nil {
+ for fp.firstUpdateStats != nil && fp.firstUpdateStats.td.Cmp(fp.confirmedTd) <= 0 {
+ f.pm.serverPool.adjustBlockDelay(p.poolEntry, time.Duration(now-fp.firstUpdateStats.time))
+ fp.firstUpdateStats = fp.firstUpdateStats.next
}
}
}
diff --git a/les/handler.go b/les/handler.go
index fdf4e6e8a..b024841f2 100644
--- a/les/handler.go
+++ b/les/handler.go
@@ -22,8 +22,8 @@ import (
"errors"
"fmt"
"math/big"
+ "net"
"sync"
- "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
@@ -58,7 +58,7 @@ const (
MaxHeaderProofsFetch = 64 // Amount of merkle proofs to be fetched per retrieval request
MaxTxSend = 64 // Amount of transactions to be send per request
- disableClientRemovePeer = true
+ disableClientRemovePeer = false
)
// errIncompatibleConfig is returned if the requested protocols and configs are
@@ -101,10 +101,7 @@ type ProtocolManager struct {
chainDb ethdb.Database
odr *LesOdr
server *LesServer
-
- topicDisc *discv5.Network
- lesTopic discv5.Topic
- p2pServer *p2p.Server
+ serverPool *serverPool
downloader *downloader.Downloader
fetcher *lightFetcher
@@ -157,13 +154,29 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network
Version: version,
Length: ProtocolLengths[i],
Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
+ var entry *poolEntry
peer := manager.newPeer(int(version), networkId, p, rw)
+ if manager.serverPool != nil {
+ addr := p.RemoteAddr().(*net.TCPAddr)
+ entry = manager.serverPool.connect(peer, addr.IP, uint16(addr.Port))
+ if entry == nil {
+ return fmt.Errorf("unwanted connection")
+ }
+ }
+ peer.poolEntry = entry
select {
case manager.newPeerCh <- peer:
manager.wg.Add(1)
defer manager.wg.Done()
- return manager.handle(peer)
+ err := manager.handle(peer)
+ if entry != nil {
+ manager.serverPool.disconnect(entry)
+ }
+ return err
case <-manager.quitSync:
+ if entry != nil {
+ manager.serverPool.disconnect(entry)
+ }
return p2p.DiscQuitting
}
},
@@ -192,7 +205,6 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network
manager.downloader = downloader.New(downloader.LightSync, chainDb, manager.eventMux, blockchain.HasHeader, nil, blockchain.GetHeaderByHash,
nil, blockchain.CurrentHeader, nil, nil, nil, blockchain.GetTdByHash,
blockchain.InsertHeaderChain, nil, nil, blockchain.Rollback, removePeer)
- manager.fetcher = newLightFetcher(manager)
}
if odr != nil {
@@ -222,10 +234,12 @@ func (pm *ProtocolManager) removePeer(id string) {
glog.V(logger.Debug).Infof("LES: unregister peer %v", id)
if pm.lightSync {
pm.downloader.UnregisterPeer(id)
- pm.odr.UnregisterPeer(peer)
if pm.txrelay != nil {
pm.txrelay.removePeer(id)
}
+ if pm.fetcher != nil {
+ pm.fetcher.removePeer(peer)
+ }
}
if err := pm.peers.Unregister(id); err != nil {
glog.V(logger.Error).Infoln("Removal failed:", err)
@@ -236,54 +250,26 @@ func (pm *ProtocolManager) removePeer(id string) {
}
}
-func (pm *ProtocolManager) findServers() {
- if pm.p2pServer == nil || pm.topicDisc == nil {
- return
- }
- glog.V(logger.Debug).Infoln("Looking for topic", string(pm.lesTopic))
- enodes := make(chan string, 100)
- stop := make(chan struct{})
- go pm.topicDisc.SearchTopic(pm.lesTopic, stop, enodes)
- go func() {
- added := make(map[string]bool)
- for {
- select {
- case enode := <-enodes:
- if !added[enode] {
- glog.V(logger.Info).Infoln("Found LES server:", enode)
- added[enode] = true
- if node, err := discover.ParseNode(enode); err == nil {
- pm.p2pServer.AddPeer(node)
- }
- }
- case <-stop:
- return
- }
- }
- }()
- select {
- case <-time.After(time.Second * 20):
- case <-pm.quitSync:
- }
- close(stop)
-}
-
func (pm *ProtocolManager) Start(srvr *p2p.Server) {
- pm.p2pServer = srvr
+ var topicDisc *discv5.Network
if srvr != nil {
- pm.topicDisc = srvr.DiscV5
+ topicDisc = srvr.DiscV5
}
- pm.lesTopic = discv5.Topic("LES@" + common.Bytes2Hex(pm.blockchain.Genesis().Hash().Bytes()[0:8]))
+ lesTopic := discv5.Topic("LES@" + common.Bytes2Hex(pm.blockchain.Genesis().Hash().Bytes()[0:8]))
if pm.lightSync {
// start sync handler
- go pm.findServers()
+ if srvr != nil { // srvr is nil during testing
+ pm.serverPool = newServerPool(pm.chainDb, []byte("serverPool/"), srvr, lesTopic, pm.quitSync, &pm.wg)
+ pm.odr.serverPool = pm.serverPool
+ pm.fetcher = newLightFetcher(pm)
+ }
go pm.syncer()
} else {
- if pm.topicDisc != nil {
+ if topicDisc != nil {
go func() {
- glog.V(logger.Debug).Infoln("Starting registering topic", string(pm.lesTopic))
- pm.topicDisc.RegisterTopic(pm.lesTopic, pm.quitSync)
- glog.V(logger.Debug).Infoln("Stopped registering topic", string(pm.lesTopic))
+ glog.V(logger.Info).Infoln("Starting registering topic", string(lesTopic))
+ topicDisc.RegisterTopic(lesTopic, pm.quitSync)
+ glog.V(logger.Info).Infoln("Stopped registering topic", string(lesTopic))
}()
}
go func() {
@@ -352,13 +338,13 @@ func (pm *ProtocolManager) handle(p *peer) error {
glog.V(logger.Debug).Infof("LES: register peer %v", p.id)
if pm.lightSync {
requestHeadersByHash := func(origin common.Hash, amount int, skip int, reverse bool) error {
- reqID := pm.odr.getNextReqID()
+ reqID := getNextReqID()
cost := p.GetRequestCost(GetBlockHeadersMsg, amount)
p.fcServer.SendRequest(reqID, cost)
return p.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse)
}
requestHeadersByNumber := func(origin uint64, amount int, skip int, reverse bool) error {
- reqID := pm.odr.getNextReqID()
+ reqID := getNextReqID()
cost := p.GetRequestCost(GetBlockHeadersMsg, amount)
p.fcServer.SendRequest(reqID, cost)
return p.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse)
@@ -367,12 +353,21 @@ func (pm *ProtocolManager) handle(p *peer) error {
requestHeadersByHash, requestHeadersByNumber, nil, nil, nil); err != nil {
return err
}
- pm.odr.RegisterPeer(p)
if pm.txrelay != nil {
pm.txrelay.addPeer(p)
}
- pm.fetcher.notify(p, nil)
+ p.lock.Lock()
+ head := p.headInfo
+ p.lock.Unlock()
+ if pm.fetcher != nil {
+ pm.fetcher.addPeer(p)
+ pm.fetcher.announce(p, head)
+ }
+
+ if p.poolEntry != nil {
+ pm.serverPool.registered(p.poolEntry)
+ }
}
stop := make(chan struct{})
@@ -454,7 +449,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
return errResp(ErrDecode, "%v: %v", msg, err)
}
glog.V(logger.Detail).Infoln("AnnounceMsg:", req.Number, req.Hash, req.Td, req.ReorgDepth)
- pm.fetcher.notify(p, &req)
+ if pm.fetcher != nil {
+ go pm.fetcher.announce(p, &req)
+ }
case GetBlockHeadersMsg:
glog.V(logger.Debug).Infof("<=== GetBlockHeadersMsg from peer %v", p.id)
@@ -552,8 +549,8 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
p.fcServer.GotReply(resp.ReqID, resp.BV)
- if pm.fetcher.requestedID(resp.ReqID) {
- pm.fetcher.deliverHeaders(resp.ReqID, resp.Headers)
+ if pm.fetcher != nil && pm.fetcher.requestedID(resp.ReqID) {
+ pm.fetcher.deliverHeaders(p, resp.ReqID, resp.Headers)
} else {
err := pm.downloader.DeliverHeaders(p.id, resp.Headers)
if err != nil {
diff --git a/les/helper_test.go b/les/helper_test.go
index a5428e954..2df3faab2 100644
--- a/les/helper_test.go
+++ b/les/helper_test.go
@@ -25,6 +25,7 @@ import (
"math/big"
"sync"
"testing"
+ "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
@@ -334,3 +335,13 @@ func (p *testPeer) handshake(t *testing.T, td *big.Int, head common.Hash, headNu
func (p *testPeer) close() {
p.app.Close()
}
+
+type testServerPool peer
+
+func (p *testServerPool) selectPeer(func(*peer) (bool, uint64)) *peer {
+ return (*peer)(p)
+}
+
+func (p *testServerPool) adjustResponseTime(*poolEntry, time.Duration, bool) {
+
+}
diff --git a/les/odr.go b/les/odr.go
index 444b1da2a..8878508c4 100644
--- a/les/odr.go
+++ b/les/odr.go
@@ -17,6 +17,8 @@
package les
import (
+ "crypto/rand"
+ "encoding/binary"
"sync"
"time"
@@ -37,6 +39,11 @@ var (
// peerDropFn is a callback type for dropping a peer detected as malicious.
type peerDropFn func(id string)
+type odrPeerSelector interface {
+ selectPeer(func(*peer) (bool, uint64)) *peer
+ adjustResponseTime(*poolEntry, time.Duration, bool)
+}
+
type LesOdr struct {
light.OdrBackend
db ethdb.Database
@@ -44,15 +51,13 @@ type LesOdr struct {
removePeer peerDropFn
mlock, clock sync.Mutex
sentReqs map[uint64]*sentReq
- peers *odrPeerSet
- lastReqID uint64
+ serverPool odrPeerSelector
}
func NewLesOdr(db ethdb.Database) *LesOdr {
return &LesOdr{
db: db,
stop: make(chan struct{}),
- peers: newOdrPeerSet(),
sentReqs: make(map[uint64]*sentReq),
}
}
@@ -77,16 +82,6 @@ type sentReq struct {
answered chan struct{} // closed and set to nil when any peer answers it
}
-// RegisterPeer registers a new LES peer to the ODR capable peer set
-func (self *LesOdr) RegisterPeer(p *peer) error {
- return self.peers.register(p)
-}
-
-// UnregisterPeer removes a peer from the ODR capable peer set
-func (self *LesOdr) UnregisterPeer(p *peer) {
- self.peers.unregister(p)
-}
-
const (
MsgBlockBodies = iota
MsgCode
@@ -142,29 +137,26 @@ func (self *LesOdr) requestPeer(req *sentReq, peer *peer, delivered, timeout cha
select {
case <-delivered:
- servTime := uint64(mclock.Now() - stime)
- self.peers.updateTimeout(peer, false)
- self.peers.updateServTime(peer, servTime)
+ if self.serverPool != nil {
+ self.serverPool.adjustResponseTime(peer.poolEntry, time.Duration(mclock.Now()-stime), false)
+ }
return
case <-time.After(softRequestTimeout):
close(timeout)
- if self.peers.updateTimeout(peer, true) {
- self.removePeer(peer.id)
- }
case <-self.stop:
return
}
select {
case <-delivered:
- servTime := uint64(mclock.Now() - stime)
- self.peers.updateServTime(peer, servTime)
- return
case <-time.After(hardRequestTimeout):
- self.removePeer(peer.id)
+ go self.removePeer(peer.id)
case <-self.stop:
return
}
+ if self.serverPool != nil {
+ self.serverPool.adjustResponseTime(peer.poolEntry, time.Duration(mclock.Now()-stime), true)
+ }
}
// networkRequest sends a request to known peers until an answer is received
@@ -176,7 +168,7 @@ func (self *LesOdr) networkRequest(ctx context.Context, lreq LesOdrRequest) erro
sentTo: make(map[*peer]chan struct{}),
answered: answered, // reply delivered by any peer
}
- reqID := self.getNextReqID()
+ reqID := getNextReqID()
self.mlock.Lock()
self.sentReqs[reqID] = req
self.mlock.Unlock()
@@ -193,7 +185,16 @@ func (self *LesOdr) networkRequest(ctx context.Context, lreq LesOdrRequest) erro
exclude := make(map[*peer]struct{})
for {
- if peer := self.peers.bestPeer(lreq, exclude); peer == nil {
+ var p *peer
+ if self.serverPool != nil {
+ p = self.serverPool.selectPeer(func(p *peer) (bool, uint64) {
+ if !lreq.CanSend(p) {
+ return false, 0
+ }
+ return true, p.fcServer.CanSend(lreq.GetCost(p))
+ })
+ }
+ if p == nil {
select {
case <-ctx.Done():
return ctx.Err()
@@ -202,17 +203,17 @@ func (self *LesOdr) networkRequest(ctx context.Context, lreq LesOdrRequest) erro
case <-time.After(retryPeers):
}
} else {
- exclude[peer] = struct{}{}
+ exclude[p] = struct{}{}
delivered := make(chan struct{})
timeout := make(chan struct{})
req.lock.Lock()
- req.sentTo[peer] = delivered
+ req.sentTo[p] = delivered
req.lock.Unlock()
reqWg.Add(1)
- cost := lreq.GetCost(peer)
- peer.fcServer.SendRequest(reqID, cost)
- go self.requestPeer(req, peer, delivered, timeout, reqWg)
- lreq.Request(reqID, peer)
+ cost := lreq.GetCost(p)
+ p.fcServer.SendRequest(reqID, cost)
+ go self.requestPeer(req, p, delivered, timeout, reqWg)
+ lreq.Request(reqID, p)
select {
case <-ctx.Done():
@@ -239,10 +240,8 @@ func (self *LesOdr) Retrieve(ctx context.Context, req light.OdrRequest) (err err
return
}
-func (self *LesOdr) getNextReqID() uint64 {
- self.clock.Lock()
- defer self.clock.Unlock()
-
- self.lastReqID++
- return self.lastReqID
+func getNextReqID() uint64 {
+ var rnd [8]byte
+ rand.Read(rnd[:])
+ return binary.BigEndian.Uint64(rnd[:])
}
diff --git a/les/odr_peerset.go b/les/odr_peerset.go
deleted file mode 100644
index e9b7eec7f..000000000
--- a/les/odr_peerset.go
+++ /dev/null
@@ -1,120 +0,0 @@
-// Copyright 2016 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see .
-
-package les
-
-import (
- "sync"
-)
-
-const dropTimeoutRatio = 20
-
-type odrPeerInfo struct {
- reqTimeSum, reqTimeCnt, reqCnt, timeoutCnt uint64
-}
-
-// odrPeerSet represents the collection of active peer participating in the block
-// download procedure.
-type odrPeerSet struct {
- peers map[*peer]*odrPeerInfo
- lock sync.RWMutex
-}
-
-// newPeerSet creates a new peer set top track the active download sources.
-func newOdrPeerSet() *odrPeerSet {
- return &odrPeerSet{
- peers: make(map[*peer]*odrPeerInfo),
- }
-}
-
-// Register injects a new peer into the working set, or returns an error if the
-// peer is already known.
-func (ps *odrPeerSet) register(p *peer) error {
- ps.lock.Lock()
- defer ps.lock.Unlock()
-
- if _, ok := ps.peers[p]; ok {
- return errAlreadyRegistered
- }
- ps.peers[p] = &odrPeerInfo{}
- return nil
-}
-
-// Unregister removes a remote peer from the active set, disabling any further
-// actions to/from that particular entity.
-func (ps *odrPeerSet) unregister(p *peer) error {
- ps.lock.Lock()
- defer ps.lock.Unlock()
-
- if _, ok := ps.peers[p]; !ok {
- return errNotRegistered
- }
- delete(ps.peers, p)
- return nil
-}
-
-func (ps *odrPeerSet) peerPriority(p *peer, info *odrPeerInfo, req LesOdrRequest) uint64 {
- tm := p.fcServer.CanSend(req.GetCost(p))
- if info.reqTimeCnt > 0 {
- tm += info.reqTimeSum / info.reqTimeCnt
- }
- return tm
-}
-
-func (ps *odrPeerSet) bestPeer(req LesOdrRequest, exclude map[*peer]struct{}) *peer {
- var best *peer
- var bpv uint64
- ps.lock.Lock()
- defer ps.lock.Unlock()
-
- for p, info := range ps.peers {
- if _, ok := exclude[p]; !ok {
- pv := ps.peerPriority(p, info, req)
- if best == nil || pv < bpv {
- best = p
- bpv = pv
- }
- }
- }
- return best
-}
-
-func (ps *odrPeerSet) updateTimeout(p *peer, timeout bool) (drop bool) {
- ps.lock.Lock()
- defer ps.lock.Unlock()
-
- if info, ok := ps.peers[p]; ok {
- info.reqCnt++
- if timeout {
- // check ratio before increase to allow an extra timeout
- if info.timeoutCnt*dropTimeoutRatio >= info.reqCnt {
- return true
- }
- info.timeoutCnt++
- }
- }
- return false
-}
-
-func (ps *odrPeerSet) updateServTime(p *peer, servTime uint64) {
- ps.lock.Lock()
- defer ps.lock.Unlock()
-
- if info, ok := ps.peers[p]; ok {
- info.reqTimeSum += servTime
- info.reqTimeCnt++
- }
-}
diff --git a/les/odr_requests.go b/les/odr_requests.go
index f4bd51888..a4fbd79f6 100644
--- a/les/odr_requests.go
+++ b/les/odr_requests.go
@@ -36,6 +36,7 @@ import (
type LesOdrRequest interface {
GetCost(*peer) uint64
+ CanSend(*peer) bool
Request(uint64, *peer) error
Valid(ethdb.Database, *Msg) bool // if true, keeps the retrieved object
}
@@ -66,6 +67,11 @@ func (self *BlockRequest) GetCost(peer *peer) uint64 {
return peer.GetRequestCost(GetBlockBodiesMsg, 1)
}
+// CanSend tells if a certain peer is suitable for serving the given request
+func (self *BlockRequest) CanSend(peer *peer) bool {
+ return peer.HasBlock(self.Hash, self.Number)
+}
+
// Request sends an ODR request to the LES network (implementation of LesOdrRequest)
func (self *BlockRequest) Request(reqID uint64, peer *peer) error {
glog.V(logger.Debug).Infof("ODR: requesting body of block %08x from peer %v", self.Hash[:4], peer.id)
@@ -121,6 +127,11 @@ func (self *ReceiptsRequest) GetCost(peer *peer) uint64 {
return peer.GetRequestCost(GetReceiptsMsg, 1)
}
+// CanSend tells if a certain peer is suitable for serving the given request
+func (self *ReceiptsRequest) CanSend(peer *peer) bool {
+ return peer.HasBlock(self.Hash, self.Number)
+}
+
// Request sends an ODR request to the LES network (implementation of LesOdrRequest)
func (self *ReceiptsRequest) Request(reqID uint64, peer *peer) error {
glog.V(logger.Debug).Infof("ODR: requesting receipts for block %08x from peer %v", self.Hash[:4], peer.id)
@@ -171,6 +182,11 @@ func (self *TrieRequest) GetCost(peer *peer) uint64 {
return peer.GetRequestCost(GetProofsMsg, 1)
}
+// CanSend tells if a certain peer is suitable for serving the given request
+func (self *TrieRequest) CanSend(peer *peer) bool {
+ return peer.HasBlock(self.Id.BlockHash, self.Id.BlockNumber)
+}
+
// Request sends an ODR request to the LES network (implementation of LesOdrRequest)
func (self *TrieRequest) Request(reqID uint64, peer *peer) error {
glog.V(logger.Debug).Infof("ODR: requesting trie root %08x key %08x from peer %v", self.Id.Root[:4], self.Key[:4], peer.id)
@@ -221,6 +237,11 @@ func (self *CodeRequest) GetCost(peer *peer) uint64 {
return peer.GetRequestCost(GetCodeMsg, 1)
}
+// CanSend tells if a certain peer is suitable for serving the given request
+func (self *CodeRequest) CanSend(peer *peer) bool {
+ return peer.HasBlock(self.Id.BlockHash, self.Id.BlockNumber)
+}
+
// Request sends an ODR request to the LES network (implementation of LesOdrRequest)
func (self *CodeRequest) Request(reqID uint64, peer *peer) error {
glog.V(logger.Debug).Infof("ODR: requesting node data for hash %08x from peer %v", self.Hash[:4], peer.id)
@@ -274,6 +295,14 @@ func (self *ChtRequest) GetCost(peer *peer) uint64 {
return peer.GetRequestCost(GetHeaderProofsMsg, 1)
}
+// CanSend tells if a certain peer is suitable for serving the given request
+func (self *ChtRequest) CanSend(peer *peer) bool {
+ peer.lock.RLock()
+ defer peer.lock.RUnlock()
+
+ return self.ChtNum <= (peer.headInfo.Number-light.ChtConfirmations)/light.ChtFrequency
+}
+
// Request sends an ODR request to the LES network (implementation of LesOdrRequest)
func (self *ChtRequest) Request(reqID uint64, peer *peer) error {
glog.V(logger.Debug).Infof("ODR: requesting CHT #%d block #%d from peer %v", self.ChtNum, self.BlockNum, peer.id)
diff --git a/les/odr_test.go b/les/odr_test.go
index 27e3b283d..685435996 100644
--- a/les/odr_test.go
+++ b/les/odr_test.go
@@ -160,6 +160,8 @@ func testOdr(t *testing.T, protocol int, expFail uint64, fn odrTestFn) {
pm, db, odr := newTestProtocolManagerMust(t, false, 4, testChainGen)
lpm, ldb, odr := newTestProtocolManagerMust(t, true, 0, nil)
_, err1, lpeer, err2 := newTestPeerPair("peer", protocol, pm, lpm)
+ pool := (*testServerPool)(lpeer)
+ odr.serverPool = pool
select {
case <-time.After(time.Millisecond * 100):
case err := <-err1:
@@ -188,13 +190,13 @@ func testOdr(t *testing.T, protocol int, expFail uint64, fn odrTestFn) {
}
// temporarily remove peer to test odr fails
- odr.UnregisterPeer(lpeer)
+ odr.serverPool = nil
// expect retrievals to fail (except genesis block) without a les peer
test(expFail)
- odr.RegisterPeer(lpeer)
+ odr.serverPool = pool
// expect all retrievals to pass
test(5)
- odr.UnregisterPeer(lpeer)
+ odr.serverPool = nil
// still expect all retrievals to pass, now data should be cached locally
test(5)
}
diff --git a/les/peer.go b/les/peer.go
index 5d566d899..0a8db4975 100644
--- a/les/peer.go
+++ b/les/peer.go
@@ -51,12 +51,14 @@ type peer struct {
id string
- firstHeadInfo, headInfo *announceData
- headInfoLen int
- lock sync.RWMutex
+ headInfo *announceData
+ lock sync.RWMutex
announceChn chan announceData
+ poolEntry *poolEntry
+ hasBlock func(common.Hash, uint64) bool
+
fcClient *flowcontrol.ClientNode // nil if the peer is server only
fcServer *flowcontrol.ServerNode // nil if the peer is client only
fcServerParams *flowcontrol.ServerParams
@@ -109,67 +111,6 @@ func (p *peer) headBlockInfo() blockInfo {
return blockInfo{Hash: p.headInfo.Hash, Number: p.headInfo.Number, Td: p.headInfo.Td}
}
-func (p *peer) addNotify(announce *announceData) bool {
- p.lock.Lock()
- defer p.lock.Unlock()
-
- if announce.Td.Cmp(p.headInfo.Td) < 1 {
- return false
- }
- if p.headInfoLen >= maxHeadInfoLen {
- //return false
- p.firstHeadInfo = p.firstHeadInfo.next
- p.headInfoLen--
- }
- if announce.haveHeaders == 0 {
- hh := p.headInfo.Number - announce.ReorgDepth
- if p.headInfo.haveHeaders < hh {
- hh = p.headInfo.haveHeaders
- }
- announce.haveHeaders = hh
- }
- p.headInfo.next = announce
- p.headInfo = announce
- p.headInfoLen++
- return true
-}
-
-func (p *peer) gotHeader(hash common.Hash, number uint64, td *big.Int) bool {
- h := p.firstHeadInfo
- ptr := 0
- for h != nil {
- if h.Hash == hash {
- if h.Number != number || h.Td.Cmp(td) != 0 {
- return false
- }
- h.headKnown = true
- h.haveHeaders = h.Number
- p.firstHeadInfo = h
- p.headInfoLen -= ptr
- last := h
- h = h.next
- // propagate haveHeaders through the chain
- for h != nil {
- hh := last.Number - h.ReorgDepth
- if last.haveHeaders < hh {
- hh = last.haveHeaders
- }
- if hh > h.haveHeaders {
- h.haveHeaders = hh
- } else {
- return true
- }
- last = h
- h = h.next
- }
- return true
- }
- h = h.next
- ptr++
- }
- return true
-}
-
// Td retrieves the current total difficulty of a peer.
func (p *peer) Td() *big.Int {
p.lock.RLock()
@@ -195,6 +136,9 @@ func sendResponse(w p2p.MsgWriter, msgcode, reqID, bv uint64, data interface{})
}
func (p *peer) GetRequestCost(msgcode uint64, amount int) uint64 {
+ p.lock.RLock()
+ defer p.lock.RUnlock()
+
cost := p.fcCosts[msgcode].baseCost + p.fcCosts[msgcode].reqCost*uint64(amount)
if cost > p.fcServerParams.BufLimit {
cost = p.fcServerParams.BufLimit
@@ -202,6 +146,14 @@ func (p *peer) GetRequestCost(msgcode uint64, amount int) uint64 {
return cost
}
+// HasBlock checks if the peer has a given block
+func (p *peer) HasBlock(hash common.Hash, number uint64) bool {
+ p.lock.RLock()
+ hashBlock := p.hasBlock
+ p.lock.RUnlock()
+ return hashBlock != nil && hashBlock(hash, number)
+}
+
// SendAnnounce announces the availability of a number of blocks through
// a hash notification.
func (p *peer) SendAnnounce(request announceData) error {
@@ -453,9 +405,7 @@ func (p *peer) Handshake(td *big.Int, head common.Hash, headNum uint64, genesis
p.fcCosts = MRC.decode()
}
- p.firstHeadInfo = &announceData{Td: rTd, Hash: rHash, Number: rNum}
- p.headInfo = p.firstHeadInfo
- p.headInfoLen = 1
+ p.headInfo = &announceData{Td: rTd, Hash: rHash, Number: rNum}
return nil
}
diff --git a/les/randselect.go b/les/randselect.go
new file mode 100644
index 000000000..1a9d0695b
--- /dev/null
+++ b/les/randselect.go
@@ -0,0 +1,173 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+
+// Package les implements the Light Ethereum Subprotocol.
+package les
+
+import (
+ "math/rand"
+)
+
+// wrsItem interface should be implemented by any entries that are to be selected from
+// a weightedRandomSelect set. Note that recalculating monotonously decreasing item
+// weights on-demand (without constantly calling update) is allowed
+type wrsItem interface {
+ Weight() int64
+}
+
+// weightedRandomSelect is capable of weighted random selection from a set of items
+type weightedRandomSelect struct {
+ root *wrsNode
+ idx map[wrsItem]int
+}
+
+// newWeightedRandomSelect returns a new weightedRandomSelect structure
+func newWeightedRandomSelect() *weightedRandomSelect {
+ return &weightedRandomSelect{root: &wrsNode{maxItems: wrsBranches}, idx: make(map[wrsItem]int)}
+}
+
+// update updates an item's weight, adds it if it was non-existent or removes it if
+// the new weight is zero. Note that explicitly updating decreasing weights is not necessary.
+func (w *weightedRandomSelect) update(item wrsItem) {
+ w.setWeight(item, item.Weight())
+}
+
+// remove removes an item from the set
+func (w *weightedRandomSelect) remove(item wrsItem) {
+ w.setWeight(item, 0)
+}
+
+// setWeight sets an item's weight to a specific value (removes it if zero)
+func (w *weightedRandomSelect) setWeight(item wrsItem, weight int64) {
+ idx, ok := w.idx[item]
+ if ok {
+ w.root.setWeight(idx, weight)
+ if weight == 0 {
+ delete(w.idx, item)
+ }
+ } else {
+ if weight != 0 {
+ if w.root.itemCnt == w.root.maxItems {
+ // add a new level
+ newRoot := &wrsNode{sumWeight: w.root.sumWeight, itemCnt: w.root.itemCnt, level: w.root.level + 1, maxItems: w.root.maxItems * wrsBranches}
+ newRoot.items[0] = w.root
+ newRoot.weights[0] = w.root.sumWeight
+ w.root = newRoot
+ }
+ w.idx[item] = w.root.insert(item, weight)
+ }
+ }
+}
+
+// choose randomly selects an item from the set, with a chance proportional to its
+// current weight. If the weight of the chosen element has been decreased since the
+// last stored value, returns it with a newWeight/oldWeight chance, otherwise just
+// updates its weight and selects another one
+func (w *weightedRandomSelect) choose() wrsItem {
+ for {
+ if w.root.sumWeight == 0 {
+ return nil
+ }
+ val := rand.Int63n(w.root.sumWeight)
+ choice, lastWeight := w.root.choose(val)
+ weight := choice.Weight()
+ if weight != lastWeight {
+ w.setWeight(choice, weight)
+ }
+ if weight >= lastWeight || rand.Int63n(lastWeight) < weight {
+ return choice
+ }
+ }
+}
+
+const wrsBranches = 8 // max number of branches in the wrsNode tree
+
+// wrsNode is a node of a tree structure that can store wrsItems or further wrsNodes.
+type wrsNode struct {
+ items [wrsBranches]interface{}
+ weights [wrsBranches]int64
+ sumWeight int64
+ level, itemCnt, maxItems int
+}
+
+// insert recursively inserts a new item to the tree and returns the item index
+func (n *wrsNode) insert(item wrsItem, weight int64) int {
+ branch := 0
+ for n.items[branch] != nil && (n.level == 0 || n.items[branch].(*wrsNode).itemCnt == n.items[branch].(*wrsNode).maxItems) {
+ branch++
+ if branch == wrsBranches {
+ panic(nil)
+ }
+ }
+ n.itemCnt++
+ n.sumWeight += weight
+ n.weights[branch] += weight
+ if n.level == 0 {
+ n.items[branch] = item
+ return branch
+ } else {
+ var subNode *wrsNode
+ if n.items[branch] == nil {
+ subNode = &wrsNode{maxItems: n.maxItems / wrsBranches, level: n.level - 1}
+ n.items[branch] = subNode
+ } else {
+ subNode = n.items[branch].(*wrsNode)
+ }
+ subIdx := subNode.insert(item, weight)
+ return subNode.maxItems*branch + subIdx
+ }
+}
+
+// setWeight updates the weight of a certain item (which should exist) and returns
+// the change of the last weight value stored in the tree
+func (n *wrsNode) setWeight(idx int, weight int64) int64 {
+ if n.level == 0 {
+ oldWeight := n.weights[idx]
+ n.weights[idx] = weight
+ diff := weight - oldWeight
+ n.sumWeight += diff
+ if weight == 0 {
+ n.items[idx] = nil
+ n.itemCnt--
+ }
+ return diff
+ }
+ branchItems := n.maxItems / wrsBranches
+ branch := idx / branchItems
+ diff := n.items[branch].(*wrsNode).setWeight(idx-branch*branchItems, weight)
+ n.weights[branch] += diff
+ n.sumWeight += diff
+ if weight == 0 {
+ n.itemCnt--
+ }
+ return diff
+}
+
+// choose recursively selects an item from the tree and returns it along with its weight
+func (n *wrsNode) choose(val int64) (wrsItem, int64) {
+ for i, w := range n.weights {
+ if val < w {
+ if n.level == 0 {
+ return n.items[i].(wrsItem), n.weights[i]
+ } else {
+ return n.items[i].(*wrsNode).choose(val)
+ }
+ } else {
+ val -= w
+ }
+ }
+ panic(nil)
+}
diff --git a/les/randselect_test.go b/les/randselect_test.go
new file mode 100644
index 000000000..f3c34305e
--- /dev/null
+++ b/les/randselect_test.go
@@ -0,0 +1,67 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+
+package les
+
+import (
+ "math/rand"
+ "testing"
+)
+
+type testWrsItem struct {
+ idx int
+ widx *int
+}
+
+func (t *testWrsItem) Weight() int64 {
+ w := *t.widx
+ if w == -1 || w == t.idx {
+ return int64(t.idx + 1)
+ }
+ return 0
+}
+
+func TestWeightedRandomSelect(t *testing.T) {
+ testFn := func(cnt int) {
+ s := newWeightedRandomSelect()
+ w := -1
+ list := make([]testWrsItem, cnt)
+ for i, _ := range list {
+ list[i] = testWrsItem{idx: i, widx: &w}
+ s.update(&list[i])
+ }
+ w = rand.Intn(cnt)
+ c := s.choose()
+ if c == nil {
+ t.Errorf("expected item, got nil")
+ } else {
+ if c.(*testWrsItem).idx != w {
+ t.Errorf("expected another item")
+ }
+ }
+ w = -2
+ if s.choose() != nil {
+ t.Errorf("expected nil, got item")
+ }
+ }
+ testFn(1)
+ testFn(10)
+ testFn(100)
+ testFn(1000)
+ testFn(10000)
+ testFn(100000)
+ testFn(1000000)
+}
diff --git a/les/request_test.go b/les/request_test.go
index a6fbb06ce..03b946771 100644
--- a/les/request_test.go
+++ b/les/request_test.go
@@ -71,6 +71,8 @@ func testAccess(t *testing.T, protocol int, fn accessTestFn) {
pm, db, _ := newTestProtocolManagerMust(t, false, 4, testChainGen)
lpm, ldb, odr := newTestProtocolManagerMust(t, true, 0, nil)
_, err1, lpeer, err2 := newTestPeerPair("peer", protocol, pm, lpm)
+ pool := (*testServerPool)(lpeer)
+ odr.serverPool = pool
select {
case <-time.After(time.Millisecond * 100):
case err := <-err1:
@@ -100,11 +102,10 @@ func testAccess(t *testing.T, protocol int, fn accessTestFn) {
}
// temporarily remove peer to test odr fails
- odr.UnregisterPeer(lpeer)
+ odr.serverPool = nil
// expect retrievals to fail (except genesis block) without a les peer
test(0)
- odr.RegisterPeer(lpeer)
+ odr.serverPool = pool
// expect all retrievals to pass
test(5)
- odr.UnregisterPeer(lpeer)
}
diff --git a/les/server.go b/les/server.go
index c763e8c63..e55616a44 100644
--- a/les/server.go
+++ b/les/server.go
@@ -42,6 +42,9 @@ type LesServer struct {
fcManager *flowcontrol.ClientManager // nil if our node is client only
fcCostStats *requestCostStats
defParams *flowcontrol.ServerParams
+ srvr *p2p.Server
+ synced, stopped bool
+ lock sync.Mutex
}
func NewLesServer(eth *eth.Ethereum, config *eth.Config) (*LesServer, error) {
@@ -67,12 +70,35 @@ func (s *LesServer) Protocols() []p2p.Protocol {
return s.protocolManager.SubProtocols
}
+// Start only starts the actual service if the ETH protocol has already been synced,
+// otherwise it will be started by Synced()
func (s *LesServer) Start(srvr *p2p.Server) {
- s.protocolManager.Start(srvr)
+ s.lock.Lock()
+ defer s.lock.Unlock()
+ s.srvr = srvr
+ if s.synced {
+ s.protocolManager.Start(s.srvr)
+ }
}
+// Synced notifies the server that the ETH protocol has been synced and LES service can be started
+func (s *LesServer) Synced() {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ s.synced = true
+ if s.srvr != nil && !s.stopped {
+ s.protocolManager.Start(s.srvr)
+ }
+}
+
+// Stop stops the LES service
func (s *LesServer) Stop() {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ s.stopped = true
s.fcCostStats.store()
s.fcManager.Stop()
go func() {
@@ -323,9 +349,8 @@ func (pm *ProtocolManager) blockLoop() {
}
var (
- lastChtKey = []byte("LastChtNumber") // chtNum (uint64 big endian)
- chtPrefix = []byte("cht") // chtPrefix + chtNum (uint64 big endian) -> trie root hash
- chtConfirmations = light.ChtFrequency / 2
+ lastChtKey = []byte("LastChtNumber") // chtNum (uint64 big endian)
+ chtPrefix = []byte("cht") // chtPrefix + chtNum (uint64 big endian) -> trie root hash
)
func getChtRoot(db ethdb.Database, num uint64) common.Hash {
@@ -346,8 +371,8 @@ func makeCht(db ethdb.Database) bool {
headNum := core.GetBlockNumber(db, headHash)
var newChtNum uint64
- if headNum > chtConfirmations {
- newChtNum = (headNum - chtConfirmations) / light.ChtFrequency
+ if headNum > light.ChtConfirmations {
+ newChtNum = (headNum - light.ChtConfirmations) / light.ChtFrequency
}
var lastChtNum uint64
diff --git a/les/serverpool.go b/les/serverpool.go
new file mode 100644
index 000000000..f5e880460
--- /dev/null
+++ b/les/serverpool.go
@@ -0,0 +1,766 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+
+// Package les implements the Light Ethereum Subprotocol.
+package les
+
+import (
+ "io"
+ "math"
+ "math/rand"
+ "net"
+ "strconv"
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common/mclock"
+ "github.com/ethereum/go-ethereum/ethdb"
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/discover"
+ "github.com/ethereum/go-ethereum/p2p/discv5"
+ "github.com/ethereum/go-ethereum/rlp"
+)
+
+const (
+ // After a connection has been ended or timed out, there is a waiting period
+ // before it can be selected for connection again.
+ // waiting period = base delay * (1 + random(1))
+ // base delay = shortRetryDelay for the first shortRetryCnt times after a
+ // successful connection, after that longRetryDelay is applied
+ shortRetryCnt = 5
+ shortRetryDelay = time.Second * 5
+ longRetryDelay = time.Minute * 10
+ // maxNewEntries is the maximum number of newly discovered (never connected) nodes.
+ // If the limit is reached, the least recently discovered one is thrown out.
+ maxNewEntries = 1000
+ // maxKnownEntries is the maximum number of known (already connected) nodes.
+ // If the limit is reached, the least recently connected one is thrown out.
+ // (not that unlike new entries, known entries are persistent)
+ maxKnownEntries = 1000
+ // target for simultaneously connected servers
+ targetServerCount = 5
+ // target for servers selected from the known table
+ // (we leave room for trying new ones if there is any)
+ targetKnownSelect = 3
+ // after dialTimeout, consider the server unavailable and adjust statistics
+ dialTimeout = time.Second * 30
+ // targetConnTime is the minimum expected connection duration before a server
+ // drops a client without any specific reason
+ targetConnTime = time.Minute * 10
+ // new entry selection weight calculation based on most recent discovery time:
+ // unity until discoverExpireStart, then exponential decay with discoverExpireConst
+ discoverExpireStart = time.Minute * 20
+ discoverExpireConst = time.Minute * 20
+ // known entry selection weight is dropped by a factor of exp(-failDropLn) after
+ // each unsuccessful connection (restored after a successful one)
+ failDropLn = 0.1
+ // known node connection success and quality statistics have a long term average
+ // and a short term value which is adjusted exponentially with a factor of
+ // pstatRecentAdjust with each dial/connection and also returned exponentially
+ // to the average with the time constant pstatReturnToMeanTC
+ pstatRecentAdjust = 0.1
+ pstatReturnToMeanTC = time.Hour
+ // node address selection weight is dropped by a factor of exp(-addrFailDropLn) after
+ // each unsuccessful connection (restored after a successful one)
+ addrFailDropLn = math.Ln2
+ // responseScoreTC and delayScoreTC are exponential decay time constants for
+ // calculating selection chances from response times and block delay times
+ responseScoreTC = time.Millisecond * 100
+ delayScoreTC = time.Second * 5
+ timeoutPow = 10
+ // peerSelectMinWeight is added to calculated weights at request peer selection
+ // to give poorly performing peers a little chance of coming back
+ peerSelectMinWeight = 0.005
+ // initStatsWeight is used to initialize previously unknown peers with good
+ // statistics to give a chance to prove themselves
+ initStatsWeight = 1
+)
+
+// serverPool implements a pool for storing and selecting newly discovered and already
+// known light server nodes. It received discovered nodes, stores statistics about
+// known nodes and takes care of always having enough good quality servers connected.
+type serverPool struct {
+ db ethdb.Database
+ dbKey []byte
+ server *p2p.Server
+ quit chan struct{}
+ wg *sync.WaitGroup
+ connWg sync.WaitGroup
+
+ discSetPeriod chan time.Duration
+ discNodes chan *discv5.Node
+ discLookups chan bool
+
+ entries map[discover.NodeID]*poolEntry
+ lock sync.Mutex
+ timeout, enableRetry chan *poolEntry
+ adjustStats chan poolStatAdjust
+
+ knownQueue, newQueue poolEntryQueue
+ knownSelect, newSelect *weightedRandomSelect
+ knownSelected, newSelected int
+ fastDiscover bool
+}
+
+// newServerPool creates a new serverPool instance
+func newServerPool(db ethdb.Database, dbPrefix []byte, server *p2p.Server, topic discv5.Topic, quit chan struct{}, wg *sync.WaitGroup) *serverPool {
+ pool := &serverPool{
+ db: db,
+ dbKey: append(dbPrefix, []byte(topic)...),
+ server: server,
+ quit: quit,
+ wg: wg,
+ entries: make(map[discover.NodeID]*poolEntry),
+ timeout: make(chan *poolEntry, 1),
+ adjustStats: make(chan poolStatAdjust, 100),
+ enableRetry: make(chan *poolEntry, 1),
+ knownSelect: newWeightedRandomSelect(),
+ newSelect: newWeightedRandomSelect(),
+ fastDiscover: true,
+ }
+ pool.knownQueue = newPoolEntryQueue(maxKnownEntries, pool.removeEntry)
+ pool.newQueue = newPoolEntryQueue(maxNewEntries, pool.removeEntry)
+ wg.Add(1)
+ pool.loadNodes()
+ pool.checkDial()
+
+ if pool.server.DiscV5 != nil {
+ pool.discSetPeriod = make(chan time.Duration, 1)
+ pool.discNodes = make(chan *discv5.Node, 100)
+ pool.discLookups = make(chan bool, 100)
+ go pool.server.DiscV5.SearchTopic(topic, pool.discSetPeriod, pool.discNodes, pool.discLookups)
+ }
+
+ go pool.eventLoop()
+ return pool
+}
+
+// connect should be called upon any incoming connection. If the connection has been
+// dialed by the server pool recently, the appropriate pool entry is returned.
+// Otherwise, the connection should be rejected.
+// Note that whenever a connection has been accepted and a pool entry has been returned,
+// disconnect should also always be called.
+func (pool *serverPool) connect(p *peer, ip net.IP, port uint16) *poolEntry {
+ pool.lock.Lock()
+ defer pool.lock.Unlock()
+ entry := pool.entries[p.ID()]
+ if entry == nil {
+ return nil
+ }
+ glog.V(logger.Debug).Infof("connecting to %v, state: %v", p.id, entry.state)
+ if entry.state != psDialed {
+ return nil
+ }
+ pool.connWg.Add(1)
+ entry.peer = p
+ entry.state = psConnected
+ addr := &poolEntryAddress{
+ ip: ip,
+ port: port,
+ lastSeen: mclock.Now(),
+ }
+ entry.lastConnected = addr
+ entry.addr = make(map[string]*poolEntryAddress)
+ entry.addr[addr.strKey()] = addr
+ entry.addrSelect = *newWeightedRandomSelect()
+ entry.addrSelect.update(addr)
+ return entry
+}
+
+// registered should be called after a successful handshake
+func (pool *serverPool) registered(entry *poolEntry) {
+ glog.V(logger.Debug).Infof("registered %v", entry.id.String())
+ pool.lock.Lock()
+ defer pool.lock.Unlock()
+
+ entry.state = psRegistered
+ entry.regTime = mclock.Now()
+ if !entry.known {
+ pool.newQueue.remove(entry)
+ entry.known = true
+ }
+ pool.knownQueue.setLatest(entry)
+ entry.shortRetry = shortRetryCnt
+}
+
+// disconnect should be called when ending a connection. Service quality statistics
+// can be updated optionally (not updated if no registration happened, in this case
+// only connection statistics are updated, just like in case of timeout)
+func (pool *serverPool) disconnect(entry *poolEntry) {
+ glog.V(logger.Debug).Infof("disconnected %v", entry.id.String())
+ pool.lock.Lock()
+ defer pool.lock.Unlock()
+
+ if entry.state == psRegistered {
+ connTime := mclock.Now() - entry.regTime
+ connAdjust := float64(connTime) / float64(targetConnTime)
+ if connAdjust > 1 {
+ connAdjust = 1
+ }
+ stopped := false
+ select {
+ case <-pool.quit:
+ stopped = true
+ default:
+ }
+ if stopped {
+ entry.connectStats.add(1, connAdjust)
+ } else {
+ entry.connectStats.add(connAdjust, 1)
+ }
+ }
+
+ entry.state = psNotConnected
+ if entry.knownSelected {
+ pool.knownSelected--
+ } else {
+ pool.newSelected--
+ }
+ pool.setRetryDial(entry)
+ pool.connWg.Done()
+}
+
+const (
+ pseBlockDelay = iota
+ pseResponseTime
+ pseResponseTimeout
+)
+
+// poolStatAdjust records are sent to adjust peer block delay/response time statistics
+type poolStatAdjust struct {
+ adjustType int
+ entry *poolEntry
+ time time.Duration
+}
+
+// adjustBlockDelay adjusts the block announce delay statistics of a node
+func (pool *serverPool) adjustBlockDelay(entry *poolEntry, time time.Duration) {
+ pool.adjustStats <- poolStatAdjust{pseBlockDelay, entry, time}
+}
+
+// adjustResponseTime adjusts the request response time statistics of a node
+func (pool *serverPool) adjustResponseTime(entry *poolEntry, time time.Duration, timeout bool) {
+ if timeout {
+ pool.adjustStats <- poolStatAdjust{pseResponseTimeout, entry, time}
+ } else {
+ pool.adjustStats <- poolStatAdjust{pseResponseTime, entry, time}
+ }
+}
+
+type selectPeerItem struct {
+ peer *peer
+ weight int64
+}
+
+func (sp selectPeerItem) Weight() int64 {
+ return sp.weight
+}
+
+// selectPeer selects a suitable peer for a request
+func (pool *serverPool) selectPeer(canSend func(*peer) (bool, uint64)) *peer {
+ pool.lock.Lock()
+ defer pool.lock.Unlock()
+
+ sel := newWeightedRandomSelect()
+ for _, entry := range pool.entries {
+ if entry.state == psRegistered {
+ p := entry.peer
+ ok, cost := canSend(p)
+ if ok {
+ w := int64(1000000000 * (peerSelectMinWeight + math.Exp(-(entry.responseStats.recentAvg()+float64(cost))/float64(responseScoreTC))*math.Pow((1-entry.timeoutStats.recentAvg()), timeoutPow)))
+ sel.update(selectPeerItem{peer: p, weight: w})
+ }
+ }
+ }
+ choice := sel.choose()
+ if choice == nil {
+ return nil
+ }
+ return choice.(selectPeerItem).peer
+}
+
+// eventLoop handles pool events and mutex locking for all internal functions
+func (pool *serverPool) eventLoop() {
+ lookupCnt := 0
+ var convTime mclock.AbsTime
+ pool.discSetPeriod <- time.Millisecond * 100
+ for {
+ select {
+ case entry := <-pool.timeout:
+ pool.lock.Lock()
+ if !entry.removed {
+ pool.checkDialTimeout(entry)
+ }
+ pool.lock.Unlock()
+
+ case entry := <-pool.enableRetry:
+ pool.lock.Lock()
+ if !entry.removed {
+ entry.delayedRetry = false
+ pool.updateCheckDial(entry)
+ }
+ pool.lock.Unlock()
+
+ case adj := <-pool.adjustStats:
+ pool.lock.Lock()
+ switch adj.adjustType {
+ case pseBlockDelay:
+ adj.entry.delayStats.add(float64(adj.time), 1)
+ case pseResponseTime:
+ adj.entry.responseStats.add(float64(adj.time), 1)
+ adj.entry.timeoutStats.add(0, 1)
+ case pseResponseTimeout:
+ adj.entry.timeoutStats.add(1, 1)
+ }
+ pool.lock.Unlock()
+
+ case node := <-pool.discNodes:
+ pool.lock.Lock()
+ now := mclock.Now()
+ id := discover.NodeID(node.ID)
+ entry := pool.entries[id]
+ if entry == nil {
+ glog.V(logger.Debug).Infof("discovered %v", node.String())
+ entry = &poolEntry{
+ id: id,
+ addr: make(map[string]*poolEntryAddress),
+ addrSelect: *newWeightedRandomSelect(),
+ shortRetry: shortRetryCnt,
+ }
+ pool.entries[id] = entry
+ // initialize previously unknown peers with good statistics to give a chance to prove themselves
+ entry.connectStats.add(1, initStatsWeight)
+ entry.delayStats.add(0, initStatsWeight)
+ entry.responseStats.add(0, initStatsWeight)
+ entry.timeoutStats.add(0, initStatsWeight)
+ }
+ entry.lastDiscovered = now
+ addr := &poolEntryAddress{
+ ip: node.IP,
+ port: node.TCP,
+ }
+ if a, ok := entry.addr[addr.strKey()]; ok {
+ addr = a
+ } else {
+ entry.addr[addr.strKey()] = addr
+ }
+ addr.lastSeen = now
+ entry.addrSelect.update(addr)
+ if !entry.known {
+ pool.newQueue.setLatest(entry)
+ }
+ pool.updateCheckDial(entry)
+ pool.lock.Unlock()
+
+ case conv := <-pool.discLookups:
+ if conv {
+ if lookupCnt == 0 {
+ convTime = mclock.Now()
+ }
+ lookupCnt++
+ if pool.fastDiscover && (lookupCnt == 50 || time.Duration(mclock.Now()-convTime) > time.Minute) {
+ pool.fastDiscover = false
+ pool.discSetPeriod <- time.Minute
+ }
+ }
+
+ case <-pool.quit:
+ close(pool.discSetPeriod)
+ pool.connWg.Wait()
+ pool.saveNodes()
+ pool.wg.Done()
+ return
+
+ }
+ }
+}
+
+// loadNodes loads known nodes and their statistics from the database
+func (pool *serverPool) loadNodes() {
+ enc, err := pool.db.Get(pool.dbKey)
+ if err != nil {
+ return
+ }
+ var list []*poolEntry
+ err = rlp.DecodeBytes(enc, &list)
+ if err != nil {
+ glog.V(logger.Debug).Infof("node list decode error: %v", err)
+ return
+ }
+ for _, e := range list {
+ glog.V(logger.Debug).Infof("loaded server stats %016x fails: %v connStats: %v / %v delayStats: %v / %v responseStats: %v / %v timeoutStats: %v / %v", e.id[0:8], e.lastConnected.fails, e.connectStats.avg, e.connectStats.weight, time.Duration(e.delayStats.avg), e.delayStats.weight, time.Duration(e.responseStats.avg), e.responseStats.weight, e.timeoutStats.avg, e.timeoutStats.weight)
+ pool.entries[e.id] = e
+ pool.knownQueue.setLatest(e)
+ pool.knownSelect.update((*knownEntry)(e))
+ }
+}
+
+// saveNodes saves known nodes and their statistics into the database. Nodes are
+// ordered from least to most recently connected.
+func (pool *serverPool) saveNodes() {
+ list := make([]*poolEntry, len(pool.knownQueue.queue))
+ for i, _ := range list {
+ list[i] = pool.knownQueue.fetchOldest()
+ }
+ enc, err := rlp.EncodeToBytes(list)
+ if err == nil {
+ pool.db.Put(pool.dbKey, enc)
+ }
+}
+
+// removeEntry removes a pool entry when the entry count limit is reached.
+// Note that it is called by the new/known queues from which the entry has already
+// been removed so removing it from the queues is not necessary.
+func (pool *serverPool) removeEntry(entry *poolEntry) {
+ pool.newSelect.remove((*discoveredEntry)(entry))
+ pool.knownSelect.remove((*knownEntry)(entry))
+ entry.removed = true
+ delete(pool.entries, entry.id)
+}
+
+// setRetryDial starts the timer which will enable dialing a certain node again
+func (pool *serverPool) setRetryDial(entry *poolEntry) {
+ delay := longRetryDelay
+ if entry.shortRetry > 0 {
+ entry.shortRetry--
+ delay = shortRetryDelay
+ }
+ delay += time.Duration(rand.Int63n(int64(delay) + 1))
+ entry.delayedRetry = true
+ go func() {
+ select {
+ case <-pool.quit:
+ case <-time.After(delay):
+ select {
+ case <-pool.quit:
+ case pool.enableRetry <- entry:
+ }
+ }
+ }()
+}
+
+// updateCheckDial is called when an entry can potentially be dialed again. It updates
+// its selection weights and checks if new dials can/should be made.
+func (pool *serverPool) updateCheckDial(entry *poolEntry) {
+ pool.newSelect.update((*discoveredEntry)(entry))
+ pool.knownSelect.update((*knownEntry)(entry))
+ pool.checkDial()
+}
+
+// checkDial checks if new dials can/should be made. It tries to select servers both
+// based on good statistics and recent discovery.
+func (pool *serverPool) checkDial() {
+ fillWithKnownSelects := !pool.fastDiscover
+ for pool.knownSelected < targetKnownSelect {
+ entry := pool.knownSelect.choose()
+ if entry == nil {
+ fillWithKnownSelects = false
+ break
+ }
+ pool.dial((*poolEntry)(entry.(*knownEntry)), true)
+ }
+ for pool.knownSelected+pool.newSelected < targetServerCount {
+ entry := pool.newSelect.choose()
+ if entry == nil {
+ break
+ }
+ pool.dial((*poolEntry)(entry.(*discoveredEntry)), false)
+ }
+ if fillWithKnownSelects {
+ // no more newly discovered nodes to select and since fast discover period
+ // is over, we probably won't find more in the near future so select more
+ // known entries if possible
+ for pool.knownSelected < targetServerCount {
+ entry := pool.knownSelect.choose()
+ if entry == nil {
+ break
+ }
+ pool.dial((*poolEntry)(entry.(*knownEntry)), true)
+ }
+ }
+}
+
+// dial initiates a new connection
+func (pool *serverPool) dial(entry *poolEntry, knownSelected bool) {
+ if entry.state != psNotConnected {
+ return
+ }
+ entry.state = psDialed
+ entry.knownSelected = knownSelected
+ if knownSelected {
+ pool.knownSelected++
+ } else {
+ pool.newSelected++
+ }
+ addr := entry.addrSelect.choose().(*poolEntryAddress)
+ glog.V(logger.Debug).Infof("dialing %v out of %v, known: %v", entry.id.String()+"@"+addr.strKey(), len(entry.addr), knownSelected)
+ entry.dialed = addr
+ go func() {
+ pool.server.AddPeer(discover.NewNode(entry.id, addr.ip, addr.port, addr.port))
+ select {
+ case <-pool.quit:
+ case <-time.After(dialTimeout):
+ select {
+ case <-pool.quit:
+ case pool.timeout <- entry:
+ }
+ }
+ }()
+}
+
+// checkDialTimeout checks if the node is still in dialed state and if so, resets it
+// and adjusts connection statistics accordingly.
+func (pool *serverPool) checkDialTimeout(entry *poolEntry) {
+ if entry.state != psDialed {
+ return
+ }
+ glog.V(logger.Debug).Infof("timeout %v", entry.id.String()+"@"+entry.dialed.strKey())
+ entry.state = psNotConnected
+ if entry.knownSelected {
+ pool.knownSelected--
+ } else {
+ pool.newSelected--
+ }
+ entry.connectStats.add(0, 1)
+ entry.dialed.fails++
+ pool.setRetryDial(entry)
+}
+
+const (
+ psNotConnected = iota
+ psDialed
+ psConnected
+ psRegistered
+)
+
+// poolEntry represents a server node and stores its current state and statistics.
+type poolEntry struct {
+ peer *peer
+ id discover.NodeID
+ addr map[string]*poolEntryAddress
+ lastConnected, dialed *poolEntryAddress
+ addrSelect weightedRandomSelect
+
+ lastDiscovered mclock.AbsTime
+ known, knownSelected bool
+ connectStats, delayStats poolStats
+ responseStats, timeoutStats poolStats
+ state int
+ regTime mclock.AbsTime
+ queueIdx int
+ removed bool
+
+ delayedRetry bool
+ shortRetry int
+}
+
+func (e *poolEntry) EncodeRLP(w io.Writer) error {
+ return rlp.Encode(w, []interface{}{e.id, e.lastConnected.ip, e.lastConnected.port, e.lastConnected.fails, &e.connectStats, &e.delayStats, &e.responseStats, &e.timeoutStats})
+}
+
+func (e *poolEntry) DecodeRLP(s *rlp.Stream) error {
+ var entry struct {
+ ID discover.NodeID
+ IP net.IP
+ Port uint16
+ Fails uint
+ CStat, DStat, RStat, TStat poolStats
+ }
+ if err := s.Decode(&entry); err != nil {
+ return err
+ }
+ addr := &poolEntryAddress{ip: entry.IP, port: entry.Port, fails: entry.Fails, lastSeen: mclock.Now()}
+ e.id = entry.ID
+ e.addr = make(map[string]*poolEntryAddress)
+ e.addr[addr.strKey()] = addr
+ e.addrSelect = *newWeightedRandomSelect()
+ e.addrSelect.update(addr)
+ e.lastConnected = addr
+ e.connectStats = entry.CStat
+ e.delayStats = entry.DStat
+ e.responseStats = entry.RStat
+ e.timeoutStats = entry.TStat
+ e.shortRetry = shortRetryCnt
+ e.known = true
+ return nil
+}
+
+// discoveredEntry implements wrsItem
+type discoveredEntry poolEntry
+
+// Weight calculates random selection weight for newly discovered entries
+func (e *discoveredEntry) Weight() int64 {
+ if e.state != psNotConnected || e.delayedRetry {
+ return 0
+ }
+ t := time.Duration(mclock.Now() - e.lastDiscovered)
+ if t <= discoverExpireStart {
+ return 1000000000
+ } else {
+ return int64(1000000000 * math.Exp(-float64(t-discoverExpireStart)/float64(discoverExpireConst)))
+ }
+}
+
+// knownEntry implements wrsItem
+type knownEntry poolEntry
+
+// Weight calculates random selection weight for known entries
+func (e *knownEntry) Weight() int64 {
+ if e.state != psNotConnected || !e.known || e.delayedRetry {
+ return 0
+ }
+ return int64(1000000000 * e.connectStats.recentAvg() * math.Exp(-float64(e.lastConnected.fails)*failDropLn-e.responseStats.recentAvg()/float64(responseScoreTC)-e.delayStats.recentAvg()/float64(delayScoreTC)) * math.Pow((1-e.timeoutStats.recentAvg()), timeoutPow))
+}
+
+// poolEntryAddress is a separate object because currently it is necessary to remember
+// multiple potential network addresses for a pool entry. This will be removed after
+// the final implementation of v5 discovery which will retrieve signed and serial
+// numbered advertisements, making it clear which IP/port is the latest one.
+type poolEntryAddress struct {
+ ip net.IP
+ port uint16
+ lastSeen mclock.AbsTime // last time it was discovered, connected or loaded from db
+ fails uint // connection failures since last successful connection (persistent)
+}
+
+func (a *poolEntryAddress) Weight() int64 {
+ t := time.Duration(mclock.Now() - a.lastSeen)
+ return int64(1000000*math.Exp(-float64(t)/float64(discoverExpireConst)-float64(a.fails)*addrFailDropLn)) + 1
+}
+
+func (a *poolEntryAddress) strKey() string {
+ return a.ip.String() + ":" + strconv.Itoa(int(a.port))
+}
+
+// poolStats implement statistics for a certain quantity with a long term average
+// and a short term value which is adjusted exponentially with a factor of
+// pstatRecentAdjust with each update and also returned exponentially to the
+// average with the time constant pstatReturnToMeanTC
+type poolStats struct {
+ sum, weight, avg, recent float64
+ lastRecalc mclock.AbsTime
+}
+
+// init initializes stats with a long term sum/update count pair retrieved from the database
+func (s *poolStats) init(sum, weight float64) {
+ s.sum = sum
+ s.weight = weight
+ var avg float64
+ if weight > 0 {
+ avg = s.sum / weight
+ }
+ s.avg = avg
+ s.recent = avg
+ s.lastRecalc = mclock.Now()
+}
+
+// recalc recalculates recent value return-to-mean and long term average
+func (s *poolStats) recalc() {
+ now := mclock.Now()
+ s.recent = s.avg + (s.recent-s.avg)*math.Exp(-float64(now-s.lastRecalc)/float64(pstatReturnToMeanTC))
+ if s.sum == 0 {
+ s.avg = 0
+ } else {
+ if s.sum > s.weight*1e30 {
+ s.avg = 1e30
+ } else {
+ s.avg = s.sum / s.weight
+ }
+ }
+ s.lastRecalc = now
+}
+
+// add updates the stats with a new value
+func (s *poolStats) add(value, weight float64) {
+ s.weight += weight
+ s.sum += value * weight
+ s.recalc()
+}
+
+// recentAvg returns the short-term adjusted average
+func (s *poolStats) recentAvg() float64 {
+ s.recalc()
+ return s.recent
+}
+
+func (s *poolStats) EncodeRLP(w io.Writer) error {
+ return rlp.Encode(w, []interface{}{math.Float64bits(s.sum), math.Float64bits(s.weight)})
+}
+
+func (s *poolStats) DecodeRLP(st *rlp.Stream) error {
+ var stats struct {
+ SumUint, WeightUint uint64
+ }
+ if err := st.Decode(&stats); err != nil {
+ return err
+ }
+ s.init(math.Float64frombits(stats.SumUint), math.Float64frombits(stats.WeightUint))
+ return nil
+}
+
+// poolEntryQueue keeps track of its least recently accessed entries and removes
+// them when the number of entries reaches the limit
+type poolEntryQueue struct {
+ queue map[int]*poolEntry // known nodes indexed by their latest lastConnCnt value
+ newPtr, oldPtr, maxCnt int
+ removeFromPool func(*poolEntry)
+}
+
+// newPoolEntryQueue returns a new poolEntryQueue
+func newPoolEntryQueue(maxCnt int, removeFromPool func(*poolEntry)) poolEntryQueue {
+ return poolEntryQueue{queue: make(map[int]*poolEntry), maxCnt: maxCnt, removeFromPool: removeFromPool}
+}
+
+// fetchOldest returns and removes the least recently accessed entry
+func (q *poolEntryQueue) fetchOldest() *poolEntry {
+ if len(q.queue) == 0 {
+ return nil
+ }
+ for {
+ if e := q.queue[q.oldPtr]; e != nil {
+ delete(q.queue, q.oldPtr)
+ q.oldPtr++
+ return e
+ }
+ q.oldPtr++
+ }
+}
+
+// remove removes an entry from the queue
+func (q *poolEntryQueue) remove(entry *poolEntry) {
+ if q.queue[entry.queueIdx] == entry {
+ delete(q.queue, entry.queueIdx)
+ }
+}
+
+// setLatest adds or updates a recently accessed entry. It also checks if an old entry
+// needs to be removed and removes it from the parent pool too with a callback function.
+func (q *poolEntryQueue) setLatest(entry *poolEntry) {
+ if q.queue[entry.queueIdx] == entry {
+ delete(q.queue, entry.queueIdx)
+ } else {
+ if len(q.queue) == q.maxCnt {
+ e := q.fetchOldest()
+ q.remove(e)
+ q.removeFromPool(e)
+ }
+ }
+ entry.queueIdx = q.newPtr
+ q.queue[entry.queueIdx] = entry
+ q.newPtr++
+}
diff --git a/light/lightchain.go b/light/lightchain.go
index 1cea7a892..d397f5006 100644
--- a/light/lightchain.go
+++ b/light/lightchain.go
@@ -505,3 +505,14 @@ func (self *LightChain) SyncCht(ctx context.Context) bool {
}
return false
}
+
+// LockChain locks the chain mutex for reading so that multiple canonical hashes can be
+// retrieved while it is guaranteed that they belong to the same version of the chain
+func (self *LightChain) LockChain() {
+ self.chainmu.RLock()
+}
+
+// UnlockChain unlocks the chain mutex
+func (self *LightChain) UnlockChain() {
+ self.chainmu.RUnlock()
+}
diff --git a/light/odr.go b/light/odr.go
index 679569bf9..4f6ef6b9e 100644
--- a/light/odr.go
+++ b/light/odr.go
@@ -48,6 +48,7 @@ type OdrRequest interface {
// TrieID identifies a state or account storage trie
type TrieID struct {
BlockHash, Root common.Hash
+ BlockNumber uint64
AccKey []byte
}
@@ -55,9 +56,10 @@ type TrieID struct {
// header.
func StateTrieID(header *types.Header) *TrieID {
return &TrieID{
- BlockHash: header.Hash(),
- AccKey: nil,
- Root: header.Root,
+ BlockHash: header.Hash(),
+ BlockNumber: header.Number.Uint64(),
+ AccKey: nil,
+ Root: header.Root,
}
}
@@ -66,9 +68,10 @@ func StateTrieID(header *types.Header) *TrieID {
// checking Merkle proofs.
func StorageTrieID(state *TrieID, addr common.Address, root common.Hash) *TrieID {
return &TrieID{
- BlockHash: state.BlockHash,
- AccKey: crypto.Keccak256(addr[:]),
- Root: root,
+ BlockHash: state.BlockHash,
+ BlockNumber: state.BlockNumber,
+ AccKey: crypto.Keccak256(addr[:]),
+ Root: root,
}
}
diff --git a/light/odr_util.go b/light/odr_util.go
index 5c72f90e9..761711621 100644
--- a/light/odr_util.go
+++ b/light/odr_util.go
@@ -38,8 +38,9 @@ var (
ErrNoTrustedCht = errors.New("No trusted canonical hash trie")
ErrNoHeader = errors.New("Header not found")
- ChtFrequency = uint64(4096)
- trustedChtKey = []byte("TrustedCHT")
+ ChtFrequency = uint64(4096)
+ ChtConfirmations = uint64(2048)
+ trustedChtKey = []byte("TrustedCHT")
)
type ChtNode struct {
diff --git a/p2p/discv5/net.go b/p2p/discv5/net.go
index d1c48904e..1348c6774 100644
--- a/p2p/discv5/net.go
+++ b/p2p/discv5/net.go
@@ -126,8 +126,15 @@ type topicRegisterReq struct {
}
type topicSearchReq struct {
- topic Topic
- found chan<- string
+ topic Topic
+ found chan<- *Node
+ lookup chan<- bool
+ delay time.Duration
+}
+
+type topicSearchResult struct {
+ target lookupInfo
+ nodes []*Node
}
type timeoutEvent struct {
@@ -263,16 +270,23 @@ func (net *Network) lookup(target common.Hash, stopOnMatch bool) []*Node {
break
}
// Wait for the next reply.
- for _, n := range <-reply {
- if n != nil && !seen[n.ID] {
- seen[n.ID] = true
- result.push(n, bucketSize)
- if stopOnMatch && n.sha == target {
- return result.entries
+ select {
+ case nodes := <-reply:
+ for _, n := range nodes {
+ if n != nil && !seen[n.ID] {
+ seen[n.ID] = true
+ result.push(n, bucketSize)
+ if stopOnMatch && n.sha == target {
+ return result.entries
+ }
}
}
+ pendingQueries--
+ case <-time.After(respTimeout):
+ // forget all pending requests, start new ones
+ pendingQueries = 0
+ reply = make(chan []*Node, alpha)
}
- pendingQueries--
}
return result.entries
}
@@ -293,18 +307,20 @@ func (net *Network) RegisterTopic(topic Topic, stop <-chan struct{}) {
}
}
-func (net *Network) SearchTopic(topic Topic, stop <-chan struct{}, found chan<- string) {
- select {
- case net.topicSearchReq <- topicSearchReq{topic, found}:
- case <-net.closed:
- return
- }
- select {
- case <-net.closed:
- case <-stop:
+func (net *Network) SearchTopic(topic Topic, setPeriod <-chan time.Duration, found chan<- *Node, lookup chan<- bool) {
+ for {
select {
- case net.topicSearchReq <- topicSearchReq{topic, nil}:
case <-net.closed:
+ return
+ case delay, ok := <-setPeriod:
+ select {
+ case net.topicSearchReq <- topicSearchReq{topic: topic, found: found, lookup: lookup, delay: delay}:
+ case <-net.closed:
+ return
+ }
+ if !ok {
+ return
+ }
}
}
}
@@ -347,6 +363,13 @@ func (net *Network) reqTableOp(f func()) (called bool) {
// TODO: external address handling.
+type topicSearchInfo struct {
+ lookupChn chan<- bool
+ period time.Duration
+}
+
+const maxSearchCount = 5
+
func (net *Network) loop() {
var (
refreshTimer = time.NewTicker(autoRefreshInterval)
@@ -385,10 +408,12 @@ func (net *Network) loop() {
topicRegisterLookupTarget lookupInfo
topicRegisterLookupDone chan []*Node
topicRegisterLookupTick = time.NewTimer(0)
- topicSearchLookupTarget lookupInfo
searchReqWhenRefreshDone []topicSearchReq
+ searchInfo = make(map[Topic]topicSearchInfo)
+ activeSearchCount int
)
- topicSearchLookupDone := make(chan []*Node, 1)
+ topicSearchLookupDone := make(chan topicSearchResult, 100)
+ topicSearch := make(chan Topic, 100)
<-topicRegisterLookupTick.C
statsDump := time.NewTicker(10 * time.Second)
@@ -504,21 +529,52 @@ loop:
case req := <-net.topicSearchReq:
if refreshDone == nil {
debugLog("<-net.topicSearchReq")
- if req.found == nil {
- net.ticketStore.removeSearchTopic(req.topic)
+ info, ok := searchInfo[req.topic]
+ if ok {
+ if req.delay == time.Duration(0) {
+ delete(searchInfo, req.topic)
+ net.ticketStore.removeSearchTopic(req.topic)
+ } else {
+ info.period = req.delay
+ searchInfo[req.topic] = info
+ }
continue
}
- net.ticketStore.addSearchTopic(req.topic, req.found)
- if (topicSearchLookupTarget.target == common.Hash{}) {
- topicSearchLookupDone <- nil
+ if req.delay != time.Duration(0) {
+ var info topicSearchInfo
+ info.period = req.delay
+ info.lookupChn = req.lookup
+ searchInfo[req.topic] = info
+ net.ticketStore.addSearchTopic(req.topic, req.found)
+ topicSearch <- req.topic
}
} else {
searchReqWhenRefreshDone = append(searchReqWhenRefreshDone, req)
}
- case nodes := <-topicSearchLookupDone:
- debugLog("<-topicSearchLookupDone")
- net.ticketStore.searchLookupDone(topicSearchLookupTarget, nodes, func(n *Node) []byte {
+ case topic := <-topicSearch:
+ if activeSearchCount < maxSearchCount {
+ activeSearchCount++
+ target := net.ticketStore.nextSearchLookup(topic)
+ go func() {
+ nodes := net.lookup(target.target, false)
+ topicSearchLookupDone <- topicSearchResult{target: target, nodes: nodes}
+ }()
+ }
+ period := searchInfo[topic].period
+ if period != time.Duration(0) {
+ go func() {
+ time.Sleep(period)
+ topicSearch <- topic
+ }()
+ }
+
+ case res := <-topicSearchLookupDone:
+ activeSearchCount--
+ if lookupChn := searchInfo[res.target.topic].lookupChn; lookupChn != nil {
+ lookupChn <- net.ticketStore.radius[res.target.topic].converged
+ }
+ net.ticketStore.searchLookupDone(res.target, res.nodes, func(n *Node) []byte {
net.ping(n, n.addr())
return n.pingEcho
}, func(n *Node, topic Topic) []byte {
@@ -531,11 +587,6 @@ loop:
return nil
}
})
- topicSearchLookupTarget = net.ticketStore.nextSearchLookup()
- target := topicSearchLookupTarget.target
- if (target != common.Hash{}) {
- go func() { topicSearchLookupDone <- net.lookup(target, false) }()
- }
case <-statsDump.C:
debugLog("<-statsDump.C")
diff --git a/p2p/discv5/ticket.go b/p2p/discv5/ticket.go
index 202504314..752fdc9b4 100644
--- a/p2p/discv5/ticket.go
+++ b/p2p/discv5/ticket.go
@@ -138,16 +138,12 @@ type ticketStore struct {
nextTicketReg mclock.AbsTime
searchTopicMap map[Topic]searchTopic
- searchTopicList []Topic
- searchTopicPtr int
nextTopicQueryCleanup mclock.AbsTime
queriesSent map[*Node]map[common.Hash]sentQuery
- radiusLookupCnt int
}
type searchTopic struct {
- foundChn chan<- string
- listIdx int
+ foundChn chan<- *Node
}
type sentQuery struct {
@@ -183,23 +179,15 @@ func (s *ticketStore) addTopic(t Topic, register bool) {
}
}
-func (s *ticketStore) addSearchTopic(t Topic, foundChn chan<- string) {
+func (s *ticketStore) addSearchTopic(t Topic, foundChn chan<- *Node) {
s.addTopic(t, false)
if s.searchTopicMap[t].foundChn == nil {
- s.searchTopicList = append(s.searchTopicList, t)
- s.searchTopicMap[t] = searchTopic{foundChn: foundChn, listIdx: len(s.searchTopicList) - 1}
+ s.searchTopicMap[t] = searchTopic{foundChn: foundChn}
}
}
func (s *ticketStore) removeSearchTopic(t Topic) {
if st := s.searchTopicMap[t]; st.foundChn != nil {
- lastIdx := len(s.searchTopicList) - 1
- lastTopic := s.searchTopicList[lastIdx]
- s.searchTopicList[st.listIdx] = lastTopic
- sl := s.searchTopicMap[lastTopic]
- sl.listIdx = st.listIdx
- s.searchTopicMap[lastTopic] = sl
- s.searchTopicList = s.searchTopicList[:lastIdx]
delete(s.searchTopicMap, t)
}
}
@@ -247,20 +235,13 @@ func (s *ticketStore) nextRegisterLookup() (lookup lookupInfo, delay time.Durati
return lookupInfo{}, 40 * time.Second
}
-func (s *ticketStore) nextSearchLookup() lookupInfo {
- if len(s.searchTopicList) == 0 {
- return lookupInfo{}
- }
- if s.searchTopicPtr >= len(s.searchTopicList) {
- s.searchTopicPtr = 0
- }
- topic := s.searchTopicList[s.searchTopicPtr]
- s.searchTopicPtr++
- target := s.radius[topic].nextTarget(s.radiusLookupCnt >= searchForceQuery)
+func (s *ticketStore) nextSearchLookup(topic Topic) lookupInfo {
+ tr := s.radius[topic]
+ target := tr.nextTarget(tr.radiusLookupCnt >= searchForceQuery)
if target.radiusLookup {
- s.radiusLookupCnt++
+ tr.radiusLookupCnt++
} else {
- s.radiusLookupCnt = 0
+ tr.radiusLookupCnt = 0
}
return target
}
@@ -662,9 +643,9 @@ func (s *ticketStore) gotTopicNodes(from *Node, hash common.Hash, nodes []rpcNod
if ip.IsUnspecified() || ip.IsLoopback() {
ip = from.IP
}
- enode := NewNode(node.ID, ip, node.UDP-1, node.TCP-1).String() // subtract one from port while discv5 is running in test mode on UDPport+1
+ n := NewNode(node.ID, ip, node.UDP-1, node.TCP-1) // subtract one from port while discv5 is running in test mode on UDPport+1
select {
- case chn <- enode:
+ case chn <- n:
default:
return false
}
@@ -677,6 +658,8 @@ type topicRadius struct {
topicHashPrefix uint64
radius, minRadius uint64
buckets []topicRadiusBucket
+ converged bool
+ radiusLookupCnt int
}
type topicRadiusEvent int
@@ -706,7 +689,7 @@ func (b *topicRadiusBucket) update(now mclock.AbsTime) {
b.lastTime = now
for target, tm := range b.lookupSent {
- if now-tm > mclock.AbsTime(pingTimeout) {
+ if now-tm > mclock.AbsTime(respTimeout) {
b.weights[trNoAdjust] += 1
delete(b.lookupSent, target)
}
@@ -906,6 +889,7 @@ func (r *topicRadius) recalcRadius() (radius uint64, radiusLookup int) {
if radiusLookup == -1 {
// no more radius lookups needed at the moment, return a radius
+ r.converged = true
rad := maxBucket
if minRadBucket < rad {
rad = minRadBucket