diff --git a/les/client.go b/les/client.go
index a557e7ccb..99f79bb20 100644
--- a/les/client.go
+++ b/les/client.go
@@ -73,8 +73,9 @@ type LightEthereum struct {
 	accountManager *accounts.Manager
 	netRPCService  *ethapi.PublicNetAPI
 
-	p2pServer *p2p.Server
-	p2pConfig *p2p.Config
+	p2pServer  *p2p.Server
+	p2pConfig  *p2p.Config
+	udpEnabled bool
 }
 
 // New creates an instance of the light client.
@@ -113,10 +114,11 @@ func New(stack *node.Node, config *ethconfig.Config) (*LightEthereum, error) {
 		bloomIndexer:   core.NewBloomIndexer(chainDb, params.BloomBitsBlocksClient, params.HelperTrieConfirmations),
 		p2pServer:      stack.Server(),
 		p2pConfig:      &stack.Config().P2P,
+		udpEnabled:     stack.Config().P2P.DiscoveryV5,
 	}
 
 	var prenegQuery vfc.QueryFunc
-	if leth.p2pServer.DiscV5 != nil {
+	if leth.udpEnabled {
 		prenegQuery = leth.prenegQuery
 	}
 	leth.serverPool, leth.serverPoolIterator = vfc.NewServerPool(lesDb, []byte("serverpool:"), time.Second, prenegQuery, &mclock.System{}, config.UltraLightServers, requestList)
@@ -198,7 +200,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*LightEthereum, error) {
 
 // VfluxRequest sends a batch of requests to the given node through discv5 UDP TalkRequest and returns the responses
 func (s *LightEthereum) VfluxRequest(n *enode.Node, reqs vflux.Requests) vflux.Replies {
-	if s.p2pServer.DiscV5 == nil {
+	if !s.udpEnabled {
 		return nil
 	}
 	reqsEnc, _ := rlp.EncodeToBytes(&reqs)
@@ -215,7 +217,7 @@ func (s *LightEthereum) VfluxRequest(n *enode.Node, reqs vflux.Requests) vflux.R
 func (s *LightEthereum) vfxVersion(n *enode.Node) uint {
 	if n.Seq() == 0 {
 		var err error
-		if s.p2pServer.DiscV5 == nil {
+		if !s.udpEnabled {
 			return 0
 		}
 		if n, err = s.p2pServer.DiscV5.RequestENR(n); n != nil && err == nil && n.Seq() != 0 {
@@ -346,7 +348,11 @@ func (s *LightEthereum) Protocols() []p2p.Protocol {
 func (s *LightEthereum) Start() error {
 	log.Warn("Light client mode is an experimental feature")
 
-	discovery, err := s.setupDiscovery(s.p2pConfig)
+	if s.udpEnabled && s.p2pServer.DiscV5 == nil {
+		s.udpEnabled = false
+		log.Error("Discovery v5 is not initialized")
+	}
+	discovery, err := s.setupDiscovery()
 	if err != nil {
 		return err
 	}
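Note (editor): the guard added to `Start` above replaces scattered `DiscV5 == nil` checks with one cached `udpEnabled` flag that is cleared if the transport never came up. Below is a standalone sketch of that pattern; the types `lightClient` and `discv5Transport` are illustrative stand-ins, not part of the patch.

```go
package main

import "log"

// discv5Transport stands in for p2p.Server.DiscV5; any nil-able transport works.
type discv5Transport struct{}

type lightClient struct {
	udpEnabled bool             // cached from the P2P.DiscoveryV5 config at construction
	discV5     *discv5Transport // may stay nil if discv5 failed to start
}

// start mirrors the guard added to LightEthereum.Start: if the flag is set but
// the transport never came up, clear the flag once instead of letting every
// later call nil-check the transport itself.
func (c *lightClient) start() {
	if c.udpEnabled && c.discV5 == nil {
		c.udpEnabled = false
		log.Println("Discovery v5 is not initialized")
	}
}

func main() {
	c := &lightClient{udpEnabled: true} // discV5 left nil on purpose
	c.start()
	log.Println("udpEnabled after start:", c.udpEnabled) // false
}
```

After this check runs, the flag is the single source of truth for UDP availability, which is why `VfluxRequest`, `vfxVersion` and `setupDiscovery` above can all test a plain bool.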
diff --git a/les/clientpool.go b/les/clientpool.go
index 1aa63a281..3965d5450 100644
--- a/les/clientpool.go
+++ b/les/clientpool.go
@@ -72,6 +72,7 @@ type clientPool struct {
 	clock      mclock.Clock
 	closed     bool
 	removePeer func(enode.ID)
+	synced     func() bool
 	ns         *nodestate.NodeStateMachine
 	pp         *vfs.PriorityPool
 	bt         *vfs.BalanceTracker
@@ -107,7 +108,7 @@ type clientInfo struct {
 }
 
 // newClientPool creates a new client pool
-func newClientPool(ns *nodestate.NodeStateMachine, lesDb ethdb.Database, minCap uint64, connectedBias time.Duration, clock mclock.Clock, removePeer func(enode.ID)) *clientPool {
+func newClientPool(ns *nodestate.NodeStateMachine, lesDb ethdb.Database, minCap uint64, connectedBias time.Duration, clock mclock.Clock, removePeer func(enode.ID), synced func() bool) *clientPool {
 	pool := &clientPool{
 		ns:                  ns,
 		BalanceTrackerSetup: balanceTrackerSetup,
@@ -116,6 +117,7 @@ func newClientPool(ns *nodestate.NodeStateMachine, lesDb ethdb.Database, minCap
 		minCap:        minCap,
 		connectedBias: connectedBias,
 		removePeer:    removePeer,
+		synced:        synced,
 	}
 	pool.bt = vfs.NewBalanceTracker(ns, balanceTrackerSetup, lesDb, clock, &utils.Expirer{}, &utils.Expirer{})
 	pool.pp = vfs.NewPriorityPool(ns, priorityPoolSetup, clock, minCap, connectedBias, 4)
@@ -396,6 +398,13 @@ func (f *clientPool) serveCapQuery(id enode.ID, freeID string, data []byte) []by
 	if l := len(req.AddTokens); l == 0 || l > vflux.CapacityQueryMaxLen {
 		return nil
 	}
+	result := make(vflux.CapacityQueryReply, len(req.AddTokens))
+	if !f.synced() {
+		capacityQueryZeroMeter.Mark(1)
+		reply, _ := rlp.EncodeToBytes(&result)
+		return reply
+	}
+
 	node := f.ns.GetNode(id)
 	if node == nil {
 		node = enode.SignNull(&enr.Record{}, id)
@@ -416,7 +425,6 @@ func (f *clientPool) serveCapQuery(id enode.ID, freeID string, data []byte) []by
 	}
 	// use vfs.CapacityCurve to answer request for multiple newly bought token amounts
 	curve := f.pp.GetCapacityCurve().Exclude(id)
-	result := make(vflux.CapacityQueryReply, len(req.AddTokens))
 	bias := time.Second * time.Duration(req.Bias)
 	if f.connectedBias > bias {
 		bias = f.connectedBias
@@ -434,6 +442,12 @@ func (f *clientPool) serveCapQuery(id enode.ID, freeID string, data []byte) []by
 			result[i] = 0
 		}
 	}
+	// add first result to metrics (don't care about priority client multi-queries yet)
+	if result[0] == 0 {
+		capacityQueryZeroMeter.Mark(1)
+	} else {
+		capacityQueryNonZeroMeter.Mark(1)
+	}
 	reply, _ := rlp.EncodeToBytes(&result)
 	return reply
 }
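Note (editor): with the `synced` callback in place, an out-of-sync server short-circuits `serveCapQuery` and answers with an all-zero reply instead of quoting capacity it cannot honour yet. A minimal sketch of what that reply looks like on the wire; it assumes `vflux.CapacityQueryReply` is a list of per-request capacity values, so the type is mirrored locally rather than imported.

```go
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/rlp"
)

// capacityQueryReply mirrors vflux.CapacityQueryReply (assumed here to be a
// list of capacity values, one per requested token amount).
type capacityQueryReply []uint64

func main() {
	// An unsynced server returns a reply of all zeroes; clients read this
	// as "no capacity available here, try another server".
	result := make(capacityQueryReply, 3)
	reply, err := rlp.EncodeToBytes(&result)
	if err != nil {
		panic(err)
	}
	fmt.Printf("zero-capacity reply: %x\n", reply)
}
```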
diff --git a/les/clientpool_test.go b/les/clientpool_test.go
index 345b373b0..2aee44454 100644
--- a/les/clientpool_test.go
+++ b/les/clientpool_test.go
@@ -133,7 +133,7 @@ func testClientPool(t *testing.T, activeLimit, clientCount, paidCount int, rando
 		disconnFn = func(id enode.ID) {
 			disconnCh <- int(id[0]) + int(id[1])<<8
 		}
-		pool = newClientPool(testStateMachine(), db, 1, 0, &clock, disconnFn)
+		pool = newClientPool(testStateMachine(), db, 1, 0, &clock, disconnFn, alwaysTrueFn)
 	)
 	pool.ns.Start()
 
@@ -239,7 +239,7 @@ func TestConnectPaidClient(t *testing.T) {
 		clock mclock.Simulated
 		db    = rawdb.NewMemoryDatabase()
 	)
-	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
+	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {}, alwaysTrueFn)
 	pool.ns.Start()
 	defer pool.stop()
 	pool.setLimits(10, uint64(10))
@@ -255,7 +255,7 @@ func TestConnectPaidClientToSmallPool(t *testing.T) {
 		clock mclock.Simulated
 		db    = rawdb.NewMemoryDatabase()
 	)
-	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
+	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {}, alwaysTrueFn)
 	pool.ns.Start()
 	defer pool.stop()
 	pool.setLimits(10, uint64(10)) // Total capacity limit is 10
@@ -274,7 +274,7 @@ func TestConnectPaidClientToFullPool(t *testing.T) {
 		db    = rawdb.NewMemoryDatabase()
 	)
 	removeFn := func(enode.ID) {} // Noop
-	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn)
+	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn, alwaysTrueFn)
 	pool.ns.Start()
 	defer pool.stop()
 	pool.setLimits(10, uint64(10)) // Total capacity limit is 10
@@ -304,7 +304,7 @@ func TestPaidClientKickedOut(t *testing.T) {
 	removeFn := func(id enode.ID) {
 		kickedCh <- int(id[0])
 	}
-	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn)
+	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn, alwaysTrueFn)
 	pool.ns.Start()
 	pool.bt.SetExpirationTCs(0, 0)
 	defer pool.stop()
@@ -335,7 +335,7 @@ func TestConnectFreeClient(t *testing.T) {
 		clock mclock.Simulated
 		db    = rawdb.NewMemoryDatabase()
 	)
-	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
+	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {}, alwaysTrueFn)
 	pool.ns.Start()
 	defer pool.stop()
 	pool.setLimits(10, uint64(10))
@@ -352,7 +352,7 @@ func TestConnectFreeClientToFullPool(t *testing.T) {
 		db    = rawdb.NewMemoryDatabase()
 	)
 	removeFn := func(enode.ID) {} // Noop
-	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn)
+	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn, alwaysTrueFn)
 	pool.ns.Start()
 	defer pool.stop()
 	pool.setLimits(10, uint64(10)) // Total capacity limit is 10
@@ -382,7 +382,7 @@ func TestFreeClientKickedOut(t *testing.T) {
 		kicked = make(chan int, 100)
 	)
 	removeFn := func(id enode.ID) { kicked <- int(id[0]) }
-	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn)
+	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn, alwaysTrueFn)
 	pool.ns.Start()
 	defer pool.stop()
 	pool.setLimits(10, uint64(10)) // Total capacity limit is 10
@@ -424,7 +424,7 @@ func TestPositiveBalanceCalculation(t *testing.T) {
 		kicked = make(chan int, 10)
 	)
 	removeFn := func(id enode.ID) { kicked <- int(id[0]) } // Noop
-	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn)
+	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn, alwaysTrueFn)
 	pool.ns.Start()
 	defer pool.stop()
 	pool.setLimits(10, uint64(10)) // Total capacity limit is 10
@@ -448,7 +448,7 @@ func TestDowngradePriorityClient(t *testing.T) {
 		kicked = make(chan int, 10)
 	)
 	removeFn := func(id enode.ID) { kicked <- int(id[0]) } // Noop
-	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn)
+	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn, alwaysTrueFn)
 	pool.ns.Start()
 	defer pool.stop()
 	pool.setLimits(10, uint64(10)) // Total capacity limit is 10
@@ -483,7 +483,7 @@ func TestNegativeBalanceCalculation(t *testing.T) {
 		clock mclock.Simulated
 		db    = rawdb.NewMemoryDatabase()
 	)
-	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
+	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {}, alwaysTrueFn)
 	pool.ns.Start()
 	defer pool.stop()
 	pool.setLimits(10, uint64(10)) // Total capacity limit is 10
@@ -521,7 +521,7 @@ func TestInactiveClient(t *testing.T) {
 		clock mclock.Simulated
 		db    = rawdb.NewMemoryDatabase()
 	)
-	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
+	pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {}, alwaysTrueFn)
 	pool.ns.Start()
 	defer pool.stop()
 	pool.setLimits(2, uint64(2))
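Note (editor): the test updates above are mechanical; every call site now injects a sync-status closure, with `alwaysTrueFn` pinning the pool to the synced state. A toy sketch of this dependency-injection shape, with illustrative names that are not from the patch:

```go
package main

import "fmt"

// pool stands in for clientPool: sync status is injected as a closure so tests
// can pin it (alwaysTrueFn) while a live server passes its real check.
type pool struct {
	synced func() bool
}

func alwaysTrueFn() bool { return true }

func (p *pool) serve() string {
	if !p.synced() {
		return "zero reply"
	}
	return "capacity reply"
}

func main() {
	test := &pool{synced: alwaysTrueFn}
	fmt.Println(test.serve()) // capacity reply

	var height int // pretend chain head; 0 means still syncing
	live := &pool{synced: func() bool { return height > 0 }}
	fmt.Println(live.serve()) // zero reply
}
```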
diff --git a/les/enr_entry.go b/les/enr_entry.go
index 8be4a7a00..892c3530d 100644
--- a/les/enr_entry.go
+++ b/les/enr_entry.go
@@ -18,7 +18,6 @@ package les
 
 import (
 	"github.com/ethereum/go-ethereum/core/forkid"
-	"github.com/ethereum/go-ethereum/p2p"
 	"github.com/ethereum/go-ethereum/p2p/dnsdisc"
 	"github.com/ethereum/go-ethereum/p2p/enode"
 	"github.com/ethereum/go-ethereum/rlp"
@@ -42,7 +41,7 @@ type ethEntry struct {
 func (ethEntry) ENRKey() string { return "eth" }
 
 // setupDiscovery creates the node discovery source for the eth protocol.
-func (eth *LightEthereum) setupDiscovery(cfg *p2p.Config) (enode.Iterator, error) {
+func (eth *LightEthereum) setupDiscovery() (enode.Iterator, error) {
 	it := enode.NewFairMix(0)
 
 	// Enable DNS discovery.
@@ -56,7 +55,7 @@ func (eth *LightEthereum) setupDiscovery(cfg *p2p.Config) (enode.Iterator, error
 	}
 
 	// Enable DHT.
-	if cfg.DiscoveryV5 && eth.p2pServer.DiscV5 != nil {
+	if eth.udpEnabled {
 		it.AddSource(eth.p2pServer.DiscV5.RandomNodes())
 	}
diff --git a/les/metrics.go b/les/metrics.go
index 9a79fd1bb..5a8d4bbe0 100644
--- a/les/metrics.go
+++ b/les/metrics.go
@@ -73,10 +73,12 @@ var (
 	serverConnectionGauge = metrics.NewRegisteredGauge("les/connection/server", nil)
 	clientConnectionGauge = metrics.NewRegisteredGauge("les/connection/client", nil)
 
-	totalCapacityGauge   = metrics.NewRegisteredGauge("les/server/totalCapacity", nil)
-	totalRechargeGauge   = metrics.NewRegisteredGauge("les/server/totalRecharge", nil)
-	totalConnectedGauge  = metrics.NewRegisteredGauge("les/server/totalConnected", nil)
-	blockProcessingTimer = metrics.NewRegisteredTimer("les/server/blockProcessingTime", nil)
+	totalCapacityGauge        = metrics.NewRegisteredGauge("les/server/totalCapacity", nil)
+	totalRechargeGauge        = metrics.NewRegisteredGauge("les/server/totalRecharge", nil)
+	totalConnectedGauge       = metrics.NewRegisteredGauge("les/server/totalConnected", nil)
+	blockProcessingTimer      = metrics.NewRegisteredTimer("les/server/blockProcessingTime", nil)
+	capacityQueryZeroMeter    = metrics.NewRegisteredMeter("les/server/capQueryZero", nil)
+	capacityQueryNonZeroMeter = metrics.NewRegisteredMeter("les/server/capQueryNonZero", nil)
 
 	requestServedMeter = metrics.NewRegisteredMeter("les/server/req/avgServedTime", nil)
 	requestServedTimer = metrics.NewRegisteredTimer("les/server/req/servedTime", nil)
diff --git a/les/server.go b/les/server.go
index 9627d65af..0351bdd80 100644
--- a/les/server.go
+++ b/les/server.go
@@ -149,7 +149,7 @@ func NewLesServer(node *node.Node, e ethBackend, config *ethconfig.Config) (*Les
 		srv.maxCapacity = totalRecharge
 	}
 	srv.fcManager.SetCapacityLimits(srv.minCapacity, srv.maxCapacity, srv.minCapacity*2)
-	srv.clientPool = newClientPool(ns, lesDb, srv.minCapacity, defaultConnectedBias, mclock.System{}, srv.dropClient)
+	srv.clientPool = newClientPool(ns, lesDb, srv.minCapacity, defaultConnectedBias, mclock.System{}, srv.dropClient, issync)
 	srv.clientPool.setDefaultFactors(vfs.PriceFactors{TimeFactor: 0, CapacityFactor: 1, RequestFactor: 1}, vfs.PriceFactors{TimeFactor: 0, CapacityFactor: 1, RequestFactor: 1})
 
 	checkpoint := srv.latestLocalCheckpoint()
diff --git a/les/test_helper.go b/les/test_helper.go
index e1d3beb6a..e49bfc873 100644
--- a/les/test_helper.go
+++ b/les/test_helper.go
@@ -307,7 +307,7 @@ func newTestServerHandler(blocks int, indexers []*core.ChainIndexer, db ethdb.Da
 	}
 	server.costTracker, server.minCapacity = newCostTracker(db, server.config)
 	server.costTracker.testCostList = testCostList(0) // Disable flow control mechanism.
-	server.clientPool = newClientPool(ns, db, testBufRecharge, defaultConnectedBias, clock, func(id enode.ID) {})
+	server.clientPool = newClientPool(ns, db, testBufRecharge, defaultConnectedBias, clock, func(id enode.ID) {}, alwaysTrueFn)
 	server.clientPool.setLimits(10000, 10000) // Assign enough capacity for clientpool
 	server.handler = newServerHandler(server, simulation.Blockchain(), db, txpool, func() bool { return true })
 	if server.oracle != nil {
@@ -319,6 +319,10 @@ func newTestServerHandler(blocks int, indexers []*core.ChainIndexer, db ethdb.Da
 	return server.handler, simulation
 }
 
+func alwaysTrueFn() bool {
+	return true
+}
+
 // testPeer is a simulated peer to allow testing direct network calls.
 type testPeer struct {
 	cpeer *clientPeer
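Note (editor): the new meters only record when metrics collection is enabled. A runnable sketch using go-ethereum's `metrics` package; flipping `metrics.Enabled` by hand here stands in for geth's metrics CLI flag.

```go
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/metrics"
)

func main() {
	// Metrics collection is off by default; without this, registered
	// meters are no-ops and Count() always reports zero.
	metrics.Enabled = true

	// The two new meters split capacity queries into "answered with zero"
	// (server not synced, or no capacity offered) and "non-zero" outcomes.
	zero := metrics.NewRegisteredMeter("les/server/capQueryZero", nil)
	nonZero := metrics.NewRegisteredMeter("les/server/capQueryNonZero", nil)

	zero.Mark(1)
	nonZero.Mark(3)
	fmt.Println("zero:", zero.Count(), "nonZero:", nonZero.Count()) // 1 3
}
```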
diff --git a/les/vflux/client/serverpool.go b/les/vflux/client/serverpool.go
index e73b277ce..cc0254c12 100644
--- a/les/vflux/client/serverpool.go
+++ b/les/vflux/client/serverpool.go
@@ -47,7 +47,8 @@ const (
 	nodeWeightThreshold = 100 // minimum weight for keeping a node in the the known (valuable) set
 	minRedialWait       = 10  // minimum redial wait time in seconds
 	preNegLimit         = 5   // maximum number of simultaneous pre-negotiation queries
-	maxQueryFails       = 100 // number of consecutive UDP query failures before we print a warning
+	warnQueryFails      = 20  // number of consecutive UDP query failures before we print a warning
+	maxQueryFails       = 100 // number of consecutive UDP query failures when the chance of skipping a query reaches 50%
 )
 
 // ServerPool provides a node iterator for dial candidates. The output is a mix of newly discovered
@@ -94,16 +95,16 @@ type nodeHistoryEnc struct {
 type QueryFunc func(*enode.Node) int
 
 var (
-	clientSetup        = &nodestate.Setup{Version: 2}
-	sfHasValue         = clientSetup.NewPersistentFlag("hasValue")
-	sfQueried          = clientSetup.NewFlag("queried")
-	sfCanDial          = clientSetup.NewFlag("canDial")
-	sfDialing          = clientSetup.NewFlag("dialed")
-	sfWaitDialTimeout  = clientSetup.NewFlag("dialTimeout")
-	sfConnected        = clientSetup.NewFlag("connected")
-	sfRedialWait       = clientSetup.NewFlag("redialWait")
-	sfAlwaysConnect    = clientSetup.NewFlag("alwaysConnect")
-	sfDisableSelection = nodestate.MergeFlags(sfQueried, sfCanDial, sfDialing, sfConnected, sfRedialWait)
+	clientSetup       = &nodestate.Setup{Version: 2}
+	sfHasValue        = clientSetup.NewPersistentFlag("hasValue")
+	sfQuery           = clientSetup.NewFlag("query")
+	sfCanDial         = clientSetup.NewFlag("canDial")
+	sfDialing         = clientSetup.NewFlag("dialed")
+	sfWaitDialTimeout = clientSetup.NewFlag("dialTimeout")
+	sfConnected       = clientSetup.NewFlag("connected")
+	sfRedialWait      = clientSetup.NewFlag("redialWait")
+	sfAlwaysConnect   = clientSetup.NewFlag("alwaysConnect")
+	sfDialProcess     = nodestate.MergeFlags(sfQuery, sfCanDial, sfDialing, sfConnected, sfRedialWait)
 
 	sfiNodeHistory = clientSetup.NewPersistentField("nodeHistory", reflect.TypeOf(nodeHistory{}),
 		func(field interface{}) ([]byte, error) {
@@ -162,8 +163,8 @@ func NewServerPool(db ethdb.KeyValueStore, dbKey []byte, mixTimeout time.Duratio
 	}
 	s.recalTimeout()
 	s.mixer = enode.NewFairMix(mixTimeout)
-	knownSelector := NewWrsIterator(s.ns, sfHasValue, sfDisableSelection, sfiNodeWeight)
-	alwaysConnect := NewQueueIterator(s.ns, sfAlwaysConnect, sfDisableSelection, true, nil)
+	knownSelector := NewWrsIterator(s.ns, sfHasValue, sfDialProcess, sfiNodeWeight)
+	alwaysConnect := NewQueueIterator(s.ns, sfAlwaysConnect, sfDialProcess, true, nil)
 	s.mixSources = append(s.mixSources, knownSelector)
 	s.mixSources = append(s.mixSources, alwaysConnect)
 
@@ -226,7 +227,7 @@ func (s *ServerPool) AddMetrics(
 	s.totalValueGauge = totalValueGauge
 	s.sessionValueMeter = sessionValueMeter
 	if serverSelectableGauge != nil {
-		s.ns.AddLogMetrics(sfHasValue, sfDisableSelection, "selectable", nil, nil, serverSelectableGauge)
+		s.ns.AddLogMetrics(sfHasValue, sfDialProcess, "selectable", nil, nil, serverSelectableGauge)
 	}
 	if serverDialedMeter != nil {
 		s.ns.AddLogMetrics(sfDialing, nodestate.Flags{}, "dialed", serverDialedMeter, nil, nil)
 	}
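Note (editor): the renames above (`sfQueried` → `sfQuery`, `sfDisableSelection` → `sfDialProcess`) matter because the next hunk subscribes to the whole merged flag set rather than the query flag alone. Below is a standalone sketch of the flag algebra, built on `p2p/nodestate`; the flag names mirror the patch, but the setup is constructed locally for illustration.

```go
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/p2p/nodestate"
)

func main() {
	setup := &nodestate.Setup{}
	sfQuery := setup.NewFlag("query")
	sfDialing := setup.NewFlag("dialed")
	sfConnected := setup.NewFlag("connected")
	sfDialProcess := nodestate.MergeFlags(sfQuery, sfDialing, sfConnected)

	// The subscription in the next hunk listens on the merged sfDialProcess
	// set and only treats a state as a fresh query when it equals sfQuery
	// exactly; a node that is already dialing just has its query flag cleared.
	state := nodestate.MergeFlags(sfQuery, sfDialing)
	fmt.Println("fresh query:    ", state.Equals(sfQuery))        // false
	fmt.Println("carries query:  ", state.HasAll(sfQuery))        // true
	fmt.Println("in dial process:", sfDialProcess.HasAll(state))  // true
}
```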
@@ -247,43 +248,51 @@ func (s *ServerPool) AddSource(source enode.Iterator) {
 // Nodes that are filtered out and does not appear on the output iterator are put back
 // into redialWait state.
 func (s *ServerPool) addPreNegFilter(input enode.Iterator, query QueryFunc) enode.Iterator {
-	s.fillSet = NewFillSet(s.ns, input, sfQueried)
-	s.ns.SubscribeState(sfQueried, func(n *enode.Node, oldState, newState nodestate.Flags) {
-		if newState.Equals(sfQueried) {
-			fails := atomic.LoadUint32(&s.queryFails)
-			if fails == maxQueryFails {
-				log.Warn("UDP pre-negotiation query does not seem to work")
+	s.fillSet = NewFillSet(s.ns, input, sfQuery)
+	s.ns.SubscribeState(sfDialProcess, func(n *enode.Node, oldState, newState nodestate.Flags) {
+		if !newState.Equals(sfQuery) {
+			if newState.HasAll(sfQuery) {
+				// remove query flag if the node is already somewhere in the dial process
+				s.ns.SetStateSub(n, nodestate.Flags{}, sfQuery, 0)
 			}
-			if fails > maxQueryFails {
-				fails = maxQueryFails
-			}
-			if rand.Intn(maxQueryFails*2) < int(fails) {
-				// skip pre-negotiation with increasing chance, max 50%
-				// this ensures that the client can operate even if UDP is not working at all
-				s.ns.SetStateSub(n, sfCanDial, nodestate.Flags{}, time.Second*10)
-				// set canDial before resetting queried so that FillSet will not read more
-				// candidates unnecessarily
-				s.ns.SetStateSub(n, nodestate.Flags{}, sfQueried, 0)
-				return
-			}
-			go func() {
-				q := query(n)
-				if q == -1 {
-					atomic.AddUint32(&s.queryFails, 1)
-				} else {
-					atomic.StoreUint32(&s.queryFails, 0)
-				}
-				s.ns.Operation(func() {
-					// we are no longer running in the operation that the callback belongs to, start a new one because of setRedialWait
-					if q == 1 {
-						s.ns.SetStateSub(n, sfCanDial, nodestate.Flags{}, time.Second*10)
-					} else {
-						s.setRedialWait(n, queryCost, queryWaitStep)
-					}
-					s.ns.SetStateSub(n, nodestate.Flags{}, sfQueried, 0)
-				})
-			}()
+			return
 		}
+		fails := atomic.LoadUint32(&s.queryFails)
+		failMax := fails
+		if failMax > maxQueryFails {
+			failMax = maxQueryFails
+		}
+		if rand.Intn(maxQueryFails*2) < int(failMax) {
+			// skip pre-negotiation with increasing chance, max 50%
+			// this ensures that the client can operate even if UDP is not working at all
+			s.ns.SetStateSub(n, sfCanDial, nodestate.Flags{}, time.Second*10)
+			// set canDial before resetting queried so that FillSet will not read more
+			// candidates unnecessarily
+			s.ns.SetStateSub(n, nodestate.Flags{}, sfQuery, 0)
+			return
+		}
+		go func() {
+			q := query(n)
+			if q == -1 {
+				atomic.AddUint32(&s.queryFails, 1)
+				fails++
+				if fails%warnQueryFails == 0 {
+					// warn if a large number of consecutive queries have failed
+					log.Warn("UDP connection queries failed", "count", fails)
+				}
+			} else {
+				atomic.StoreUint32(&s.queryFails, 0)
+			}
+			s.ns.Operation(func() {
+				// we are no longer running in the operation that the callback belongs to, start a new one because of setRedialWait
+				if q == 1 {
+					s.ns.SetStateSub(n, sfCanDial, nodestate.Flags{}, time.Second*10)
+				} else {
+					s.setRedialWait(n, queryCost, queryWaitStep)
+				}
+				s.ns.SetStateSub(n, nodestate.Flags{}, sfQuery, 0)
+			})
+		}()
 	})
 	return NewQueueIterator(s.ns, sfCanDial, nodestate.Flags{}, false, func(waiting bool) {
 		if waiting {
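Note (editor): the restructured hunk keeps the original dice roll: the chance of bypassing a UDP pre-negotiation query grows linearly with consecutive failures and caps at 50% once `maxQueryFails` is reached. A self-contained simulation of just that decision; only the `maxQueryFails` constant is taken from the patch, the rest is illustrative.

```go
package main

import (
	"fmt"
	"math/rand"
)

const maxQueryFails = 100 // same cap as in serverpool.go above

// skipQuery reproduces the dice roll from addPreNegFilter: the skip chance
// grows linearly with consecutive failures and caps at 50%, so dial
// candidates keep flowing even when UDP is completely dead.
func skipQuery(fails uint32) bool {
	failMax := fails
	if failMax > maxQueryFails {
		failMax = maxQueryFails
	}
	return rand.Intn(maxQueryFails*2) < int(failMax)
}

func main() {
	for _, fails := range []uint32{0, 20, 50, 100, 1000} {
		skipped := 0
		for i := 0; i < 100000; i++ {
			if skipQuery(fails) {
				skipped++
			}
		}
		fmt.Printf("fails=%4d  skip rate ≈ %4.1f%%\n", fails, float64(skipped)/1000)
	}
}
```

At the cap, half of the candidates are dialed without a query, so the pool still makes progress with a broken UDP transport, while the other half keep probing so that a recovered transport resets the failure counter via `StoreUint32`.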