// Copyright 2016 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify // it under the terms of the GNU Lesser General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // The go-ethereum library is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Lesser General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . // Package les implements the Light Ethereum Subprotocol. package les import ( "io" "math" "math/rand" "net" "strconv" "sync" "time" "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/discv5" "github.com/ethereum/go-ethereum/rlp" ) const ( // After a connection has been ended or timed out, there is a waiting period // before it can be selected for connection again. // waiting period = base delay * (1 + random(1)) // base delay = shortRetryDelay for the first shortRetryCnt times after a // successful connection, after that longRetryDelay is applied shortRetryCnt = 5 shortRetryDelay = time.Second * 5 longRetryDelay = time.Minute * 10 // maxNewEntries is the maximum number of newly discovered (never connected) nodes. // If the limit is reached, the least recently discovered one is thrown out. maxNewEntries = 1000 // maxKnownEntries is the maximum number of known (already connected) nodes. // If the limit is reached, the least recently connected one is thrown out. // (not that unlike new entries, known entries are persistent) maxKnownEntries = 1000 // target for simultaneously connected servers targetServerCount = 5 // target for servers selected from the known table // (we leave room for trying new ones if there is any) targetKnownSelect = 3 // after dialTimeout, consider the server unavailable and adjust statistics dialTimeout = time.Second * 30 // targetConnTime is the minimum expected connection duration before a server // drops a client without any specific reason targetConnTime = time.Minute * 10 // new entry selection weight calculation based on most recent discovery time: // unity until discoverExpireStart, then exponential decay with discoverExpireConst discoverExpireStart = time.Minute * 20 discoverExpireConst = time.Minute * 20 // known entry selection weight is dropped by a factor of exp(-failDropLn) after // each unsuccessful connection (restored after a successful one) failDropLn = 0.1 // known node connection success and quality statistics have a long term average // and a short term value which is adjusted exponentially with a factor of // pstatRecentAdjust with each dial/connection and also returned exponentially // to the average with the time constant pstatReturnToMeanTC pstatRecentAdjust = 0.1 pstatReturnToMeanTC = time.Hour // node address selection weight is dropped by a factor of exp(-addrFailDropLn) after // each unsuccessful connection (restored after a successful one) addrFailDropLn = math.Ln2 // responseScoreTC and delayScoreTC are exponential decay time constants for // calculating selection chances from response times and block delay times responseScoreTC = time.Millisecond * 100 delayScoreTC = time.Second * 5 timeoutPow = 10 // peerSelectMinWeight is added to calculated weights at request peer selection // to give poorly performing peers a little chance of coming back peerSelectMinWeight = 0.005 // initStatsWeight is used to initialize previously unknown peers with good // statistics to give a chance to prove themselves initStatsWeight = 1 ) // serverPool implements a pool for storing and selecting newly discovered and already // known light server nodes. It received discovered nodes, stores statistics about // known nodes and takes care of always having enough good quality servers connected. type serverPool struct { db ethdb.Database dbKey []byte server *p2p.Server quit chan struct{} wg *sync.WaitGroup connWg sync.WaitGroup discSetPeriod chan time.Duration discNodes chan *discv5.Node discLookups chan bool entries map[discover.NodeID]*poolEntry lock sync.Mutex timeout, enableRetry chan *poolEntry adjustStats chan poolStatAdjust knownQueue, newQueue poolEntryQueue knownSelect, newSelect *weightedRandomSelect knownSelected, newSelected int fastDiscover bool } // newServerPool creates a new serverPool instance func newServerPool(db ethdb.Database, dbPrefix []byte, server *p2p.Server, topic discv5.Topic, quit chan struct{}, wg *sync.WaitGroup) *serverPool { pool := &serverPool{ db: db, dbKey: append(dbPrefix, []byte(topic)...), server: server, quit: quit, wg: wg, entries: make(map[discover.NodeID]*poolEntry), timeout: make(chan *poolEntry, 1), adjustStats: make(chan poolStatAdjust, 100), enableRetry: make(chan *poolEntry, 1), knownSelect: newWeightedRandomSelect(), newSelect: newWeightedRandomSelect(), fastDiscover: true, } pool.knownQueue = newPoolEntryQueue(maxKnownEntries, pool.removeEntry) pool.newQueue = newPoolEntryQueue(maxNewEntries, pool.removeEntry) wg.Add(1) pool.loadNodes() pool.checkDial() if pool.server.DiscV5 != nil { pool.discSetPeriod = make(chan time.Duration, 1) pool.discNodes = make(chan *discv5.Node, 100) pool.discLookups = make(chan bool, 100) go pool.server.DiscV5.SearchTopic(topic, pool.discSetPeriod, pool.discNodes, pool.discLookups) } go pool.eventLoop() return pool } // connect should be called upon any incoming connection. If the connection has been // dialed by the server pool recently, the appropriate pool entry is returned. // Otherwise, the connection should be rejected. // Note that whenever a connection has been accepted and a pool entry has been returned, // disconnect should also always be called. func (pool *serverPool) connect(p *peer, ip net.IP, port uint16) *poolEntry { pool.lock.Lock() defer pool.lock.Unlock() entry := pool.entries[p.ID()] if entry == nil { return nil } glog.V(logger.Debug).Infof("connecting to %v, state: %v", p.id, entry.state) if entry.state != psDialed { return nil } pool.connWg.Add(1) entry.peer = p entry.state = psConnected addr := &poolEntryAddress{ ip: ip, port: port, lastSeen: mclock.Now(), } entry.lastConnected = addr entry.addr = make(map[string]*poolEntryAddress) entry.addr[addr.strKey()] = addr entry.addrSelect = *newWeightedRandomSelect() entry.addrSelect.update(addr) return entry } // registered should be called after a successful handshake func (pool *serverPool) registered(entry *poolEntry) { glog.V(logger.Debug).Infof("registered %v", entry.id.String()) pool.lock.Lock() defer pool.lock.Unlock() entry.state = psRegistered entry.regTime = mclock.Now() if !entry.known { pool.newQueue.remove(entry) entry.known = true } pool.knownQueue.setLatest(entry) entry.shortRetry = shortRetryCnt } // disconnect should be called when ending a connection. Service quality statistics // can be updated optionally (not updated if no registration happened, in this case // only connection statistics are updated, just like in case of timeout) func (pool *serverPool) disconnect(entry *poolEntry) { glog.V(logger.Debug).Infof("disconnected %v", entry.id.String()) pool.lock.Lock() defer pool.lock.Unlock() if entry.state == psRegistered { connTime := mclock.Now() - entry.regTime connAdjust := float64(connTime) / float64(targetConnTime) if connAdjust > 1 { connAdjust = 1 } stopped := false select { case <-pool.quit: stopped = true default: } if stopped { entry.connectStats.add(1, connAdjust) } else { entry.connectStats.add(connAdjust, 1) } } entry.state = psNotConnected if entry.knownSelected { pool.knownSelected-- } else { pool.newSelected-- } pool.setRetryDial(entry) pool.connWg.Done() } const ( pseBlockDelay = iota pseResponseTime pseResponseTimeout ) // poolStatAdjust records are sent to adjust peer block delay/response time statistics type poolStatAdjust struct { adjustType int entry *poolEntry time time.Duration } // adjustBlockDelay adjusts the block announce delay statistics of a node func (pool *serverPool) adjustBlockDelay(entry *poolEntry, time time.Duration) { pool.adjustStats <- poolStatAdjust{pseBlockDelay, entry, time} } // adjustResponseTime adjusts the request response time statistics of a node func (pool *serverPool) adjustResponseTime(entry *poolEntry, time time.Duration, timeout bool) { if timeout { pool.adjustStats <- poolStatAdjust{pseResponseTimeout, entry, time} } else { pool.adjustStats <- poolStatAdjust{pseResponseTime, entry, time} } } type selectPeerItem struct { peer *peer weight int64 wait time.Duration } func (sp selectPeerItem) Weight() int64 { return sp.weight } // selectPeer selects a suitable peer for a request, also returning a necessary waiting time to perform the request // and a "locked" flag meaning that the request has been assigned to the given peer and its execution is guaranteed // after the given waiting time. If locked flag is false, selectPeer should be called again after the waiting time. func (pool *serverPool) selectPeer(reqID uint64, canSend func(*peer) (bool, time.Duration)) (*peer, time.Duration, bool) { pool.lock.Lock() type selectPeer struct { peer *peer rstat, tstat float64 } var list []selectPeer sel := newWeightedRandomSelect() for _, entry := range pool.entries { if entry.state == psRegistered { if !entry.peer.fcServer.IsAssigned() { list = append(list, selectPeer{entry.peer, entry.responseStats.recentAvg(), entry.timeoutStats.recentAvg()}) } } } pool.lock.Unlock() for _, sp := range list { ok, wait := canSend(sp.peer) if ok { w := int64(1000000000 * (peerSelectMinWeight + math.Exp(-(sp.rstat+float64(wait))/float64(responseScoreTC))*math.Pow((1-sp.tstat), timeoutPow))) sel.update(selectPeerItem{peer: sp.peer, weight: w, wait: wait}) } } choice := sel.choose() if choice == nil { return nil, 0, false } peer, wait := choice.(selectPeerItem).peer, choice.(selectPeerItem).wait locked := false if wait < time.Millisecond*100 { if peer.fcServer.AssignRequest(reqID) { ok, w := canSend(peer) wait = time.Duration(w) if ok && wait < time.Millisecond*100 { locked = true } else { peer.fcServer.DeassignRequest(reqID) wait = time.Millisecond * 100 } } } else { wait = time.Millisecond * 100 } return peer, wait, locked } // selectPeer selects a suitable peer for a request, waiting until an assignment to // the request is guaranteed or the process is aborted. func (pool *serverPool) selectPeerWait(reqID uint64, canSend func(*peer) (bool, time.Duration), abort <-chan struct{}) *peer { for { peer, wait, locked := pool.selectPeer(reqID, canSend) if locked { return peer } select { case <-abort: return nil case <-time.After(wait): } } } // eventLoop handles pool events and mutex locking for all internal functions func (pool *serverPool) eventLoop() { lookupCnt := 0 var convTime mclock.AbsTime pool.discSetPeriod <- time.Millisecond * 100 for { select { case entry := <-pool.timeout: pool.lock.Lock() if !entry.removed { pool.checkDialTimeout(entry) } pool.lock.Unlock() case entry := <-pool.enableRetry: pool.lock.Lock() if !entry.removed { entry.delayedRetry = false pool.updateCheckDial(entry) } pool.lock.Unlock() case adj := <-pool.adjustStats: pool.lock.Lock() switch adj.adjustType { case pseBlockDelay: adj.entry.delayStats.add(float64(adj.time), 1) case pseResponseTime: adj.entry.responseStats.add(float64(adj.time), 1) adj.entry.timeoutStats.add(0, 1) case pseResponseTimeout: adj.entry.timeoutStats.add(1, 1) } pool.lock.Unlock() case node := <-pool.discNodes: pool.lock.Lock() now := mclock.Now() id := discover.NodeID(node.ID) entry := pool.entries[id] if entry == nil { glog.V(logger.Debug).Infof("discovered %v", node.String()) entry = &poolEntry{ id: id, addr: make(map[string]*poolEntryAddress), addrSelect: *newWeightedRandomSelect(), shortRetry: shortRetryCnt, } pool.entries[id] = entry // initialize previously unknown peers with good statistics to give a chance to prove themselves entry.connectStats.add(1, initStatsWeight) entry.delayStats.add(0, initStatsWeight) entry.responseStats.add(0, initStatsWeight) entry.timeoutStats.add(0, initStatsWeight) } entry.lastDiscovered = now addr := &poolEntryAddress{ ip: node.IP, port: node.TCP, } if a, ok := entry.addr[addr.strKey()]; ok { addr = a } else { entry.addr[addr.strKey()] = addr } addr.lastSeen = now entry.addrSelect.update(addr) if !entry.known { pool.newQueue.setLatest(entry) } pool.updateCheckDial(entry) pool.lock.Unlock() case conv := <-pool.discLookups: if conv { if lookupCnt == 0 { convTime = mclock.Now() } lookupCnt++ if pool.fastDiscover && (lookupCnt == 50 || time.Duration(mclock.Now()-convTime) > time.Minute) { pool.fastDiscover = false pool.discSetPeriod <- time.Minute } } case <-pool.quit: close(pool.discSetPeriod) pool.connWg.Wait() pool.saveNodes() pool.wg.Done() return } } } // loadNodes loads known nodes and their statistics from the database func (pool *serverPool) loadNodes() { enc, err := pool.db.Get(pool.dbKey) if err != nil { return } var list []*poolEntry err = rlp.DecodeBytes(enc, &list) if err != nil { glog.V(logger.Debug).Infof("node list decode error: %v", err) return } for _, e := range list { glog.V(logger.Debug).Infof("loaded server stats %016x fails: %v connStats: %v / %v delayStats: %v / %v responseStats: %v / %v timeoutStats: %v / %v", e.id[0:8], e.lastConnected.fails, e.connectStats.avg, e.connectStats.weight, time.Duration(e.delayStats.avg), e.delayStats.weight, time.Duration(e.responseStats.avg), e.responseStats.weight, e.timeoutStats.avg, e.timeoutStats.weight) pool.entries[e.id] = e pool.knownQueue.setLatest(e) pool.knownSelect.update((*knownEntry)(e)) } } // saveNodes saves known nodes and their statistics into the database. Nodes are // ordered from least to most recently connected. func (pool *serverPool) saveNodes() { list := make([]*poolEntry, len(pool.knownQueue.queue)) for i, _ := range list { list[i] = pool.knownQueue.fetchOldest() } enc, err := rlp.EncodeToBytes(list) if err == nil { pool.db.Put(pool.dbKey, enc) } } // removeEntry removes a pool entry when the entry count limit is reached. // Note that it is called by the new/known queues from which the entry has already // been removed so removing it from the queues is not necessary. func (pool *serverPool) removeEntry(entry *poolEntry) { pool.newSelect.remove((*discoveredEntry)(entry)) pool.knownSelect.remove((*knownEntry)(entry)) entry.removed = true delete(pool.entries, entry.id) } // setRetryDial starts the timer which will enable dialing a certain node again func (pool *serverPool) setRetryDial(entry *poolEntry) { delay := longRetryDelay if entry.shortRetry > 0 { entry.shortRetry-- delay = shortRetryDelay } delay += time.Duration(rand.Int63n(int64(delay) + 1)) entry.delayedRetry = true go func() { select { case <-pool.quit: case <-time.After(delay): select { case <-pool.quit: case pool.enableRetry <- entry: } } }() } // updateCheckDial is called when an entry can potentially be dialed again. It updates // its selection weights and checks if new dials can/should be made. func (pool *serverPool) updateCheckDial(entry *poolEntry) { pool.newSelect.update((*discoveredEntry)(entry)) pool.knownSelect.update((*knownEntry)(entry)) pool.checkDial() } // checkDial checks if new dials can/should be made. It tries to select servers both // based on good statistics and recent discovery. func (pool *serverPool) checkDial() { fillWithKnownSelects := !pool.fastDiscover for pool.knownSelected < targetKnownSelect { entry := pool.knownSelect.choose() if entry == nil { fillWithKnownSelects = false break } pool.dial((*poolEntry)(entry.(*knownEntry)), true) } for pool.knownSelected+pool.newSelected < targetServerCount { entry := pool.newSelect.choose() if entry == nil { break } pool.dial((*poolEntry)(entry.(*discoveredEntry)), false) } if fillWithKnownSelects { // no more newly discovered nodes to select and since fast discover period // is over, we probably won't find more in the near future so select more // known entries if possible for pool.knownSelected < targetServerCount { entry := pool.knownSelect.choose() if entry == nil { break } pool.dial((*poolEntry)(entry.(*knownEntry)), true) } } } // dial initiates a new connection func (pool *serverPool) dial(entry *poolEntry, knownSelected bool) { if entry.state != psNotConnected { return } entry.state = psDialed entry.knownSelected = knownSelected if knownSelected { pool.knownSelected++ } else { pool.newSelected++ } addr := entry.addrSelect.choose().(*poolEntryAddress) glog.V(logger.Debug).Infof("dialing %v out of %v, known: %v", entry.id.String()+"@"+addr.strKey(), len(entry.addr), knownSelected) entry.dialed = addr go func() { pool.server.AddPeer(discover.NewNode(entry.id, addr.ip, addr.port, addr.port)) select { case <-pool.quit: case <-time.After(dialTimeout): select { case <-pool.quit: case pool.timeout <- entry: } } }() } // checkDialTimeout checks if the node is still in dialed state and if so, resets it // and adjusts connection statistics accordingly. func (pool *serverPool) checkDialTimeout(entry *poolEntry) { if entry.state != psDialed { return } glog.V(logger.Debug).Infof("timeout %v", entry.id.String()+"@"+entry.dialed.strKey()) entry.state = psNotConnected if entry.knownSelected { pool.knownSelected-- } else { pool.newSelected-- } entry.connectStats.add(0, 1) entry.dialed.fails++ pool.setRetryDial(entry) } const ( psNotConnected = iota psDialed psConnected psRegistered ) // poolEntry represents a server node and stores its current state and statistics. type poolEntry struct { peer *peer id discover.NodeID addr map[string]*poolEntryAddress lastConnected, dialed *poolEntryAddress addrSelect weightedRandomSelect lastDiscovered mclock.AbsTime known, knownSelected bool connectStats, delayStats poolStats responseStats, timeoutStats poolStats state int regTime mclock.AbsTime queueIdx int removed bool delayedRetry bool shortRetry int } func (e *poolEntry) EncodeRLP(w io.Writer) error { return rlp.Encode(w, []interface{}{e.id, e.lastConnected.ip, e.lastConnected.port, e.lastConnected.fails, &e.connectStats, &e.delayStats, &e.responseStats, &e.timeoutStats}) } func (e *poolEntry) DecodeRLP(s *rlp.Stream) error { var entry struct { ID discover.NodeID IP net.IP Port uint16 Fails uint CStat, DStat, RStat, TStat poolStats } if err := s.Decode(&entry); err != nil { return err } addr := &poolEntryAddress{ip: entry.IP, port: entry.Port, fails: entry.Fails, lastSeen: mclock.Now()} e.id = entry.ID e.addr = make(map[string]*poolEntryAddress) e.addr[addr.strKey()] = addr e.addrSelect = *newWeightedRandomSelect() e.addrSelect.update(addr) e.lastConnected = addr e.connectStats = entry.CStat e.delayStats = entry.DStat e.responseStats = entry.RStat e.timeoutStats = entry.TStat e.shortRetry = shortRetryCnt e.known = true return nil } // discoveredEntry implements wrsItem type discoveredEntry poolEntry // Weight calculates random selection weight for newly discovered entries func (e *discoveredEntry) Weight() int64 { if e.state != psNotConnected || e.delayedRetry { return 0 } t := time.Duration(mclock.Now() - e.lastDiscovered) if t <= discoverExpireStart { return 1000000000 } else { return int64(1000000000 * math.Exp(-float64(t-discoverExpireStart)/float64(discoverExpireConst))) } } // knownEntry implements wrsItem type knownEntry poolEntry // Weight calculates random selection weight for known entries func (e *knownEntry) Weight() int64 { if e.state != psNotConnected || !e.known || e.delayedRetry { return 0 } return int64(1000000000 * e.connectStats.recentAvg() * math.Exp(-float64(e.lastConnected.fails)*failDropLn-e.responseStats.recentAvg()/float64(responseScoreTC)-e.delayStats.recentAvg()/float64(delayScoreTC)) * math.Pow((1-e.timeoutStats.recentAvg()), timeoutPow)) } // poolEntryAddress is a separate object because currently it is necessary to remember // multiple potential network addresses for a pool entry. This will be removed after // the final implementation of v5 discovery which will retrieve signed and serial // numbered advertisements, making it clear which IP/port is the latest one. type poolEntryAddress struct { ip net.IP port uint16 lastSeen mclock.AbsTime // last time it was discovered, connected or loaded from db fails uint // connection failures since last successful connection (persistent) } func (a *poolEntryAddress) Weight() int64 { t := time.Duration(mclock.Now() - a.lastSeen) return int64(1000000*math.Exp(-float64(t)/float64(discoverExpireConst)-float64(a.fails)*addrFailDropLn)) + 1 } func (a *poolEntryAddress) strKey() string { return a.ip.String() + ":" + strconv.Itoa(int(a.port)) } // poolStats implement statistics for a certain quantity with a long term average // and a short term value which is adjusted exponentially with a factor of // pstatRecentAdjust with each update and also returned exponentially to the // average with the time constant pstatReturnToMeanTC type poolStats struct { sum, weight, avg, recent float64 lastRecalc mclock.AbsTime } // init initializes stats with a long term sum/update count pair retrieved from the database func (s *poolStats) init(sum, weight float64) { s.sum = sum s.weight = weight var avg float64 if weight > 0 { avg = s.sum / weight } s.avg = avg s.recent = avg s.lastRecalc = mclock.Now() } // recalc recalculates recent value return-to-mean and long term average func (s *poolStats) recalc() { now := mclock.Now() s.recent = s.avg + (s.recent-s.avg)*math.Exp(-float64(now-s.lastRecalc)/float64(pstatReturnToMeanTC)) if s.sum == 0 { s.avg = 0 } else { if s.sum > s.weight*1e30 { s.avg = 1e30 } else { s.avg = s.sum / s.weight } } s.lastRecalc = now } // add updates the stats with a new value func (s *poolStats) add(value, weight float64) { s.weight += weight s.sum += value * weight s.recalc() } // recentAvg returns the short-term adjusted average func (s *poolStats) recentAvg() float64 { s.recalc() return s.recent } func (s *poolStats) EncodeRLP(w io.Writer) error { return rlp.Encode(w, []interface{}{math.Float64bits(s.sum), math.Float64bits(s.weight)}) } func (s *poolStats) DecodeRLP(st *rlp.Stream) error { var stats struct { SumUint, WeightUint uint64 } if err := st.Decode(&stats); err != nil { return err } s.init(math.Float64frombits(stats.SumUint), math.Float64frombits(stats.WeightUint)) return nil } // poolEntryQueue keeps track of its least recently accessed entries and removes // them when the number of entries reaches the limit type poolEntryQueue struct { queue map[int]*poolEntry // known nodes indexed by their latest lastConnCnt value newPtr, oldPtr, maxCnt int removeFromPool func(*poolEntry) } // newPoolEntryQueue returns a new poolEntryQueue func newPoolEntryQueue(maxCnt int, removeFromPool func(*poolEntry)) poolEntryQueue { return poolEntryQueue{queue: make(map[int]*poolEntry), maxCnt: maxCnt, removeFromPool: removeFromPool} } // fetchOldest returns and removes the least recently accessed entry func (q *poolEntryQueue) fetchOldest() *poolEntry { if len(q.queue) == 0 { return nil } for { if e := q.queue[q.oldPtr]; e != nil { delete(q.queue, q.oldPtr) q.oldPtr++ return e } q.oldPtr++ } } // remove removes an entry from the queue func (q *poolEntryQueue) remove(entry *poolEntry) { if q.queue[entry.queueIdx] == entry { delete(q.queue, entry.queueIdx) } } // setLatest adds or updates a recently accessed entry. It also checks if an old entry // needs to be removed and removes it from the parent pool too with a callback function. func (q *poolEntryQueue) setLatest(entry *poolEntry) { if q.queue[entry.queueIdx] == entry { delete(q.queue, entry.queueIdx) } else { if len(q.queue) == q.maxCnt { e := q.fetchOldest() q.remove(e) q.removeFromPool(e) } } entry.queueIdx = q.newPtr q.queue[entry.queueIdx] = entry q.newPtr++ }