go-ethereum/les/vflux/client/serverpool.go

607 lines
23 KiB
Go
Raw Normal View History

// Copyright 2020 The go-ethereum Authors
2016-11-17 14:54:24 +00:00
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package client
2016-11-17 14:54:24 +00:00
import (
"errors"
2016-11-17 14:54:24 +00:00
"math/rand"
"reflect"
2016-11-17 14:54:24 +00:00
"sync"
"sync/atomic"
2016-11-17 14:54:24 +00:00
"time"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/les/utils"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
all: new p2p node representation (#17643) Package p2p/enode provides a generalized representation of p2p nodes which can contain arbitrary information in key/value pairs. It is also the new home for the node database. The "v4" identity scheme is also moved here from p2p/enr to remove the dependency on Ethereum crypto from that package. Record signature handling is changed significantly. The identity scheme registry is removed and acceptable schemes must be passed to any method that needs identity. This means records must now be validated explicitly after decoding. The enode API is designed to make signature handling easy and safe: most APIs around the codebase work with enode.Node, which is a wrapper around a valid record. Going from enr.Record to enode.Node requires a valid signature. * p2p/discover: port to p2p/enode This ports the discovery code to the new node representation in p2p/enode. The wire protocol is unchanged, this can be considered a refactoring change. The Kademlia table can now deal with nodes using an arbitrary identity scheme. This requires a few incompatible API changes: - Table.Lookup is not available anymore. It used to take a public key as argument because v4 protocol requires one. Its replacement is LookupRandom. - Table.Resolve takes *enode.Node instead of NodeID. This is also for v4 protocol compatibility because nodes cannot be looked up by ID alone. - Types Node and NodeID are gone. Further commits in the series will be fixes all over the the codebase to deal with those removals. * p2p: port to p2p/enode and discovery changes This adapts package p2p to the changes in p2p/discover. All uses of discover.Node and discover.NodeID are replaced by their equivalents from p2p/enode. New API is added to retrieve the enode.Node instance of a peer. The behavior of Server.Self with discovery disabled is improved. It now tries much harder to report a working IP address, falling back to 127.0.0.1 if no suitable address can be determined through other means. These changes were needed for tests of other packages later in the series. * p2p/simulations, p2p/testing: port to p2p/enode No surprises here, mostly replacements of discover.Node, discover.NodeID with their new equivalents. The 'interesting' API changes are: - testing.ProtocolSession tracks complete nodes, not just their IDs. - adapters.NodeConfig has a new method to create a complete node. These changes were needed to make swarm tests work. Note that the NodeID change makes the code incompatible with old simulation snapshots. * whisper/whisperv5, whisper/whisperv6: port to p2p/enode This port was easy because whisper uses []byte for node IDs and URL strings in the API. * eth: port to p2p/enode Again, easy to port because eth uses strings for node IDs and doesn't care about node information in any way. * les: port to p2p/enode Apart from replacing discover.NodeID with enode.ID, most changes are in the server pool code. It now deals with complete nodes instead of (Pubkey, IP, Port) triples. The database format is unchanged for now, but we should probably change it to use the node database later. * node: port to p2p/enode This change simply replaces discover.Node and discover.NodeID with their new equivalents. * swarm/network: port to p2p/enode Swarm has its own node address representation, BzzAddr, containing both an overlay address (the hash of a secp256k1 public key) and an underlay address (enode:// URL). There are no changes to the BzzAddr format in this commit, but certain operations such as creating a BzzAddr from a node ID are now impossible because node IDs aren't public keys anymore. Most swarm-related changes in the series remove uses of NewAddrFromNodeID, replacing it with NewAddr which takes a complete node as argument. ToOverlayAddr is removed because we can just use the node ID directly.
2018-09-24 22:59:00 +00:00
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/ethereum/go-ethereum/p2p/nodestate"
2016-11-17 14:54:24 +00:00
"github.com/ethereum/go-ethereum/rlp"
)
const (
minTimeout = time.Millisecond * 500 // minimum request timeout suggested by the server pool
timeoutRefresh = time.Second * 5 // recalculate timeout if older than this
dialCost = 10000 // cost of a TCP dial (used for known node selection weight calculation)
dialWaitStep = 1.5 // exponential multiplier of redial wait time when no value was provided by the server
queryCost = 500 // cost of a UDP pre-negotiation query
queryWaitStep = 1.02 // exponential multiplier of redial wait time when no value was provided by the server
waitThreshold = time.Hour * 2000 // drop node if waiting time is over the threshold
nodeWeightMul = 1000000 // multiplier constant for node weight calculation
nodeWeightThreshold = 100 // minimum weight for keeping a node in the known (valuable) set
minRedialWait = 10 // minimum redial wait time in seconds
preNegLimit = 5 // maximum number of simultaneous pre-negotiation queries
warnQueryFails = 20 // number of consecutive UDP query failures before we print a warning
maxQueryFails = 100 // number of consecutive UDP query failures when then chance of skipping a query reaches 50%
2016-11-17 14:54:24 +00:00
)
// ServerPool provides a node iterator for dial candidates. The output is a mix of newly discovered
// nodes, a weighted random selection of known (previously valuable) nodes and trusted/paid nodes.
type ServerPool struct {
clock mclock.Clock
unixTime func() int64
db ethdb.KeyValueStore
ns *nodestate.NodeStateMachine
vt *ValueTracker
mixer *enode.FairMix
mixSources []enode.Iterator
dialIterator enode.Iterator
validSchemes enr.IdentityScheme
trustedURLs []string
fillSet *FillSet
started, queryFails uint32
timeoutLock sync.RWMutex
timeout time.Duration
timeWeights ResponseTimeWeights
timeoutRefreshed mclock.AbsTime
suggestedTimeoutGauge, totalValueGauge metrics.Gauge
sessionValueMeter metrics.Meter
}
// nodeHistory keeps track of dial costs which determine node weight together with the
// service value calculated by ValueTracker.
type nodeHistory struct {
dialCost utils.ExpiredValue
redialWaitStart, redialWaitEnd int64 // unix time (seconds)
}
type nodeHistoryEnc struct {
DialCost utils.ExpiredValue
RedialWaitStart, RedialWaitEnd uint64
}
// queryFunc sends a pre-negotiation query and blocks until a response arrives or timeout occurs.
// It returns 1 if the remote node has confirmed that connection is possible, 0 if not
// possible and -1 if no response arrived (timeout).
type QueryFunc func(*enode.Node) int
var (
clientSetup = &nodestate.Setup{Version: 2}
sfHasValue = clientSetup.NewPersistentFlag("hasValue")
sfQuery = clientSetup.NewFlag("query")
sfCanDial = clientSetup.NewFlag("canDial")
sfDialing = clientSetup.NewFlag("dialed")
sfWaitDialTimeout = clientSetup.NewFlag("dialTimeout")
sfConnected = clientSetup.NewFlag("connected")
sfRedialWait = clientSetup.NewFlag("redialWait")
sfAlwaysConnect = clientSetup.NewFlag("alwaysConnect")
sfDialProcess = nodestate.MergeFlags(sfQuery, sfCanDial, sfDialing, sfConnected, sfRedialWait)
sfiNodeHistory = clientSetup.NewPersistentField("nodeHistory", reflect.TypeOf(nodeHistory{}),
func(field interface{}) ([]byte, error) {
if n, ok := field.(nodeHistory); ok {
ne := nodeHistoryEnc{
DialCost: n.dialCost,
RedialWaitStart: uint64(n.redialWaitStart),
RedialWaitEnd: uint64(n.redialWaitEnd),
}
enc, err := rlp.EncodeToBytes(&ne)
return enc, err
}
return nil, errors.New("invalid field type")
},
func(enc []byte) (interface{}, error) {
var ne nodeHistoryEnc
err := rlp.DecodeBytes(enc, &ne)
n := nodeHistory{
dialCost: ne.DialCost,
redialWaitStart: int64(ne.RedialWaitStart),
redialWaitEnd: int64(ne.RedialWaitEnd),
}
return n, err
},
)
sfiNodeWeight = clientSetup.NewField("nodeWeight", reflect.TypeOf(uint64(0)))
sfiConnectedStats = clientSetup.NewField("connectedStats", reflect.TypeOf(ResponseTimeStats{}))
sfiLocalAddress = clientSetup.NewPersistentField("localAddress", reflect.TypeOf(&enr.Record{}),
func(field interface{}) ([]byte, error) {
if enr, ok := field.(*enr.Record); ok {
enc, err := rlp.EncodeToBytes(enr)
return enc, err
}
return nil, errors.New("invalid field type")
},
func(enc []byte) (interface{}, error) {
var enr enr.Record
if err := rlp.DecodeBytes(enc, &enr); err != nil {
return nil, err
}
return &enr, nil
},
)
)
// NewServerPool creates a new server pool
func NewServerPool(db ethdb.KeyValueStore, dbKey []byte, mixTimeout time.Duration, query QueryFunc, clock mclock.Clock, trustedURLs []string, requestList []RequestInfo) (*ServerPool, enode.Iterator) {
s := &ServerPool{
db: db,
clock: clock,
unixTime: func() int64 { return time.Now().Unix() },
validSchemes: enode.ValidSchemes,
trustedURLs: trustedURLs,
vt: NewValueTracker(db, &mclock.System{}, requestList, time.Minute, 1/float64(time.Hour), 1/float64(time.Hour*100), 1/float64(time.Hour*1000)),
ns: nodestate.NewNodeStateMachine(db, []byte(string(dbKey)+"ns:"), clock, clientSetup),
}
s.recalTimeout()
s.mixer = enode.NewFairMix(mixTimeout)
knownSelector := NewWrsIterator(s.ns, sfHasValue, sfDialProcess, sfiNodeWeight)
alwaysConnect := NewQueueIterator(s.ns, sfAlwaysConnect, sfDialProcess, true, nil)
s.mixSources = append(s.mixSources, knownSelector)
s.mixSources = append(s.mixSources, alwaysConnect)
s.dialIterator = s.mixer
if query != nil {
s.dialIterator = s.addPreNegFilter(s.dialIterator, query)
}
s.ns.SubscribeState(nodestate.MergeFlags(sfWaitDialTimeout, sfConnected), func(n *enode.Node, oldState, newState nodestate.Flags) {
if oldState.Equals(sfWaitDialTimeout) && newState.IsEmpty() {
// dial timeout, no connection
s.setRedialWait(n, dialCost, dialWaitStep)
2020-09-14 12:01:18 +00:00
s.ns.SetStateSub(n, nodestate.Flags{}, sfDialing, 0)
}
})
2016-11-17 14:54:24 +00:00
return s, &serverPoolIterator{
dialIterator: s.dialIterator,
nextFn: func(node *enode.Node) {
s.ns.Operation(func() {
s.ns.SetStateSub(node, sfDialing, sfCanDial, 0)
s.ns.SetStateSub(node, sfWaitDialTimeout, nodestate.Flags{}, time.Second*10)
})
},
nodeFn: s.DialNode,
}
}
type serverPoolIterator struct {
dialIterator enode.Iterator
nextFn func(*enode.Node)
nodeFn func(*enode.Node) *enode.Node
}
// Next implements enode.Iterator
func (s *serverPoolIterator) Next() bool {
if s.dialIterator.Next() {
s.nextFn(s.dialIterator.Node())
return true
}
return false
}
// Node implements enode.Iterator
func (s *serverPoolIterator) Node() *enode.Node {
return s.nodeFn(s.dialIterator.Node())
}
// Close implements enode.Iterator
func (s *serverPoolIterator) Close() {
s.dialIterator.Close()
}
// AddMetrics adds metrics to the server pool. Should be called before Start().
func (s *ServerPool) AddMetrics(
suggestedTimeoutGauge, totalValueGauge, serverSelectableGauge, serverConnectedGauge metrics.Gauge,
sessionValueMeter, serverDialedMeter metrics.Meter) {
s.suggestedTimeoutGauge = suggestedTimeoutGauge
s.totalValueGauge = totalValueGauge
s.sessionValueMeter = sessionValueMeter
if serverSelectableGauge != nil {
s.ns.AddLogMetrics(sfHasValue, sfDialProcess, "selectable", nil, nil, serverSelectableGauge)
}
if serverDialedMeter != nil {
s.ns.AddLogMetrics(sfDialing, nodestate.Flags{}, "dialed", serverDialedMeter, nil, nil)
}
if serverConnectedGauge != nil {
s.ns.AddLogMetrics(sfConnected, nodestate.Flags{}, "connected", nil, nil, serverConnectedGauge)
}
}
// AddSource adds a node discovery source to the server pool (should be called before start)
func (s *ServerPool) AddSource(source enode.Iterator) {
if source != nil {
s.mixSources = append(s.mixSources, source)
}
}
// addPreNegFilter installs a node filter mechanism that performs a pre-negotiation query.
// Nodes that are filtered out and does not appear on the output iterator are put back
// into redialWait state.
func (s *ServerPool) addPreNegFilter(input enode.Iterator, query QueryFunc) enode.Iterator {
s.fillSet = NewFillSet(s.ns, input, sfQuery)
s.ns.SubscribeState(sfDialProcess, func(n *enode.Node, oldState, newState nodestate.Flags) {
if !newState.Equals(sfQuery) {
if newState.HasAll(sfQuery) {
// remove query flag if the node is already somewhere in the dial process
s.ns.SetStateSub(n, nodestate.Flags{}, sfQuery, 0)
}
return
}
fails := atomic.LoadUint32(&s.queryFails)
failMax := fails
if failMax > maxQueryFails {
failMax = maxQueryFails
}
if rand.Intn(maxQueryFails*2) < int(failMax) {
// skip pre-negotiation with increasing chance, max 50%
// this ensures that the client can operate even if UDP is not working at all
s.ns.SetStateSub(n, sfCanDial, nodestate.Flags{}, time.Second*10)
// set canDial before resetting queried so that FillSet will not read more
// candidates unnecessarily
s.ns.SetStateSub(n, nodestate.Flags{}, sfQuery, 0)
return
}
go func() {
q := query(n)
if q == -1 {
atomic.AddUint32(&s.queryFails, 1)
fails++
if fails%warnQueryFails == 0 {
// warn if a large number of consecutive queries have failed
log.Warn("UDP connection queries failed", "count", fails)
}
} else {
atomic.StoreUint32(&s.queryFails, 0)
2016-11-17 14:54:24 +00:00
}
s.ns.Operation(func() {
// we are no longer running in the operation that the callback belongs to, start a new one because of setRedialWait
if q == 1 {
s.ns.SetStateSub(n, sfCanDial, nodestate.Flags{}, time.Second*10)
} else {
s.setRedialWait(n, queryCost, queryWaitStep)
}
s.ns.SetStateSub(n, nodestate.Flags{}, sfQuery, 0)
})
}()
})
return NewQueueIterator(s.ns, sfCanDial, nodestate.Flags{}, false, func(waiting bool) {
if waiting {
s.fillSet.SetTarget(preNegLimit)
} else {
s.fillSet.SetTarget(0)
2016-11-17 14:54:24 +00:00
}
})
2016-11-17 14:54:24 +00:00
}
// start starts the server pool. Note that NodeStateMachine should be started first.
func (s *ServerPool) Start() {
s.ns.Start()
for _, iter := range s.mixSources {
// add sources to mixer at startup because the mixer instantly tries to read them
// which should only happen after NodeStateMachine has been started
s.mixer.AddSource(iter)
2016-11-17 14:54:24 +00:00
}
for _, url := range s.trustedURLs {
if node, err := enode.Parse(s.validSchemes, url); err == nil {
s.ns.SetState(node, sfAlwaysConnect, nodestate.Flags{}, 0)
} else {
log.Error("Invalid trusted server URL", "url", url, "error", err)
2016-11-17 14:54:24 +00:00
}
}
unixTime := s.unixTime()
2020-09-14 12:01:18 +00:00
s.ns.Operation(func() {
s.ns.ForEach(sfHasValue, nodestate.Flags{}, func(node *enode.Node, state nodestate.Flags) {
s.calculateWeight(node)
if n, ok := s.ns.GetField(node, sfiNodeHistory).(nodeHistory); ok && n.redialWaitEnd > unixTime {
wait := n.redialWaitEnd - unixTime
lastWait := n.redialWaitEnd - n.redialWaitStart
if wait > lastWait {
// if the time until expiration is larger than the last suggested
// waiting time then the system clock was probably adjusted
wait = lastWait
}
s.ns.SetStateSub(node, sfRedialWait, nodestate.Flags{}, time.Duration(wait)*time.Second)
2016-11-17 14:54:24 +00:00
}
2020-09-14 12:01:18 +00:00
})
all: new p2p node representation (#17643) Package p2p/enode provides a generalized representation of p2p nodes which can contain arbitrary information in key/value pairs. It is also the new home for the node database. The "v4" identity scheme is also moved here from p2p/enr to remove the dependency on Ethereum crypto from that package. Record signature handling is changed significantly. The identity scheme registry is removed and acceptable schemes must be passed to any method that needs identity. This means records must now be validated explicitly after decoding. The enode API is designed to make signature handling easy and safe: most APIs around the codebase work with enode.Node, which is a wrapper around a valid record. Going from enr.Record to enode.Node requires a valid signature. * p2p/discover: port to p2p/enode This ports the discovery code to the new node representation in p2p/enode. The wire protocol is unchanged, this can be considered a refactoring change. The Kademlia table can now deal with nodes using an arbitrary identity scheme. This requires a few incompatible API changes: - Table.Lookup is not available anymore. It used to take a public key as argument because v4 protocol requires one. Its replacement is LookupRandom. - Table.Resolve takes *enode.Node instead of NodeID. This is also for v4 protocol compatibility because nodes cannot be looked up by ID alone. - Types Node and NodeID are gone. Further commits in the series will be fixes all over the the codebase to deal with those removals. * p2p: port to p2p/enode and discovery changes This adapts package p2p to the changes in p2p/discover. All uses of discover.Node and discover.NodeID are replaced by their equivalents from p2p/enode. New API is added to retrieve the enode.Node instance of a peer. The behavior of Server.Self with discovery disabled is improved. It now tries much harder to report a working IP address, falling back to 127.0.0.1 if no suitable address can be determined through other means. These changes were needed for tests of other packages later in the series. * p2p/simulations, p2p/testing: port to p2p/enode No surprises here, mostly replacements of discover.Node, discover.NodeID with their new equivalents. The 'interesting' API changes are: - testing.ProtocolSession tracks complete nodes, not just their IDs. - adapters.NodeConfig has a new method to create a complete node. These changes were needed to make swarm tests work. Note that the NodeID change makes the code incompatible with old simulation snapshots. * whisper/whisperv5, whisper/whisperv6: port to p2p/enode This port was easy because whisper uses []byte for node IDs and URL strings in the API. * eth: port to p2p/enode Again, easy to port because eth uses strings for node IDs and doesn't care about node information in any way. * les: port to p2p/enode Apart from replacing discover.NodeID with enode.ID, most changes are in the server pool code. It now deals with complete nodes instead of (Pubkey, IP, Port) triples. The database format is unchanged for now, but we should probably change it to use the node database later. * node: port to p2p/enode This change simply replaces discover.Node and discover.NodeID with their new equivalents. * swarm/network: port to p2p/enode Swarm has its own node address representation, BzzAddr, containing both an overlay address (the hash of a secp256k1 public key) and an underlay address (enode:// URL). There are no changes to the BzzAddr format in this commit, but certain operations such as creating a BzzAddr from a node ID are now impossible because node IDs aren't public keys anymore. Most swarm-related changes in the series remove uses of NewAddrFromNodeID, replacing it with NewAddr which takes a complete node as argument. ToOverlayAddr is removed because we can just use the node ID directly.
2018-09-24 22:59:00 +00:00
})
atomic.StoreUint32(&s.started, 1)
2016-11-17 14:54:24 +00:00
}
// stop stops the server pool
func (s *ServerPool) Stop() {
if s.fillSet != nil {
s.fillSet.Close()
2016-11-17 14:54:24 +00:00
}
2020-09-14 12:01:18 +00:00
s.ns.Operation(func() {
s.ns.ForEach(sfConnected, nodestate.Flags{}, func(n *enode.Node, state nodestate.Flags) {
// recalculate weight of connected nodes in order to update hasValue flag if necessary
s.calculateWeight(n)
})
})
s.ns.Stop()
s.vt.Stop()
}
// RegisterNode implements serverPeerSubscriber
func (s *ServerPool) RegisterNode(node *enode.Node) (*NodeValueTracker, error) {
if atomic.LoadUint32(&s.started) == 0 {
return nil, errors.New("server pool not started yet")
}
nvt := s.vt.Register(node.ID())
s.ns.Operation(func() {
s.ns.SetStateSub(node, sfConnected, sfDialing.Or(sfWaitDialTimeout), 0)
s.ns.SetFieldSub(node, sfiConnectedStats, nvt.RtStats())
if node.IP().IsLoopback() {
s.ns.SetFieldSub(node, sfiLocalAddress, node.Record())
}
})
return nvt, nil
}
// UnregisterNode implements serverPeerSubscriber
func (s *ServerPool) UnregisterNode(node *enode.Node) {
2020-09-14 12:01:18 +00:00
s.ns.Operation(func() {
s.setRedialWait(node, dialCost, dialWaitStep)
s.ns.SetStateSub(node, nodestate.Flags{}, sfConnected, 0)
s.ns.SetFieldSub(node, sfiConnectedStats, nil)
2020-09-14 12:01:18 +00:00
})
s.vt.Unregister(node.ID())
}
// recalTimeout calculates the current recommended timeout. This value is used by
// the client as a "soft timeout" value. It also affects the service value calculation
// of individual nodes.
func (s *ServerPool) recalTimeout() {
// Use cached result if possible, avoid recalculating too frequently.
s.timeoutLock.RLock()
refreshed := s.timeoutRefreshed
s.timeoutLock.RUnlock()
now := s.clock.Now()
if refreshed != 0 && time.Duration(now-refreshed) < timeoutRefresh {
return
2016-11-17 14:54:24 +00:00
}
// Cached result is stale, recalculate a new one.
rts := s.vt.RtStats()
2016-11-17 14:54:24 +00:00
// Add a fake statistic here. It is an easy way to initialize with some
// conservative values when the database is new. As soon as we have a
// considerable amount of real stats this small value won't matter.
rts.Add(time.Second*2, 10, s.vt.StatsExpFactor())
2016-11-17 14:54:24 +00:00
// Use either 10% failure rate timeout or twice the median response time
// as the recommended timeout.
timeout := minTimeout
if t := rts.Timeout(0.1); t > timeout {
timeout = t
2016-11-17 14:54:24 +00:00
}
if t := rts.Timeout(0.5) * 2; t > timeout {
timeout = t
2016-11-17 14:54:24 +00:00
}
s.timeoutLock.Lock()
if s.timeout != timeout {
s.timeout = timeout
s.timeWeights = TimeoutWeights(s.timeout)
2016-11-17 14:54:24 +00:00
if s.suggestedTimeoutGauge != nil {
s.suggestedTimeoutGauge.Update(int64(s.timeout / time.Millisecond))
}
if s.totalValueGauge != nil {
s.totalValueGauge.Update(int64(rts.Value(s.timeWeights, s.vt.StatsExpFactor())))
}
2016-11-17 14:54:24 +00:00
}
s.timeoutRefreshed = now
s.timeoutLock.Unlock()
2016-11-17 14:54:24 +00:00
}
// GetTimeout returns the recommended request timeout.
func (s *ServerPool) GetTimeout() time.Duration {
s.recalTimeout()
s.timeoutLock.RLock()
defer s.timeoutLock.RUnlock()
return s.timeout
2016-11-17 14:54:24 +00:00
}
// getTimeoutAndWeight returns the recommended request timeout as well as the
// response time weight which is necessary to calculate service value.
func (s *ServerPool) getTimeoutAndWeight() (time.Duration, ResponseTimeWeights) {
s.recalTimeout()
s.timeoutLock.RLock()
defer s.timeoutLock.RUnlock()
return s.timeout, s.timeWeights
2016-11-17 14:54:24 +00:00
}
// addDialCost adds the given amount of dial cost to the node history and returns the current
// amount of total dial cost
func (s *ServerPool) addDialCost(n *nodeHistory, amount int64) uint64 {
logOffset := s.vt.StatsExpirer().LogOffset(s.clock.Now())
if amount > 0 {
n.dialCost.Add(amount, logOffset)
2016-11-17 14:54:24 +00:00
}
totalDialCost := n.dialCost.Value(logOffset)
if totalDialCost < dialCost {
totalDialCost = dialCost
2016-11-17 14:54:24 +00:00
}
return totalDialCost
2016-11-17 14:54:24 +00:00
}
// serviceValue returns the service value accumulated in this session and in total
func (s *ServerPool) serviceValue(node *enode.Node) (sessionValue, totalValue float64) {
nvt := s.vt.GetNode(node.ID())
if nvt == nil {
return 0, 0
2016-11-17 14:54:24 +00:00
}
currentStats := nvt.RtStats()
_, timeWeights := s.getTimeoutAndWeight()
expFactor := s.vt.StatsExpFactor()
2016-11-17 14:54:24 +00:00
totalValue = currentStats.Value(timeWeights, expFactor)
if connStats, ok := s.ns.GetField(node, sfiConnectedStats).(ResponseTimeStats); ok {
diff := currentStats
diff.SubStats(&connStats)
sessionValue = diff.Value(timeWeights, expFactor)
if s.sessionValueMeter != nil {
s.sessionValueMeter.Mark(int64(sessionValue))
}
2016-11-17 14:54:24 +00:00
}
return
2016-11-17 14:54:24 +00:00
}
// updateWeight calculates the node weight and updates the nodeWeight field and the
// hasValue flag. It also saves the node state if necessary.
2020-09-14 12:01:18 +00:00
// Note: this function should run inside a NodeStateMachine operation
func (s *ServerPool) updateWeight(node *enode.Node, totalValue float64, totalDialCost uint64) {
weight := uint64(totalValue * nodeWeightMul / float64(totalDialCost))
if weight >= nodeWeightThreshold {
2020-09-14 12:01:18 +00:00
s.ns.SetStateSub(node, sfHasValue, nodestate.Flags{}, 0)
s.ns.SetFieldSub(node, sfiNodeWeight, weight)
2016-11-17 14:54:24 +00:00
} else {
2020-09-14 12:01:18 +00:00
s.ns.SetStateSub(node, nodestate.Flags{}, sfHasValue, 0)
s.ns.SetFieldSub(node, sfiNodeWeight, nil)
s.ns.SetFieldSub(node, sfiNodeHistory, nil)
s.ns.SetFieldSub(node, sfiLocalAddress, nil)
}
s.ns.Persist(node) // saved if node history or hasValue changed
}
// setRedialWait calculates and sets the redialWait timeout based on the service value
// and dial cost accumulated during the last session/attempt and in total.
// The waiting time is raised exponentially if no service value has been received in order
// to prevent dialing an unresponsive node frequently for a very long time just because it
// was useful in the past. It can still be occasionally dialed though and once it provides
// a significant amount of service value again its waiting time is quickly reduced or reset
// to the minimum.
// Note: node weight is also recalculated and updated by this function.
2020-09-14 12:01:18 +00:00
// Note 2: this function should run inside a NodeStateMachine operation
func (s *ServerPool) setRedialWait(node *enode.Node, addDialCost int64, waitStep float64) {
n, _ := s.ns.GetField(node, sfiNodeHistory).(nodeHistory)
sessionValue, totalValue := s.serviceValue(node)
totalDialCost := s.addDialCost(&n, addDialCost)
// if the current dial session has yielded at least the average value/dial cost ratio
// then the waiting time should be reset to the minimum. If the session value
// is below average but still positive then timeout is limited to the ratio of
// average / current service value multiplied by the minimum timeout. If the attempt
// was unsuccessful then timeout is raised exponentially without limitation.
// Note: dialCost is used in the formula below even if dial was not attempted at all
// because the pre-negotiation query did not return a positive result. In this case
// the ratio has no meaning anyway and waitFactor is always raised, though in smaller
// steps because queries are cheaper and therefore we can allow more failed attempts.
unixTime := s.unixTime()
plannedTimeout := float64(n.redialWaitEnd - n.redialWaitStart) // last planned redialWait timeout
var actualWait float64 // actual waiting time elapsed
if unixTime > n.redialWaitEnd {
// the planned timeout has elapsed
actualWait = plannedTimeout
} else {
// if the node was redialed earlier then we do not raise the planned timeout
// exponentially because that could lead to the timeout rising very high in
// a short amount of time
// Note that in case of an early redial actualWait also includes the dial
// timeout or connection time of the last attempt but it still serves its
// purpose of preventing the timeout rising quicker than linearly as a function
// of total time elapsed without a successful connection.
actualWait = float64(unixTime - n.redialWaitStart)
}
// raise timeout exponentially if the last planned timeout has elapsed
// (use at least the last planned timeout otherwise)
nextTimeout := actualWait * waitStep
if plannedTimeout > nextTimeout {
nextTimeout = plannedTimeout
}
// we reduce the waiting time if the server has provided service value during the
// connection (but never under the minimum)
a := totalValue * dialCost * float64(minRedialWait)
b := float64(totalDialCost) * sessionValue
if a < b*nextTimeout {
nextTimeout = a / b
}
if nextTimeout < minRedialWait {
nextTimeout = minRedialWait
}
wait := time.Duration(float64(time.Second) * nextTimeout)
if wait < waitThreshold {
n.redialWaitStart = unixTime
n.redialWaitEnd = unixTime + int64(nextTimeout)
2020-09-14 12:01:18 +00:00
s.ns.SetFieldSub(node, sfiNodeHistory, n)
s.ns.SetStateSub(node, sfRedialWait, nodestate.Flags{}, wait)
s.updateWeight(node, totalValue, totalDialCost)
} else {
// discard known node statistics if waiting time is very long because the node
// hasn't been responsive for a very long time
2020-09-14 12:01:18 +00:00
s.ns.SetFieldSub(node, sfiNodeHistory, nil)
s.ns.SetFieldSub(node, sfiNodeWeight, nil)
s.ns.SetStateSub(node, nodestate.Flags{}, sfHasValue, 0)
}
}
// calculateWeight calculates and sets the node weight without altering the node history.
// This function should be called during startup and shutdown only, otherwise setRedialWait
// will keep the weights updated as the underlying statistics are adjusted.
2020-09-14 12:01:18 +00:00
// Note: this function should run inside a NodeStateMachine operation
func (s *ServerPool) calculateWeight(node *enode.Node) {
n, _ := s.ns.GetField(node, sfiNodeHistory).(nodeHistory)
_, totalValue := s.serviceValue(node)
totalDialCost := s.addDialCost(&n, 0)
s.updateWeight(node, totalValue, totalDialCost)
2016-11-17 14:54:24 +00:00
}
// API returns the vflux client API
func (s *ServerPool) API() *PrivateClientAPI {
return NewPrivateClientAPI(s.vt)
}
type dummyIdentity enode.ID
func (id dummyIdentity) Verify(r *enr.Record, sig []byte) error { return nil }
func (id dummyIdentity) NodeAddr(r *enr.Record) []byte { return id[:] }
// DialNode replaces the given enode with a locally generated one containing the ENR
// stored in the sfiLocalAddress field if present. This workaround ensures that nodes
// on the local network can be dialed at the local address if a connection has been
// successfully established previously.
// Note that NodeStateMachine always remembers the enode with the latest version of
// the remote signed ENR. ENR filtering should be performed on that version while
// dialNode should be used for dialing the node over TCP or UDP.
func (s *ServerPool) DialNode(n *enode.Node) *enode.Node {
if enr, ok := s.ns.GetField(n, sfiLocalAddress).(*enr.Record); ok {
n, _ := enode.New(dummyIdentity(n.ID()), enr)
return n
}
return n
}
// Persist immediately stores the state of a node in the node database
func (s *ServerPool) Persist(n *enode.Node) {
s.ns.Persist(n)
}