From 3e617f3cd6c303652147f45ec9c85a5bb7769348 Mon Sep 17 00:00:00 2001 From: Zsolt Felfoldi Date: Thu, 17 Nov 2016 15:54:24 +0100 Subject: [PATCH] les: implement light server pool --- les/handler.go | 96 +++--- les/peer.go | 2 + les/randselect.go | 173 +++++++++++ les/randselect_test.go | 67 +++++ les/serverpool.go | 654 +++++++++++++++++++++++++++++++++++++++++ 5 files changed, 947 insertions(+), 45 deletions(-) create mode 100644 les/randselect.go create mode 100644 les/randselect_test.go create mode 100644 les/serverpool.go diff --git a/les/handler.go b/les/handler.go index 83d73666f..2e6952d2f 100644 --- a/les/handler.go +++ b/les/handler.go @@ -22,10 +22,12 @@ import ( "errors" "fmt" "math/big" + "net" "sync" "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" @@ -101,10 +103,7 @@ type ProtocolManager struct { chainDb ethdb.Database odr *LesOdr server *LesServer - - topicDisc *discv5.Network - lesTopic discv5.Topic - p2pServer *p2p.Server + serverPool *serverPool downloader *downloader.Downloader fetcher *lightFetcher @@ -157,13 +156,46 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network Version: version, Length: ProtocolLengths[i], Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { + var entry *poolEntry + if manager.serverPool != nil { + addr := p.RemoteAddr().(*net.TCPAddr) + entry = manager.serverPool.connect(p.ID(), addr.IP, uint16(addr.Port)) + if entry == nil { + return fmt.Errorf("unwanted connection") + } + } peer := manager.newPeer(int(version), networkId, p, rw) + peer.poolEntry = entry select { case manager.newPeerCh <- peer: manager.wg.Add(1) defer manager.wg.Done() - return manager.handle(peer) + start := mclock.Now() + err := manager.handle(peer) + if entry != nil { + connTime := time.Duration(mclock.Now() - start) + stopped := false + select { + case <-manager.quitSync: + stopped = true + default: + } + //fmt.Println("connTime", peer.id, connTime, stopped, err) + quality := float64(1) + setQuality := true + if connTime < time.Minute*10 { + quality = 0 + if stopped { + setQuality = false + } + } + manager.serverPool.disconnect(entry, quality, setQuality) + } + return err case <-manager.quitSync: + if entry != nil { + manager.serverPool.disconnect(entry, 0, false) + } return p2p.DiscQuitting } }, @@ -236,54 +268,24 @@ func (pm *ProtocolManager) removePeer(id string) { } } -func (pm *ProtocolManager) findServers() { - if pm.p2pServer == nil || pm.topicDisc == nil { - return - } - glog.V(logger.Debug).Infoln("Looking for topic", string(pm.lesTopic)) - enodes := make(chan string, 100) - stop := make(chan struct{}) - go pm.topicDisc.SearchTopic(pm.lesTopic, stop, enodes) - go func() { - added := make(map[string]bool) - for { - select { - case enode := <-enodes: - if !added[enode] { - glog.V(logger.Info).Infoln("Found LES server:", enode) - added[enode] = true - if node, err := discover.ParseNode(enode); err == nil { - pm.p2pServer.AddPeer(node) - } - } - case <-stop: - return - } - } - }() - select { - case <-time.After(time.Second * 20): - case <-pm.quitSync: - } - close(stop) -} - func (pm *ProtocolManager) Start(srvr *p2p.Server) { - pm.p2pServer = srvr + var topicDisc *discv5.Network if srvr != nil { - pm.topicDisc = srvr.DiscV5 + topicDisc = srvr.DiscV5 } - pm.lesTopic = discv5.Topic("LES@" + common.Bytes2Hex(pm.blockchain.Genesis().Hash().Bytes()[0:8])) + lesTopic 
:= discv5.Topic("LES@" + common.Bytes2Hex(pm.blockchain.Genesis().Hash().Bytes()[0:8]))
 	if pm.lightSync {
 		// start sync handler
-		go pm.findServers()
+		if srvr != nil {
+			pm.serverPool = newServerPool(pm.chainDb, []byte("serverPool/"), srvr, lesTopic, pm.quitSync, &pm.wg)
+		}
 		go pm.syncer()
 	} else {
-		if pm.topicDisc != nil {
+		if topicDisc != nil {
 			go func() {
-				glog.V(logger.Debug).Infoln("Starting registering topic", string(pm.lesTopic))
-				pm.topicDisc.RegisterTopic(pm.lesTopic, pm.quitSync)
-				glog.V(logger.Debug).Infoln("Stopped registering topic", string(pm.lesTopic))
+				glog.V(logger.Debug).Infoln("Starting registering topic", string(lesTopic))
+				topicDisc.RegisterTopic(lesTopic, pm.quitSync)
+				glog.V(logger.Debug).Infoln("Stopped registering topic", string(lesTopic))
 			}()
 		}
 		go func() {
@@ -373,6 +375,10 @@ func (pm *ProtocolManager) handle(p *peer) error {
 		}
 		pm.fetcher.notify(p, nil)
+
+		if p.poolEntry != nil {
+			pm.serverPool.registered(p.poolEntry)
+		}
 	}
 
 	stop := make(chan struct{})
diff --git a/les/peer.go b/les/peer.go
index 5d566d899..8eced439d 100644
--- a/les/peer.go
+++ b/les/peer.go
@@ -57,6 +57,8 @@ type peer struct {
 	announceChn chan announceData
 
+	poolEntry *poolEntry
+
 	fcClient       *flowcontrol.ClientNode // nil if the peer is server only
 	fcServer       *flowcontrol.ServerNode // nil if the peer is client only
 	fcServerParams *flowcontrol.ServerParams
diff --git a/les/randselect.go b/les/randselect.go
new file mode 100644
index 000000000..1a9d0695b
--- /dev/null
+++ b/les/randselect.go
@@ -0,0 +1,173 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// Package les implements the Light Ethereum Subprotocol.
+package les
+
+import (
+	"math/rand"
+)
+
+// wrsItem should be implemented by any entry that is to be selected from a
+// weightedRandomSelect set. Note that recalculating monotonically decreasing item
+// weights on demand (without constantly calling update) is allowed.
+type wrsItem interface {
+	Weight() int64
+}
+
+// weightedRandomSelect is capable of weighted random selection from a set of items
+type weightedRandomSelect struct {
+	root *wrsNode
+	idx  map[wrsItem]int
+}
+
+// newWeightedRandomSelect returns a new weightedRandomSelect structure
+func newWeightedRandomSelect() *weightedRandomSelect {
+	return &weightedRandomSelect{root: &wrsNode{maxItems: wrsBranches}, idx: make(map[wrsItem]int)}
+}
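+
+// A typical use, sketched here for illustration (serverA and serverB stand
+// for values of any type implementing wrsItem):
+//
+//	wrs := newWeightedRandomSelect()
+//	wrs.update(serverA) // weight is queried via serverA.Weight()
+//	wrs.update(serverB)
+//	if item := wrs.choose(); item != nil {
+//		// item is serverA or serverB, picked with probability
+//		// proportional to its current weight
+//	}
+
+// update updates an item's weight, adds it if it was non-existent or removes it if
+// the new weight is zero. Note that explicitly updating decreasing weights is not necessary.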
+func (w *weightedRandomSelect) update(item wrsItem) { + w.setWeight(item, item.Weight()) +} + +// remove removes an item from the set +func (w *weightedRandomSelect) remove(item wrsItem) { + w.setWeight(item, 0) +} + +// setWeight sets an item's weight to a specific value (removes it if zero) +func (w *weightedRandomSelect) setWeight(item wrsItem, weight int64) { + idx, ok := w.idx[item] + if ok { + w.root.setWeight(idx, weight) + if weight == 0 { + delete(w.idx, item) + } + } else { + if weight != 0 { + if w.root.itemCnt == w.root.maxItems { + // add a new level + newRoot := &wrsNode{sumWeight: w.root.sumWeight, itemCnt: w.root.itemCnt, level: w.root.level + 1, maxItems: w.root.maxItems * wrsBranches} + newRoot.items[0] = w.root + newRoot.weights[0] = w.root.sumWeight + w.root = newRoot + } + w.idx[item] = w.root.insert(item, weight) + } + } +} + +// choose randomly selects an item from the set, with a chance proportional to its +// current weight. If the weight of the chosen element has been decreased since the +// last stored value, returns it with a newWeight/oldWeight chance, otherwise just +// updates its weight and selects another one +func (w *weightedRandomSelect) choose() wrsItem { + for { + if w.root.sumWeight == 0 { + return nil + } + val := rand.Int63n(w.root.sumWeight) + choice, lastWeight := w.root.choose(val) + weight := choice.Weight() + if weight != lastWeight { + w.setWeight(choice, weight) + } + if weight >= lastWeight || rand.Int63n(lastWeight) < weight { + return choice + } + } +} + +const wrsBranches = 8 // max number of branches in the wrsNode tree + +// wrsNode is a node of a tree structure that can store wrsItems or further wrsNodes. +type wrsNode struct { + items [wrsBranches]interface{} + weights [wrsBranches]int64 + sumWeight int64 + level, itemCnt, maxItems int +} + +// insert recursively inserts a new item to the tree and returns the item index +func (n *wrsNode) insert(item wrsItem, weight int64) int { + branch := 0 + for n.items[branch] != nil && (n.level == 0 || n.items[branch].(*wrsNode).itemCnt == n.items[branch].(*wrsNode).maxItems) { + branch++ + if branch == wrsBranches { + panic(nil) + } + } + n.itemCnt++ + n.sumWeight += weight + n.weights[branch] += weight + if n.level == 0 { + n.items[branch] = item + return branch + } else { + var subNode *wrsNode + if n.items[branch] == nil { + subNode = &wrsNode{maxItems: n.maxItems / wrsBranches, level: n.level - 1} + n.items[branch] = subNode + } else { + subNode = n.items[branch].(*wrsNode) + } + subIdx := subNode.insert(item, weight) + return subNode.maxItems*branch + subIdx + } +} + +// setWeight updates the weight of a certain item (which should exist) and returns +// the change of the last weight value stored in the tree +func (n *wrsNode) setWeight(idx int, weight int64) int64 { + if n.level == 0 { + oldWeight := n.weights[idx] + n.weights[idx] = weight + diff := weight - oldWeight + n.sumWeight += diff + if weight == 0 { + n.items[idx] = nil + n.itemCnt-- + } + return diff + } + branchItems := n.maxItems / wrsBranches + branch := idx / branchItems + diff := n.items[branch].(*wrsNode).setWeight(idx-branch*branchItems, weight) + n.weights[branch] += diff + n.sumWeight += diff + if weight == 0 { + n.itemCnt-- + } + return diff +} + +// choose recursively selects an item from the tree and returns it along with its weight +func (n *wrsNode) choose(val int64) (wrsItem, int64) { + for i, w := range n.weights { + if val < w { + if n.level == 0 { + return n.items[i].(wrsItem), n.weights[i] + } else { + return 
n.items[i].(*wrsNode).choose(val)
+			}
+		} else {
+			val -= w
+		}
+	}
+	panic(nil)
+}
diff --git a/les/randselect_test.go b/les/randselect_test.go
new file mode 100644
index 000000000..f3c34305e
--- /dev/null
+++ b/les/randselect_test.go
@@ -0,0 +1,67 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package les
+
+import (
+	"math/rand"
+	"testing"
+)
+
+type testWrsItem struct {
+	idx  int
+	widx *int
+}
+
+func (t *testWrsItem) Weight() int64 {
+	w := *t.widx
+	if w == -1 || w == t.idx {
+		return int64(t.idx + 1)
+	}
+	return 0
+}
+
+func TestWeightedRandomSelect(t *testing.T) {
+	testFn := func(cnt int) {
+		s := newWeightedRandomSelect()
+		w := -1
+		list := make([]testWrsItem, cnt)
+		for i, _ := range list {
+			list[i] = testWrsItem{idx: i, widx: &w}
+			s.update(&list[i])
+		}
+		w = rand.Intn(cnt)
+		c := s.choose()
+		if c == nil {
+			t.Errorf("expected item, got nil")
+		} else {
+			if c.(*testWrsItem).idx != w {
+				t.Errorf("expected another item")
+			}
+		}
+		w = -2
+		if s.choose() != nil {
+			t.Errorf("expected nil, got item")
+		}
+	}
+	testFn(1)
+	testFn(10)
+	testFn(100)
+	testFn(1000)
+	testFn(10000)
+	testFn(100000)
+	testFn(1000000)
+}
diff --git a/les/serverpool.go b/les/serverpool.go
new file mode 100644
index 000000000..95968eda2
--- /dev/null
+++ b/les/serverpool.go
@@ -0,0 +1,654 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// Package les implements the Light Ethereum Subprotocol.
+package les
+
+import (
+	"io"
+	"math"
+	"math/rand"
+	"net"
+	"strconv"
+	"sync"
+	"time"
+
+	"github.com/ethereum/go-ethereum/common/mclock"
+	"github.com/ethereum/go-ethereum/ethdb"
+	"github.com/ethereum/go-ethereum/logger"
+	"github.com/ethereum/go-ethereum/logger/glog"
+	"github.com/ethereum/go-ethereum/p2p"
+	"github.com/ethereum/go-ethereum/p2p/discover"
+	"github.com/ethereum/go-ethereum/p2p/discv5"
+	"github.com/ethereum/go-ethereum/rlp"
+)
+
+const (
+	// After a connection has been ended or timed out, there is a waiting period
+	// before it can be selected for connection again.
+	// waiting period = base delay * (1 + random(1))
+	// base delay = shortRetryDelay for the first shortRetryCnt times after a
+	// successful connection, after that longRetryDelay is applied
+	shortRetryCnt   = 5
+	shortRetryDelay = time.Second * 5
+	longRetryDelay  = time.Minute * 10
+	// maxNewEntries is the maximum number of newly discovered (never connected) nodes.
+	// If the limit is reached, the least recently discovered one is thrown out.
+	maxNewEntries = 1000
+	// maxKnownEntries is the maximum number of known (already connected) nodes.
+	// If the limit is reached, the least recently connected one is thrown out.
+	// (note that unlike new entries, known entries are persistent)
+	maxKnownEntries = 1000
+	// target for simultaneously connected servers
+	targetServerCount = 5
+	// target for servers selected from the known table
+	// (we leave room for trying new ones if there are any)
+	targetKnownSelect = 3
+	// after dialTimeout, consider the server unavailable and adjust statistics
+	dialTimeout = time.Second * 30
+	// new entry selection weight calculation based on most recent discovery time:
+	// unity until discoverExpireStart, then exponential decay with discoverExpireConst
+	discoverExpireStart = time.Minute * 20
+	discoverExpireConst = time.Minute * 20
+	// known entry selection weight is dropped by a factor of exp(-failDropLn) after
+	// each unsuccessful connection (restored after a successful one)
+	failDropLn = 0.1
+	// known node connection success and quality statistics have a long term average
+	// and a short term value which is adjusted exponentially with a factor of
+	// pstatRecentAdjust with each dial/connection and also returned exponentially
+	// to the average with the time constant pstatReturnToMeanTC
+	pstatRecentAdjust   = 0.1
+	pstatReturnToMeanTC = time.Hour
+	// node address selection weight is dropped by a factor of exp(-addrFailDropLn) after
+	// each unsuccessful connection (restored after a successful one)
+	addrFailDropLn = math.Ln2
+)
+
+// serverPool implements a pool for storing and selecting newly discovered and already
+// known light server nodes. It receives discovered nodes, stores statistics about
+// known nodes, and takes care of always having enough good quality servers connected.
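+//
+// The expected call sequence, as wired up in the protocol handler, is:
+//
+//	entry := pool.connect(id, ip, port) // for each incoming connection
+//	if entry == nil {
+//		// reject: the pool has not dialed this node recently
+//	}
+//	...
+//	pool.registered(entry) // after a successful protocol handshake
+//	...
+//	pool.disconnect(entry, quality, setQuality) // always, once the connection ends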
+type serverPool struct { + db ethdb.Database + dbKey []byte + server *p2p.Server + quit chan struct{} + wg *sync.WaitGroup + connWg sync.WaitGroup + + discSetPeriod chan time.Duration + discNodes chan *discv5.Node + discLookups chan bool + + entries map[discover.NodeID]*poolEntry + lock sync.Mutex + timeout, enableRetry chan *poolEntry + + knownQueue, newQueue poolEntryQueue + knownSelect, newSelect *weightedRandomSelect + knownSelected, newSelected int + fastDiscover bool +} + +// newServerPool creates a new serverPool instance +func newServerPool(db ethdb.Database, dbPrefix []byte, server *p2p.Server, topic discv5.Topic, quit chan struct{}, wg *sync.WaitGroup) *serverPool { + pool := &serverPool{ + db: db, + dbKey: append(dbPrefix, []byte(topic)...), + server: server, + quit: quit, + wg: wg, + entries: make(map[discover.NodeID]*poolEntry), + timeout: make(chan *poolEntry, 1), + enableRetry: make(chan *poolEntry, 1), + knownSelect: newWeightedRandomSelect(), + newSelect: newWeightedRandomSelect(), + fastDiscover: true, + } + pool.knownQueue = newPoolEntryQueue(maxKnownEntries, pool.removeEntry) + pool.newQueue = newPoolEntryQueue(maxNewEntries, pool.removeEntry) + wg.Add(1) + pool.loadNodes() + pool.checkDial() + + if pool.server.DiscV5 != nil { + pool.discSetPeriod = make(chan time.Duration, 1) + pool.discNodes = make(chan *discv5.Node, 100) + pool.discLookups = make(chan bool, 100) + go pool.server.DiscV5.SearchTopic(topic, pool.discSetPeriod, pool.discNodes, pool.discLookups) + } + + go pool.eventLoop() + return pool +} + +// connect should be called upon any incoming connection. If the connection has been +// dialed by the server pool recently, the appropriate pool entry is returned. +// Otherwise, the connection should be rejected. +// Note that whenever a connection has been accepted and a pool entry has been returned, +// disconnect should also always be called. +func (pool *serverPool) connect(id discover.NodeID, ip net.IP, port uint16) *poolEntry { + pool.lock.Lock() + defer pool.lock.Unlock() + entry := pool.entries[id] + if entry == nil { + return nil + } + glog.V(logger.Debug).Infof("connecting to %v, state: %v", id.String(), entry.state) + if entry.state != psDialed { + return nil + } + pool.connWg.Add(1) + entry.state = psConnected + addr := &poolEntryAddress{ + ip: ip, + port: port, + lastSeen: mclock.Now(), + } + entry.lastConnected = addr + entry.addr = make(map[string]*poolEntryAddress) + entry.addr[addr.strKey()] = addr + entry.addrSelect = *newWeightedRandomSelect() + entry.addrSelect.update(addr) + return entry +} + +// registered should be called after a successful handshake +func (pool *serverPool) registered(entry *poolEntry) { + glog.V(logger.Debug).Infof("registered %v", entry.id.String()) + pool.lock.Lock() + defer pool.lock.Unlock() + + entry.state = psRegistered + if !entry.known { + pool.newQueue.remove(entry) + entry.known = true + } + pool.knownQueue.setLatest(entry) + entry.shortRetry = shortRetryCnt + entry.connectStats.add(1) +} + +// disconnect should be called when ending a connection. 
+// Service quality statistics can be updated optionally; they are not updated
+// if no registration happened, in which case only connection statistics are
+// updated, just as in the case of a timeout.
+func (pool *serverPool) disconnect(entry *poolEntry, quality float64, setQuality bool) {
+	glog.V(logger.Debug).Infof("disconnected %v", entry.id.String())
+	pool.lock.Lock()
+	defer pool.lock.Unlock()
+
+	if entry.state != psRegistered {
+		setQuality = false
+	}
+	entry.state = psNotConnected
+	if entry.knownSelected {
+		pool.knownSelected--
+	} else {
+		pool.newSelected--
+	}
+	if setQuality {
+		glog.V(logger.Debug).Infof("update quality %v %v", quality, entry.id.String())
+		entry.qualityStats.add(quality)
+	} else {
+		glog.V(logger.Debug).Infof("do not update quality")
+	}
+	pool.setRetryDial(entry)
+	pool.connWg.Done()
+}
+
+// eventLoop handles pool events and mutex locking for all internal functions
+func (pool *serverPool) eventLoop() {
+	lookupCnt := 0
+	var convTime mclock.AbsTime
+	// discSetPeriod is nil if the p2p server runs without discv5; guard the
+	// channel operations so the loop does not block on (or close) a nil channel
+	if pool.discSetPeriod != nil {
+		pool.discSetPeriod <- time.Millisecond * 100
+	}
+	for {
+		select {
+		case entry := <-pool.timeout:
+			pool.lock.Lock()
+			if !entry.removed {
+				pool.checkDialTimeout(entry)
+			}
+			pool.lock.Unlock()
+
+		case entry := <-pool.enableRetry:
+			pool.lock.Lock()
+			if !entry.removed {
+				entry.delayedRetry = false
+				pool.updateCheckDial(entry)
+			}
+			pool.lock.Unlock()
+
+		case node := <-pool.discNodes:
+			pool.lock.Lock()
+			now := mclock.Now()
+			id := discover.NodeID(node.ID)
+			entry := pool.entries[id]
+			if entry == nil {
+				glog.V(logger.Debug).Infof("discovered %v", node.String())
+				entry = &poolEntry{
+					id:         id,
+					addr:       make(map[string]*poolEntryAddress),
+					addrSelect: *newWeightedRandomSelect(),
+					shortRetry: shortRetryCnt,
+				}
+				pool.entries[id] = entry
+			}
+			entry.lastDiscovered = now
+			addr := &poolEntryAddress{
+				ip:   node.IP,
+				port: node.TCP,
+			}
+			if a, ok := entry.addr[addr.strKey()]; ok {
+				addr = a
+			} else {
+				entry.addr[addr.strKey()] = addr
+			}
+			addr.lastSeen = now
+			entry.addrSelect.update(addr)
+			if !entry.known {
+				pool.newQueue.setLatest(entry)
+			}
+			pool.updateCheckDial(entry)
+			pool.lock.Unlock()
+
+		case conv := <-pool.discLookups:
+			if conv {
+				if lookupCnt == 0 {
+					convTime = mclock.Now()
+				}
+				lookupCnt++
+				if pool.fastDiscover && (lookupCnt == 50 || time.Duration(mclock.Now()-convTime) > time.Minute) {
+					pool.fastDiscover = false
+					pool.discSetPeriod <- time.Minute
+				}
+			}
+
+		case <-pool.quit:
+			if pool.discSetPeriod != nil {
+				close(pool.discSetPeriod)
+			}
+			pool.connWg.Wait()
+			pool.saveNodes()
+			pool.wg.Done()
+			return
+
+		}
+	}
+}
+
+// loadNodes loads known nodes and their statistics from the database
+func (pool *serverPool) loadNodes() {
+	enc, err := pool.db.Get(pool.dbKey)
+	if err != nil {
+		return
+	}
+	var list []*poolEntry
+	err = rlp.DecodeBytes(enc, &list)
+	if err != nil {
+		glog.V(logger.Debug).Infof("node list decode error: %v", err)
+		return
+	}
+	glog.V(logger.Debug).Infof("loaded node list")
+	for _, e := range list {
+		glog.V(logger.Debug).Infof(" adding node %v fails: %v connStats sum: %v cnt: %v qualityStats sum: %v cnt: %v", e.id.String()+"@"+e.lastConnected.strKey(), e.lastConnected.fails, e.connectStats.sum, e.connectStats.cnt, e.qualityStats.sum, e.qualityStats.cnt)
+		pool.entries[e.id] = e
+		pool.knownQueue.setLatest(e)
+		pool.knownSelect.update((*knownEntry)(e))
+	}
+}
+
+// saveNodes saves known nodes and their statistics into the database. Nodes are
+// ordered from least to most recently connected.
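+// Saving in this order matters: loadNodes replays the saved list through
+// knownQueue.setLatest, so a least-recent-first ordering reconstructs the
+// original queue order on the next startup.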
+func (pool *serverPool) saveNodes() { + list := make([]*poolEntry, len(pool.knownQueue.queue)) + for i, _ := range list { + list[i] = pool.knownQueue.fetchOldest() + } + enc, err := rlp.EncodeToBytes(list) + if err == nil { + pool.db.Put(pool.dbKey, enc) + } +} + +// removeEntry removes a pool entry when the entry count limit is reached. +// Note that it is called by the new/known queues from which the entry has already +// been removed so removing it from the queues is not necessary. +func (pool *serverPool) removeEntry(entry *poolEntry) { + pool.newSelect.remove((*discoveredEntry)(entry)) + pool.knownSelect.remove((*knownEntry)(entry)) + entry.removed = true + delete(pool.entries, entry.id) +} + +// setRetryDial starts the timer which will enable dialing a certain node again +func (pool *serverPool) setRetryDial(entry *poolEntry) { + delay := longRetryDelay + if entry.shortRetry > 0 { + entry.shortRetry-- + delay = shortRetryDelay + } + delay += time.Duration(rand.Int63n(int64(delay) + 1)) + entry.delayedRetry = true + go func() { + select { + case <-pool.quit: + case <-time.After(delay): + select { + case <-pool.quit: + case pool.enableRetry <- entry: + } + } + }() +} + +// updateCheckDial is called when an entry can potentially be dialed again. It updates +// its selection weights and checks if new dials can/should be made. +func (pool *serverPool) updateCheckDial(entry *poolEntry) { + pool.newSelect.update((*discoveredEntry)(entry)) + pool.knownSelect.update((*knownEntry)(entry)) + pool.checkDial() +} + +// checkDial checks if new dials can/should be made. It tries to select servers both +// based on good statistics and recent discovery. +func (pool *serverPool) checkDial() { + fillWithKnownSelects := !pool.fastDiscover + for pool.knownSelected < targetKnownSelect { + entry := pool.knownSelect.choose() + if entry == nil { + fillWithKnownSelects = false + break + } + pool.dial((*poolEntry)(entry.(*knownEntry)), true) + } + for pool.knownSelected+pool.newSelected < targetServerCount { + entry := pool.newSelect.choose() + if entry == nil { + break + } + pool.dial((*poolEntry)(entry.(*discoveredEntry)), false) + } + if fillWithKnownSelects { + // no more newly discovered nodes to select and since fast discover period + // is over, we probably won't find more in the near future so select more + // known entries if possible + for pool.knownSelected < targetServerCount { + entry := pool.knownSelect.choose() + if entry == nil { + break + } + pool.dial((*poolEntry)(entry.(*knownEntry)), true) + } + } +} + +// dial initiates a new connection +func (pool *serverPool) dial(entry *poolEntry, knownSelected bool) { + if entry.state != psNotConnected { + return + } + entry.state = psDialed + entry.knownSelected = knownSelected + if knownSelected { + pool.knownSelected++ + } else { + pool.newSelected++ + } + addr := entry.addrSelect.choose().(*poolEntryAddress) + glog.V(logger.Debug).Infof("dialing %v out of %v, known: %v", entry.id.String()+"@"+addr.strKey(), len(entry.addr), knownSelected) + entry.dialed = addr + go func() { + pool.server.AddPeer(discover.NewNode(entry.id, addr.ip, addr.port, addr.port)) + select { + case <-pool.quit: + case <-time.After(dialTimeout): + select { + case <-pool.quit: + case pool.timeout <- entry: + } + } + }() +} + +// checkDialTimeout checks if the node is still in dialed state and if so, resets it +// and adjusts connection statistics accordingly. 
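+// A timed-out dial counts as a failed connection: connectStats receives a zero
+// sample and the dialed address accumulates one more failure, both of which
+// lower the entry's future selection weight.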
+func (pool *serverPool) checkDialTimeout(entry *poolEntry) { + if entry.state != psDialed { + return + } + glog.V(logger.Debug).Infof("timeout %v", entry.id.String()+"@"+entry.dialed.strKey()) + entry.state = psNotConnected + if entry.knownSelected { + pool.knownSelected-- + } else { + pool.newSelected-- + } + entry.connectStats.add(0) + entry.dialed.fails++ + pool.setRetryDial(entry) +} + +const ( + psNotConnected = iota + psDialed + psConnected + psRegistered +) + +// poolEntry represents a server node and stores its current state and statistics. +type poolEntry struct { + id discover.NodeID + addr map[string]*poolEntryAddress + lastConnected, dialed *poolEntryAddress + addrSelect weightedRandomSelect + + lastDiscovered mclock.AbsTime + known, knownSelected bool + connectStats, qualityStats poolStats + state int + queueIdx int + removed bool + + delayedRetry bool + shortRetry int +} + +func (e *poolEntry) EncodeRLP(w io.Writer) error { + return rlp.Encode(w, []interface{}{e.id, e.lastConnected.ip, e.lastConnected.port, e.lastConnected.fails, &e.connectStats, &e.qualityStats}) +} + +func (e *poolEntry) DecodeRLP(s *rlp.Stream) error { + var entry struct { + ID discover.NodeID + IP net.IP + Port uint16 + Fails uint + CStat, QStat poolStats + } + if err := s.Decode(&entry); err != nil { + return err + } + addr := &poolEntryAddress{ip: entry.IP, port: entry.Port, fails: entry.Fails, lastSeen: mclock.Now()} + e.id = entry.ID + e.addr = make(map[string]*poolEntryAddress) + e.addr[addr.strKey()] = addr + e.addrSelect = *newWeightedRandomSelect() + e.addrSelect.update(addr) + e.lastConnected = addr + e.connectStats = entry.CStat + e.qualityStats = entry.QStat + e.shortRetry = shortRetryCnt + e.known = true + return nil +} + +// discoveredEntry implements wrsItem +type discoveredEntry poolEntry + +// Weight calculates random selection weight for newly discovered entries +func (e *discoveredEntry) Weight() int64 { + if e.state != psNotConnected || e.delayedRetry { + return 0 + } + t := time.Duration(mclock.Now() - e.lastDiscovered) + if t <= discoverExpireStart { + return 1000000000 + } else { + return int64(1000000000 * math.Exp(-float64(t-discoverExpireStart)/float64(discoverExpireConst))) + } +} + +// knownEntry implements wrsItem +type knownEntry poolEntry + +// Weight calculates random selection weight for known entries +func (e *knownEntry) Weight() int64 { + if e.state != psNotConnected || !e.known || e.delayedRetry { + return 0 + } + return int64(1000000000 * e.connectStats.recentAvg() * (e.qualityStats.recentAvg() + 0.001) * math.Exp(-float64(e.lastConnected.fails)*failDropLn)) +} + +// poolEntryAddress is a separate object because currently it is necessary to remember +// multiple potential network addresses for a pool entry. This will be removed after +// the final implementation of v5 discovery which will retrieve signed and serial +// numbered advertisements, making it clear which IP/port is the latest one. 
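+//
+// Address selection weight decays with both age and failure count: with
+// discoverExpireConst = 20min and addrFailDropLn = ln(2), an address last seen
+// 20 minutes ago carries about e^-1 (roughly 37%) of the weight of a fresh
+// one, and each failed dial since the last success halves the weight again.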
+type poolEntryAddress struct {
+	ip       net.IP
+	port     uint16
+	lastSeen mclock.AbsTime // last time it was discovered, connected or loaded from db
+	fails    uint           // connection failures since last successful connection (persistent)
+}
+
+func (a *poolEntryAddress) Weight() int64 {
+	t := time.Duration(mclock.Now() - a.lastSeen)
+	return int64(1000000*math.Exp(-float64(t)/float64(discoverExpireConst)-float64(a.fails)*addrFailDropLn)) + 1
+}
+
+func (a *poolEntryAddress) strKey() string {
+	return a.ip.String() + ":" + strconv.Itoa(int(a.port))
+}
+
+// poolStats implements statistics for a certain quantity with a long term average
+// and a short term value which is adjusted exponentially with a factor of
+// pstatRecentAdjust with each update and also returned exponentially to the
+// average with the time constant pstatReturnToMeanTC
+type poolStats struct {
+	sum, avg, recent float64
+	cnt              uint
+	lastRecalc       mclock.AbsTime
+}
+
+// init initializes stats with a long term sum/update count pair retrieved from the database
+func (s *poolStats) init(sum float64, cnt uint) {
+	s.sum = sum
+	s.cnt = cnt
+	var avg float64
+	if cnt > 0 {
+		avg = s.sum / float64(cnt)
+	}
+	s.avg = avg
+	s.recent = avg
+	s.lastRecalc = mclock.Now()
+}
+
+// recalc applies the return-to-mean adjustment to the recent value and
+// recalculates the long term average
+func (s *poolStats) recalc() {
+	now := mclock.Now()
+	s.recent = s.avg + (s.recent-s.avg)*math.Exp(-float64(now-s.lastRecalc)/float64(pstatReturnToMeanTC))
+	if s.cnt > 0 {
+		s.avg = s.sum / float64(s.cnt)
+	}
+	s.lastRecalc = now
+}
+
+// add updates the stats with a new value
+func (s *poolStats) add(val float64) {
+	s.cnt++
+	s.sum += val
+	s.recalc()
+	// blend the new sample into the short term value with the
+	// pstatRecentAdjust factor documented above
+	s.recent += (val - s.recent) * pstatRecentAdjust
+}
+
+// recentAvg returns the short-term adjusted average
+func (s *poolStats) recentAvg() float64 {
+	s.recalc()
+	return s.recent
+}
+
+func (s *poolStats) EncodeRLP(w io.Writer) error {
+	return rlp.Encode(w, []interface{}{math.Float64bits(s.sum), s.cnt})
+}
+
+func (s *poolStats) DecodeRLP(st *rlp.Stream) error {
+	var stats struct {
+		SumUint uint64
+		Cnt     uint
+	}
+	if err := st.Decode(&stats); err != nil {
+		return err
+	}
+	s.init(math.Float64frombits(stats.SumUint), stats.Cnt)
+	return nil
+}
+
+// poolEntryQueue keeps track of its least recently accessed entries and removes
+// them when the number of entries reaches the limit
+type poolEntryQueue struct {
+	queue                  map[int]*poolEntry // known nodes indexed by their latest access counter value
+	newPtr, oldPtr, maxCnt int
+	removeFromPool         func(*poolEntry)
+}
+
+// newPoolEntryQueue returns a new poolEntryQueue
+func newPoolEntryQueue(maxCnt int, removeFromPool func(*poolEntry)) poolEntryQueue {
+	return poolEntryQueue{queue: make(map[int]*poolEntry), maxCnt: maxCnt, removeFromPool: removeFromPool}
+}
+
+// fetchOldest returns and removes the least recently accessed entry
+func (q *poolEntryQueue) fetchOldest() *poolEntry {
+	if len(q.queue) == 0 {
+		return nil
+	}
+	for {
+		if e := q.queue[q.oldPtr]; e != nil {
+			delete(q.queue, q.oldPtr)
+			q.oldPtr++
+			return e
+		}
+		q.oldPtr++
+	}
+}
+
+// remove removes an entry from the queue
+func (q *poolEntryQueue) remove(entry *poolEntry) {
+	if q.queue[entry.queueIdx] == entry {
+		delete(q.queue, entry.queueIdx)
+	}
+}
+
+// setLatest adds or updates a recently accessed entry. It also checks if an old entry
+// needs to be removed and removes it from the parent pool too with a callback function.
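+// For example, with maxCnt = 2: setLatest(A); setLatest(B); setLatest(A);
+// setLatest(C) evicts B rather than A, because A's access counter was
+// refreshed before C arrived.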
+func (q *poolEntryQueue) setLatest(entry *poolEntry) { + if q.queue[entry.queueIdx] == entry { + delete(q.queue, entry.queueIdx) + } else { + if len(q.queue) == q.maxCnt { + e := q.fetchOldest() + q.remove(e) + q.removeFromPool(e) + } + } + entry.queueIdx = q.newPtr + q.queue[entry.queueIdx] = entry + q.newPtr++ +}
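
The proportionality promised by randselect.go is easy to check in isolation.
A minimal sketch, assuming it lives in package les next to randselect.go
(demoItem and demoWRS are hypothetical, not part of the patch):

	package les

	import "fmt"

	// demoItem is a wrsItem with a fixed weight.
	type demoItem struct {
		name   string
		weight int64
	}

	func (d *demoItem) Weight() int64 { return d.weight }

	// demoWRS samples the selector repeatedly; selection frequencies
	// approach the 3:1 weight ratio of the two items.
	func demoWRS() {
		wrs := newWeightedRandomSelect()
		wrs.update(&demoItem{name: "a", weight: 3})
		wrs.update(&demoItem{name: "b", weight: 1})
		counts := make(map[string]int)
		for i := 0; i < 4000; i++ {
			counts[wrs.choose().(*demoItem).name]++
		}
		fmt.Println(counts) // roughly three "a" picks for every "b" pick
	}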