diff --git a/common/prque/lazyqueue.go b/common/prque/lazyqueue.go index 52403df46..c74faab7e 100644 --- a/common/prque/lazyqueue.go +++ b/common/prque/lazyqueue.go @@ -48,7 +48,7 @@ type LazyQueue struct { } type ( - PriorityCallback func(data interface{}, now mclock.AbsTime) int64 // actual priority callback + PriorityCallback func(data interface{}) int64 // actual priority callback MaxPriorityCallback func(data interface{}, until mclock.AbsTime) int64 // estimated maximum priority callback ) @@ -139,11 +139,10 @@ func (q *LazyQueue) peekIndex() int { // Pop multiple times. Popped items are passed to the callback. MultiPop returns // when the callback returns false or there are no more items to pop. func (q *LazyQueue) MultiPop(callback func(data interface{}, priority int64) bool) { - now := q.clock.Now() nextIndex := q.peekIndex() for nextIndex != -1 { data := heap.Pop(q.queue[nextIndex]).(*item).value - heap.Push(q.popQueue, &item{data, q.priority(data, now)}) + heap.Push(q.popQueue, &item{data, q.priority(data)}) nextIndex = q.peekIndex() for q.popQueue.Len() != 0 && (nextIndex == -1 || q.queue[nextIndex].blocks[0][0].priority < q.popQueue.blocks[0][0].priority) { i := heap.Pop(q.popQueue).(*item) diff --git a/common/prque/lazyqueue_test.go b/common/prque/lazyqueue_test.go index be9491e24..9a831d628 100644 --- a/common/prque/lazyqueue_test.go +++ b/common/prque/lazyqueue_test.go @@ -40,7 +40,7 @@ type lazyItem struct { index int } -func testPriority(a interface{}, now mclock.AbsTime) int64 { +func testPriority(a interface{}) int64 { return a.(*lazyItem).p } diff --git a/les/client.go b/les/client.go index 4d07f844f..ecabfdf50 100644 --- a/les/client.go +++ b/les/client.go @@ -36,30 +36,33 @@ import ( "github.com/ethereum/go-ethereum/eth/gasprice" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/internal/ethapi" + "github.com/ethereum/go-ethereum/les/vflux" vfc "github.com/ethereum/go-ethereum/les/vflux/client" "github.com/ethereum/go-ethereum/light" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/enr" "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" ) type LightEthereum struct { lesCommons - peers *serverPeerSet - reqDist *requestDistributor - retriever *retrieveManager - odr *LesOdr - relay *lesTxRelay - handler *clientHandler - txPool *light.TxPool - blockchain *light.LightChain - serverPool *vfc.ServerPool - dialCandidates enode.Iterator - pruner *pruner + peers *serverPeerSet + reqDist *requestDistributor + retriever *retrieveManager + odr *LesOdr + relay *lesTxRelay + handler *clientHandler + txPool *light.TxPool + blockchain *light.LightChain + serverPool *vfc.ServerPool + serverPoolIterator enode.Iterator + pruner *pruner bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports @@ -112,7 +115,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*LightEthereum, error) { p2pConfig: &stack.Config().P2P, } - leth.serverPool, leth.dialCandidates = vfc.NewServerPool(lesDb, []byte("serverpool:"), time.Second, nil, &mclock.System{}, config.UltraLightServers, requestList) + leth.serverPool, leth.serverPoolIterator = vfc.NewServerPool(lesDb, []byte("serverpool:"), time.Second, leth.prenegQuery, &mclock.System{}, config.UltraLightServers, requestList) leth.serverPool.AddMetrics(suggestedTimeoutGauge, totalValueGauge, serverSelectableGauge, serverConnectedGauge, sessionValueMeter, serverDialedMeter) leth.retriever = newRetrieveManager(peers, leth.reqDist, leth.serverPool.GetTimeout) @@ -189,6 +192,62 @@ func New(stack *node.Node, config *ethconfig.Config) (*LightEthereum, error) { return leth, nil } +// VfluxRequest sends a batch of requests to the given node through discv5 UDP TalkRequest and returns the responses +func (s *LightEthereum) VfluxRequest(n *enode.Node, reqs vflux.Requests) vflux.Replies { + reqsEnc, _ := rlp.EncodeToBytes(&reqs) + repliesEnc, _ := s.p2pServer.DiscV5.TalkRequest(s.serverPool.DialNode(n), "vfx", reqsEnc) + var replies vflux.Replies + if len(repliesEnc) == 0 || rlp.DecodeBytes(repliesEnc, &replies) != nil { + return nil + } + return replies +} + +// vfxVersion returns the version number of the "les" service subdomain of the vflux UDP +// service, as advertised in the ENR record +func (s *LightEthereum) vfxVersion(n *enode.Node) uint { + if n.Seq() == 0 { + var err error + if n, err = s.p2pServer.DiscV5.RequestENR(n); n != nil && err == nil && n.Seq() != 0 { + s.serverPool.Persist(n) + } else { + return 0 + } + } + + var les []rlp.RawValue + if err := n.Load(enr.WithEntry("les", &les)); err != nil || len(les) < 1 { + return 0 + } + var version uint + rlp.DecodeBytes(les[0], &version) // Ignore additional fields (for forward compatibility). + return version +} + +// prenegQuery sends a capacity query to the given server node to determine whether +// a connection slot is immediately available +func (s *LightEthereum) prenegQuery(n *enode.Node) int { + if s.vfxVersion(n) < 1 { + // UDP query not supported, always try TCP connection + return 1 + } + + var requests vflux.Requests + requests.Add("les", vflux.CapacityQueryName, vflux.CapacityQueryReq{ + Bias: 180, + AddTokens: []vflux.IntOrInf{{}}, + }) + replies := s.VfluxRequest(n, requests) + var cqr vflux.CapacityQueryReply + if replies.Get(0, &cqr) != nil || len(cqr) != 1 { // Note: Get returns an error if replies is nil + return -1 + } + if cqr[0] > 0 { + return 1 + } + return 0 +} + type LightDummyAPI struct{} // Etherbase is the address that mining rewards will be send to @@ -269,7 +328,7 @@ func (s *LightEthereum) Protocols() []p2p.Protocol { return p.Info() } return nil - }, s.dialCandidates) + }, s.serverPoolIterator) } // Start implements node.Lifecycle, starting all internal goroutines needed by the diff --git a/les/clientpool.go b/les/clientpool.go index 4e1499bf5..1aa63a281 100644 --- a/les/clientpool.go +++ b/les/clientpool.go @@ -24,11 +24,13 @@ import ( "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/les/utils" + "github.com/ethereum/go-ethereum/les/vflux" vfs "github.com/ethereum/go-ethereum/les/vflux/server" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" "github.com/ethereum/go-ethereum/p2p/nodestate" + "github.com/ethereum/go-ethereum/rlp" ) const ( @@ -382,3 +384,56 @@ func (f *clientPool) forClients(ids []enode.ID, cb func(client *clientInfo)) { } } } + +// serveCapQuery serves a vflux capacity query. It receives multiple token amount values +// and a bias time value. For each given token amount it calculates the maximum achievable +// capacity in case the amount is added to the balance. +func (f *clientPool) serveCapQuery(id enode.ID, freeID string, data []byte) []byte { + var req vflux.CapacityQueryReq + if rlp.DecodeBytes(data, &req) != nil { + return nil + } + if l := len(req.AddTokens); l == 0 || l > vflux.CapacityQueryMaxLen { + return nil + } + node := f.ns.GetNode(id) + if node == nil { + node = enode.SignNull(&enr.Record{}, id) + } + c, _ := f.ns.GetField(node, clientInfoField).(*clientInfo) + if c == nil { + c = &clientInfo{node: node} + f.ns.SetField(node, clientInfoField, c) + f.ns.SetField(node, connAddressField, freeID) + defer func() { + f.ns.SetField(node, connAddressField, nil) + f.ns.SetField(node, clientInfoField, nil) + }() + if c.balance, _ = f.ns.GetField(node, f.BalanceField).(*vfs.NodeBalance); c.balance == nil { + log.Error("BalanceField is missing", "node", node.ID()) + return nil + } + } + // use vfs.CapacityCurve to answer request for multiple newly bought token amounts + curve := f.pp.GetCapacityCurve().Exclude(id) + result := make(vflux.CapacityQueryReply, len(req.AddTokens)) + bias := time.Second * time.Duration(req.Bias) + if f.connectedBias > bias { + bias = f.connectedBias + } + pb, _ := c.balance.GetBalance() + for i, addTokens := range req.AddTokens { + add := addTokens.Int64() + result[i] = curve.MaxCapacity(func(capacity uint64) int64 { + return c.balance.EstimatePriority(capacity, add, 0, bias, false) / int64(capacity) + }) + if add <= 0 && uint64(-add) >= pb && result[i] > f.minCap { + result[i] = f.minCap + } + if result[i] < f.minCap { + result[i] = 0 + } + } + reply, _ := rlp.EncodeToBytes(&result) + return reply +} diff --git a/les/clientpool_test.go b/les/clientpool_test.go index 5cff01040..345b373b0 100644 --- a/les/clientpool_test.go +++ b/les/clientpool_test.go @@ -508,8 +508,10 @@ func TestNegativeBalanceCalculation(t *testing.T) { for i := 0; i < 10; i++ { pool.disconnect(newPoolTestPeer(i, nil)) _, nb := getBalance(pool, newPoolTestPeer(i, nil)) - if checkDiff(nb, uint64(time.Minute)/1000) { - t.Fatalf("Negative balance mismatch, want %v, got %v", uint64(time.Minute)/1000, nb) + exp := uint64(time.Minute) / 1000 + exp -= exp / 120 // correct for negative balance expiration + if checkDiff(nb, exp) { + t.Fatalf("Negative balance mismatch, want %v, got %v", exp, nb) } } } diff --git a/les/enr_entry.go b/les/enr_entry.go index 1e56c1f17..8be4a7a00 100644 --- a/les/enr_entry.go +++ b/les/enr_entry.go @@ -27,7 +27,8 @@ import ( // lesEntry is the "les" ENR entry. This is set for LES servers only. type lesEntry struct { // Ignore additional fields (for forward compatibility). - _ []rlp.RawValue `rlp:"tail"` + VfxVersion uint + Rest []rlp.RawValue `rlp:"tail"` } func (lesEntry) ENRKey() string { return "les" } diff --git a/les/server.go b/les/server.go index 359784cf7..63feaf892 100644 --- a/les/server.go +++ b/les/server.go @@ -26,6 +26,7 @@ import ( "github.com/ethereum/go-ethereum/eth/ethconfig" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/les/flowcontrol" + "github.com/ethereum/go-ethereum/les/vflux" vfs "github.com/ethereum/go-ethereum/les/vflux/server" "github.com/ethereum/go-ethereum/light" "github.com/ethereum/go-ethereum/log" @@ -68,6 +69,7 @@ type LesServer struct { archiveMode bool // Flag whether the ethereum node runs in archive mode. handler *serverHandler broadcaster *broadcaster + vfluxServer *vfs.Server privateKey *ecdsa.PrivateKey // Flow control and capacity management @@ -112,12 +114,14 @@ func NewLesServer(node *node.Node, e ethBackend, config *ethconfig.Config) (*Les ns: ns, archiveMode: e.ArchiveMode(), broadcaster: newBroadcaster(ns), + vfluxServer: vfs.NewServer(time.Millisecond * 10), fcManager: flowcontrol.NewClientManager(nil, &mclock.System{}), servingQueue: newServingQueue(int64(time.Millisecond*10), float64(config.LightServ)/100), threadsBusy: config.LightServ/100 + 1, threadsIdle: threads, p2pSrv: node.Server(), } + srv.vfluxServer.Register(srv) issync := e.Synced if config.LightNoSyncServe { issync = func() bool { return true } @@ -201,7 +205,9 @@ func (s *LesServer) Protocols() []p2p.Protocol { }, nil) // Add "les" ENR entries. for i := range ps { - ps[i].Attributes = []enr.Entry{&lesEntry{}} + ps[i].Attributes = []enr.Entry{&lesEntry{ + VfxVersion: 1, + }} } return ps } @@ -211,10 +217,11 @@ func (s *LesServer) Start() error { s.privateKey = s.p2pSrv.PrivateKey s.broadcaster.setSignerKey(s.privateKey) s.handler.start() - s.wg.Add(1) go s.capacityManagement() - + if s.p2pSrv.DiscV5 != nil { + s.p2pSrv.DiscV5.RegisterTalkHandler("vfx", s.vfluxServer.ServeEncoded) + } return nil } @@ -228,6 +235,7 @@ func (s *LesServer) Stop() error { s.costTracker.stop() s.handler.stop() s.servingQueue.stop() + s.vfluxServer.Stop() // Note, bloom trie indexer is closed by parent bloombits indexer. s.chtIndexer.Close() @@ -311,3 +319,18 @@ func (s *LesServer) dropClient(id enode.ID) { p.Peer.Disconnect(p2p.DiscRequested) } } + +// ServiceInfo implements vfs.Service +func (s *LesServer) ServiceInfo() (string, string) { + return "les", "Ethereum light client service" +} + +// Handle implements vfs.Service +func (s *LesServer) Handle(id enode.ID, address string, name string, data []byte) []byte { + switch name { + case vflux.CapacityQueryName: + return s.clientPool.serveCapQuery(id, address, data) + default: + return nil + } +} diff --git a/les/vflux/client/serverpool.go b/les/vflux/client/serverpool.go index 95f724609..47ec4fee7 100644 --- a/les/vflux/client/serverpool.go +++ b/les/vflux/client/serverpool.go @@ -94,7 +94,7 @@ type nodeHistoryEnc struct { type queryFunc func(*enode.Node) int var ( - clientSetup = &nodestate.Setup{Version: 1} + clientSetup = &nodestate.Setup{Version: 2} sfHasValue = clientSetup.NewPersistentFlag("hasValue") sfQueried = clientSetup.NewFlag("queried") sfCanDial = clientSetup.NewFlag("canDial") @@ -131,9 +131,25 @@ var ( ) sfiNodeWeight = clientSetup.NewField("nodeWeight", reflect.TypeOf(uint64(0))) sfiConnectedStats = clientSetup.NewField("connectedStats", reflect.TypeOf(ResponseTimeStats{})) + sfiLocalAddress = clientSetup.NewPersistentField("localAddress", reflect.TypeOf(&enr.Record{}), + func(field interface{}) ([]byte, error) { + if enr, ok := field.(*enr.Record); ok { + enc, err := rlp.EncodeToBytes(enr) + return enc, err + } + return nil, errors.New("invalid field type") + }, + func(enc []byte) (interface{}, error) { + var enr enr.Record + if err := rlp.DecodeBytes(enc, &enr); err != nil { + return nil, err + } + return &enr, nil + }, + ) ) -// newServerPool creates a new server pool +// NewServerPool creates a new server pool func NewServerPool(db ethdb.KeyValueStore, dbKey []byte, mixTimeout time.Duration, query queryFunc, clock mclock.Clock, trustedURLs []string, requestList []RequestInfo) (*ServerPool, enode.Iterator) { s := &ServerPool{ db: db, @@ -151,15 +167,10 @@ func NewServerPool(db ethdb.KeyValueStore, dbKey []byte, mixTimeout time.Duratio s.mixSources = append(s.mixSources, knownSelector) s.mixSources = append(s.mixSources, alwaysConnect) - iter := enode.Iterator(s.mixer) + s.dialIterator = s.mixer if query != nil { - iter = s.addPreNegFilter(iter, query) + s.dialIterator = s.addPreNegFilter(s.dialIterator, query) } - s.dialIterator = enode.Filter(iter, func(node *enode.Node) bool { - s.ns.SetState(node, sfDialing, sfCanDial, 0) - s.ns.SetState(node, sfWaitDialTimeout, nodestate.Flags{}, time.Second*10) - return true - }) s.ns.SubscribeState(nodestate.MergeFlags(sfWaitDialTimeout, sfConnected), func(n *enode.Node, oldState, newState nodestate.Flags) { if oldState.Equals(sfWaitDialTimeout) && newState.IsEmpty() { @@ -169,7 +180,41 @@ func NewServerPool(db ethdb.KeyValueStore, dbKey []byte, mixTimeout time.Duratio } }) - return s, s.dialIterator + return s, &serverPoolIterator{ + dialIterator: s.dialIterator, + nextFn: func(node *enode.Node) { + s.ns.Operation(func() { + s.ns.SetStateSub(node, sfDialing, sfCanDial, 0) + s.ns.SetStateSub(node, sfWaitDialTimeout, nodestate.Flags{}, time.Second*10) + }) + }, + nodeFn: s.DialNode, + } +} + +type serverPoolIterator struct { + dialIterator enode.Iterator + nextFn func(*enode.Node) + nodeFn func(*enode.Node) *enode.Node +} + +// Next implements enode.Iterator +func (s *serverPoolIterator) Next() bool { + if s.dialIterator.Next() { + s.nextFn(s.dialIterator.Node()) + return true + } + return false +} + +// Node implements enode.Iterator +func (s *serverPoolIterator) Node() *enode.Node { + return s.nodeFn(s.dialIterator.Node()) +} + +// Close implements enode.Iterator +func (s *serverPoolIterator) Close() { + s.dialIterator.Close() } // AddMetrics adds metrics to the server pool. Should be called before Start(). @@ -285,7 +330,6 @@ func (s *ServerPool) Start() { // stop stops the server pool func (s *ServerPool) Stop() { - s.dialIterator.Close() if s.fillSet != nil { s.fillSet.Close() } @@ -299,18 +343,23 @@ func (s *ServerPool) Stop() { s.vt.Stop() } -// registerPeer implements serverPeerSubscriber +// RegisterNode implements serverPeerSubscriber func (s *ServerPool) RegisterNode(node *enode.Node) (*NodeValueTracker, error) { if atomic.LoadUint32(&s.started) == 0 { return nil, errors.New("server pool not started yet") } - s.ns.SetState(node, sfConnected, sfDialing.Or(sfWaitDialTimeout), 0) nvt := s.vt.Register(node.ID()) - s.ns.SetField(node, sfiConnectedStats, nvt.RtStats()) + s.ns.Operation(func() { + s.ns.SetStateSub(node, sfConnected, sfDialing.Or(sfWaitDialTimeout), 0) + s.ns.SetFieldSub(node, sfiConnectedStats, nvt.RtStats()) + if node.IP().IsLoopback() { + s.ns.SetFieldSub(node, sfiLocalAddress, node.Record()) + } + }) return nvt, nil } -// unregisterPeer implements serverPeerSubscriber +// UnregisterNode implements serverPeerSubscriber func (s *ServerPool) UnregisterNode(node *enode.Node) { s.ns.Operation(func() { s.setRedialWait(node, dialCost, dialWaitStep) @@ -430,6 +479,7 @@ func (s *ServerPool) updateWeight(node *enode.Node, totalValue float64, totalDia s.ns.SetStateSub(node, nodestate.Flags{}, sfHasValue, 0) s.ns.SetFieldSub(node, sfiNodeWeight, nil) s.ns.SetFieldSub(node, sfiNodeHistory, nil) + s.ns.SetFieldSub(node, sfiLocalAddress, nil) } s.ns.Persist(node) // saved if node history or hasValue changed } @@ -520,3 +570,28 @@ func (s *ServerPool) calculateWeight(node *enode.Node) { func (s *ServerPool) API() *PrivateClientAPI { return NewPrivateClientAPI(s.vt) } + +type dummyIdentity enode.ID + +func (id dummyIdentity) Verify(r *enr.Record, sig []byte) error { return nil } +func (id dummyIdentity) NodeAddr(r *enr.Record) []byte { return id[:] } + +// DialNode replaces the given enode with a locally generated one containing the ENR +// stored in the sfiLocalAddress field if present. This workaround ensures that nodes +// on the local network can be dialed at the local address if a connection has been +// successfully established previously. +// Note that NodeStateMachine always remembers the enode with the latest version of +// the remote signed ENR. ENR filtering should be performed on that version while +// dialNode should be used for dialing the node over TCP or UDP. +func (s *ServerPool) DialNode(n *enode.Node) *enode.Node { + if enr, ok := s.ns.GetField(n, sfiLocalAddress).(*enr.Record); ok { + n, _ := enode.New(dummyIdentity(n.ID()), enr) + return n + } + return n +} + +// Persist immediately stores the state of a node in the node database +func (s *ServerPool) Persist(n *enode.Node) { + s.ns.Persist(n) +} diff --git a/les/vflux/client/serverpool_test.go b/les/vflux/client/serverpool_test.go index 3af3db95b..ee299618c 100644 --- a/les/vflux/client/serverpool_test.go +++ b/les/vflux/client/serverpool_test.go @@ -56,6 +56,7 @@ type ServerPoolTest struct { preNeg, preNegFail bool vt *ValueTracker sp *ServerPool + spi enode.Iterator input enode.Iterator testNodes []spTestNode trusted []string @@ -148,7 +149,7 @@ func (s *ServerPoolTest) start() { requestList[i] = RequestInfo{Name: "testreq" + strconv.Itoa(i), InitAmount: 1, InitValue: 1} } - s.sp, _ = NewServerPool(s.db, []byte("sp:"), 0, testQuery, s.clock, s.trusted, requestList) + s.sp, s.spi = NewServerPool(s.db, []byte("sp:"), 0, testQuery, s.clock, s.trusted, requestList) s.sp.AddSource(s.input) s.sp.validSchemes = enode.ValidSchemesForTesting s.sp.unixTime = func() int64 { return int64(s.clock.Now()) / int64(time.Second) } @@ -176,6 +177,7 @@ func (s *ServerPoolTest) start() { func (s *ServerPoolTest) stop() { close(s.quit) s.sp.Stop() + s.spi.Close() for i := range s.testNodes { n := &s.testNodes[i] if n.connected { @@ -208,9 +210,9 @@ func (s *ServerPoolTest) run() { if s.conn < spTestTarget { s.dialCount++ s.beginWait() - s.sp.dialIterator.Next() + s.spi.Next() s.endWait() - dial := s.sp.dialIterator.Node() + dial := s.spi.Node() id := dial.ID() idx := testNodeIndex(id) n := &s.testNodes[idx] diff --git a/les/vflux/requests.go b/les/vflux/requests.go new file mode 100644 index 000000000..11255607e --- /dev/null +++ b/les/vflux/requests.go @@ -0,0 +1,180 @@ +// Copyright 2020 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package vflux + +import ( + "errors" + "math" + "math/big" + + "github.com/ethereum/go-ethereum/rlp" +) + +var ErrNoReply = errors.New("no reply for given request") + +const ( + MaxRequestLength = 16 // max number of individual requests in a batch + CapacityQueryName = "cq" + CapacityQueryMaxLen = 16 +) + +type ( + // Request describes a single vflux request inside a batch. Service and request + // type are identified by strings, parameters are RLP encoded. + Request struct { + Service, Name string + Params []byte + } + // Requests are a batch of vflux requests + Requests []Request + + // Replies are the replies to a batch of requests + Replies [][]byte + + // CapacityQueryReq is the encoding format of the capacity query + CapacityQueryReq struct { + Bias uint64 // seconds + AddTokens []IntOrInf + } + // CapacityQueryReq is the encoding format of the response to the capacity query + CapacityQueryReply []uint64 +) + +// Add encodes and adds a new request to the batch +func (r *Requests) Add(service, name string, val interface{}) (int, error) { + enc, err := rlp.EncodeToBytes(val) + if err != nil { + return -1, err + } + *r = append(*r, Request{ + Service: service, + Name: name, + Params: enc, + }) + return len(*r) - 1, nil +} + +// Get decodes the reply to the i-th request in the batch +func (r Replies) Get(i int, val interface{}) error { + if i < 0 || i >= len(r) { + return ErrNoReply + } + return rlp.DecodeBytes(r[i], val) +} + +const ( + IntNonNegative = iota + IntNegative + IntPlusInf + IntMinusInf +) + +// IntOrInf is the encoding format for arbitrary length signed integers that can also +// hold the values of +Inf or -Inf +type IntOrInf struct { + Type uint8 + Value big.Int +} + +// BigInt returns the value as a big.Int or panics if the value is infinity +func (i *IntOrInf) BigInt() *big.Int { + switch i.Type { + case IntNonNegative: + return new(big.Int).Set(&i.Value) + case IntNegative: + return new(big.Int).Neg(&i.Value) + case IntPlusInf: + panic(nil) // caller should check Inf() before trying to convert to big.Int + case IntMinusInf: + panic(nil) + } + return &big.Int{} // invalid type decodes to 0 value +} + +// Inf returns 1 if the value is +Inf, -1 if it is -Inf, 0 otherwise +func (i *IntOrInf) Inf() int { + switch i.Type { + case IntPlusInf: + return 1 + case IntMinusInf: + return -1 + } + return 0 // invalid type decodes to 0 value +} + +// Int64 limits the value between MinInt64 and MaxInt64 (even if it is +-Inf) and returns an int64 type +func (i *IntOrInf) Int64() int64 { + switch i.Type { + case IntNonNegative: + if i.Value.IsInt64() { + return i.Value.Int64() + } else { + return math.MaxInt64 + } + case IntNegative: + if i.Value.IsInt64() { + return -i.Value.Int64() + } else { + return math.MinInt64 + } + case IntPlusInf: + return math.MaxInt64 + case IntMinusInf: + return math.MinInt64 + } + return 0 // invalid type decodes to 0 value +} + +// SetBigInt sets the value to the given big.Int +func (i *IntOrInf) SetBigInt(v *big.Int) { + if v.Sign() >= 0 { + i.Type = IntNonNegative + i.Value.Set(v) + } else { + i.Type = IntNegative + i.Value.Neg(v) + } +} + +// SetInt64 sets the value to the given int64. Note that MaxInt64 translates to +Inf +// while MinInt64 translates to -Inf. +func (i *IntOrInf) SetInt64(v int64) { + if v >= 0 { + if v == math.MaxInt64 { + i.Type = IntPlusInf + } else { + i.Type = IntNonNegative + i.Value.SetInt64(v) + } + } else { + if v == math.MinInt64 { + i.Type = IntMinusInf + } else { + i.Type = IntNegative + i.Value.SetInt64(-v) + } + } +} + +// SetInf sets the value to +Inf or -Inf +func (i *IntOrInf) SetInf(sign int) { + if sign == 1 { + i.Type = IntPlusInf + } else { + i.Type = IntMinusInf + } +} diff --git a/les/vflux/server/balance.go b/les/vflux/server/balance.go index f5073d0db..db12a5c57 100644 --- a/les/vflux/server/balance.go +++ b/les/vflux/server/balance.go @@ -243,11 +243,11 @@ func (n *NodeBalance) RequestServed(cost uint64) uint64 { } // Priority returns the actual priority based on the current balance -func (n *NodeBalance) Priority(now mclock.AbsTime, capacity uint64) int64 { +func (n *NodeBalance) Priority(capacity uint64) int64 { n.lock.Lock() defer n.lock.Unlock() - n.updateBalance(now) + n.updateBalance(n.bt.clock.Now()) return n.balanceToPriority(n.balance, capacity) } @@ -256,16 +256,35 @@ func (n *NodeBalance) Priority(now mclock.AbsTime, capacity uint64) int64 { // in the current session. // If update is true then a priority callback is added that turns UpdateFlag on and off // in case the priority goes below the estimated minimum. -func (n *NodeBalance) EstMinPriority(at mclock.AbsTime, capacity uint64, update bool) int64 { +func (n *NodeBalance) EstimatePriority(capacity uint64, addBalance int64, future, bias time.Duration, update bool) int64 { n.lock.Lock() defer n.lock.Unlock() - var avgReqCost float64 - dt := time.Duration(n.lastUpdate - n.initTime) - if dt > time.Second { - avgReqCost = float64(n.sumReqCost) * 2 / float64(dt) + now := n.bt.clock.Now() + n.updateBalance(now) + b := n.balance + if addBalance != 0 { + offset := n.bt.posExp.LogOffset(now) + old := n.balance.pos.Value(offset) + if addBalance > 0 && (addBalance > maxBalance || old > maxBalance-uint64(addBalance)) { + b.pos = utils.ExpiredValue{} + b.pos.Add(maxBalance, offset) + } else { + b.pos.Add(addBalance, offset) + } } - pri := n.balanceToPriority(n.reducedBalance(at, capacity, avgReqCost), capacity) + if future > 0 { + var avgReqCost float64 + dt := time.Duration(n.lastUpdate - n.initTime) + if dt > time.Second { + avgReqCost = float64(n.sumReqCost) * 2 / float64(dt) + } + b = n.reducedBalance(b, now, future, capacity, avgReqCost) + } + if bias > 0 { + b = n.reducedBalance(b, now+mclock.AbsTime(future), bias, capacity, 0) + } + pri := n.balanceToPriority(b, capacity) if update { n.addCallback(balanceCallbackUpdate, pri, n.signalPriorityUpdate) } @@ -366,7 +385,7 @@ func (n *NodeBalance) deactivate() { // updateBalance updates balance based on the time factor func (n *NodeBalance) updateBalance(now mclock.AbsTime) { if n.active && now > n.lastUpdate { - n.balance = n.reducedBalance(now, n.capacity, 0) + n.balance = n.reducedBalance(n.balance, n.lastUpdate, time.Duration(now-n.lastUpdate), n.capacity, 0) n.lastUpdate = now } } @@ -546,23 +565,25 @@ func (n *NodeBalance) balanceToPriority(b balance, capacity uint64) int64 { } // reducedBalance estimates the reduced balance at a given time in the fututre based -// on the current balance, the time factor and an estimated average request cost per time ratio -func (n *NodeBalance) reducedBalance(at mclock.AbsTime, capacity uint64, avgReqCost float64) balance { - dt := float64(at - n.lastUpdate) - b := n.balance +// on the given balance, the time factor and an estimated average request cost per time ratio +func (n *NodeBalance) reducedBalance(b balance, start mclock.AbsTime, dt time.Duration, capacity uint64, avgReqCost float64) balance { + // since the costs are applied continuously during the dt time period we calculate + // the expiration offset at the middle of the period + at := start + mclock.AbsTime(dt/2) + dtf := float64(dt) if !b.pos.IsZero() { factor := n.posFactor.timePrice(capacity) + n.posFactor.RequestFactor*avgReqCost - diff := -int64(dt * factor) + diff := -int64(dtf * factor) dd := b.pos.Add(diff, n.bt.posExp.LogOffset(at)) if dd == diff { - dt = 0 + dtf = 0 } else { - dt += float64(dd) / factor + dtf += float64(dd) / factor } } if dt > 0 { factor := n.negFactor.timePrice(capacity) + n.negFactor.RequestFactor*avgReqCost - b.neg.Add(int64(dt*factor), n.bt.negExp.LogOffset(at)) + b.neg.Add(int64(dtf*factor), n.bt.negExp.LogOffset(at)) } return b } @@ -588,8 +609,9 @@ func (n *NodeBalance) timeUntil(priority int64) (time.Duration, bool) { } dt = float64(posBalance-newBalance) / timePrice return time.Duration(dt), true + } else { + dt = float64(posBalance) / timePrice } - dt = float64(posBalance) / timePrice } else { if priority > 0 { return 0, false diff --git a/les/vflux/server/balance_test.go b/les/vflux/server/balance_test.go index 6c817aa26..e22074db2 100644 --- a/les/vflux/server/balance_test.go +++ b/les/vflux/server/balance_test.go @@ -231,7 +231,7 @@ func TestBalanceToPriority(t *testing.T) { } for _, i := range inputs { node.SetBalance(i.pos, i.neg) - priority := node.Priority(b.clock.Now(), 1000) + priority := node.Priority(1000) if priority != i.priority { t.Fatalf("Priority mismatch, want %v, got %v", i.priority, priority) } @@ -272,7 +272,7 @@ func TestEstimatedPriority(t *testing.T) { for _, i := range inputs { b.clock.Run(i.runTime) node.RequestServed(i.reqCost) - priority := node.EstMinPriority(b.clock.Now()+mclock.AbsTime(i.futureTime), 1000000000, false) + priority := node.EstimatePriority(1000000000, 0, i.futureTime, 0, false) if priority != i.priority { t.Fatalf("Estimated priority mismatch, want %v, got %v", i.priority, priority) } diff --git a/les/vflux/server/prioritypool.go b/les/vflux/server/prioritypool.go index e3327aba7..e940ac7c6 100644 --- a/les/vflux/server/prioritypool.go +++ b/les/vflux/server/prioritypool.go @@ -101,17 +101,21 @@ type PriorityPool struct { minCap uint64 activeBias time.Duration capacityStepDiv uint64 + + cachedCurve *CapacityCurve + ccUpdatedAt mclock.AbsTime + ccUpdateForced bool } // nodePriority interface provides current and estimated future priorities on demand type nodePriority interface { // Priority should return the current priority of the node (higher is better) - Priority(now mclock.AbsTime, cap uint64) int64 + Priority(cap uint64) int64 // EstMinPriority should return a lower estimate for the minimum of the node priority // value starting from the current moment until the given time. If the priority goes // under the returned estimate before the specified moment then it is the caller's // responsibility to signal with updateFlag. - EstMinPriority(until mclock.AbsTime, cap uint64, update bool) int64 + EstimatePriority(cap uint64, addBalance int64, future, bias time.Duration, update bool) int64 } // ppNodeInfo is the internal node descriptor of PriorityPool @@ -131,12 +135,12 @@ func NewPriorityPool(ns *nodestate.NodeStateMachine, setup PriorityPoolSetup, cl ns: ns, PriorityPoolSetup: setup, clock: clock, - activeQueue: prque.NewLazyQueue(activeSetIndex, activePriority, activeMaxPriority, clock, lazyQueueRefresh), inactiveQueue: prque.New(inactiveSetIndex), minCap: minCap, activeBias: activeBias, capacityStepDiv: capacityStepDiv, } + pp.activeQueue = prque.NewLazyQueue(activeSetIndex, activePriority, pp.activeMaxPriority, clock, lazyQueueRefresh) ns.SubscribeField(pp.priorityField, func(node *enode.Node, state nodestate.Flags, oldValue, newValue interface{}) { if newValue != nil { @@ -197,6 +201,9 @@ func (pp *PriorityPool) RequestCapacity(node *enode.Node, targetCap uint64, bias if targetCap < pp.minCap { targetCap = pp.minCap } + if bias < pp.activeBias { + bias = pp.activeBias + } c, _ := pp.ns.GetField(node, pp.ppNodeInfoField).(*ppNodeInfo) if c == nil { log.Error("RequestCapacity called for unknown node", "id", node.ID()) @@ -204,9 +211,9 @@ func (pp *PriorityPool) RequestCapacity(node *enode.Node, targetCap uint64, bias } var priority int64 if targetCap > c.capacity { - priority = c.nodePriority.EstMinPriority(pp.clock.Now()+mclock.AbsTime(bias), targetCap, false) + priority = c.nodePriority.EstimatePriority(targetCap, 0, 0, bias, false) } else { - priority = c.nodePriority.Priority(pp.clock.Now(), targetCap) + priority = c.nodePriority.Priority(targetCap) } pp.markForChange(c) pp.setCapacity(c, targetCap) @@ -214,7 +221,7 @@ func (pp *PriorityPool) RequestCapacity(node *enode.Node, targetCap uint64, bias pp.activeQueue.Remove(c.activeIndex) pp.inactiveQueue.Remove(c.inactiveIndex) pp.activeQueue.Push(c) - minPriority = pp.enforceLimits() + _, minPriority = pp.enforceLimits() // if capacity update is possible now then minPriority == math.MinInt64 // if it is not possible at all then minPriority == math.MaxInt64 allowed = priority > minPriority @@ -281,29 +288,34 @@ func invertPriority(p int64) int64 { } // activePriority callback returns actual priority of ppNodeInfo item in activeQueue -func activePriority(a interface{}, now mclock.AbsTime) int64 { +func activePriority(a interface{}) int64 { c := a.(*ppNodeInfo) if c.forced { return math.MinInt64 } if c.bias == 0 { - return invertPriority(c.nodePriority.Priority(now, c.capacity)) + return invertPriority(c.nodePriority.Priority(c.capacity)) + } else { + return invertPriority(c.nodePriority.EstimatePriority(c.capacity, 0, 0, c.bias, true)) } - return invertPriority(c.nodePriority.EstMinPriority(now+mclock.AbsTime(c.bias), c.capacity, true)) } // activeMaxPriority callback returns estimated maximum priority of ppNodeInfo item in activeQueue -func activeMaxPriority(a interface{}, until mclock.AbsTime) int64 { +func (pp *PriorityPool) activeMaxPriority(a interface{}, until mclock.AbsTime) int64 { c := a.(*ppNodeInfo) if c.forced { return math.MinInt64 } - return invertPriority(c.nodePriority.EstMinPriority(until+mclock.AbsTime(c.bias), c.capacity, false)) + future := time.Duration(until - pp.clock.Now()) + if future < 0 { + future = 0 + } + return invertPriority(c.nodePriority.EstimatePriority(c.capacity, 0, future, c.bias, false)) } // inactivePriority callback returns actual priority of ppNodeInfo item in inactiveQueue func (pp *PriorityPool) inactivePriority(p *ppNodeInfo) int64 { - return p.nodePriority.Priority(pp.clock.Now(), pp.minCap) + return p.nodePriority.Priority(pp.minCap) } // connectedNode is called when a new node has been added to the pool (InactiveFlag set) @@ -379,16 +391,19 @@ func (pp *PriorityPool) setCapacity(n *ppNodeInfo, cap uint64) { // enforceLimits enforces active node count and total capacity limits. It returns the // lowest active node priority. Note that this function is performed on the temporary // internal state. -func (pp *PriorityPool) enforceLimits() int64 { +func (pp *PriorityPool) enforceLimits() (*ppNodeInfo, int64) { if pp.activeCap <= pp.maxCap && pp.activeCount <= pp.maxCount { - return math.MinInt64 + return nil, math.MinInt64 } - var maxActivePriority int64 + var ( + c *ppNodeInfo + maxActivePriority int64 + ) pp.activeQueue.MultiPop(func(data interface{}, priority int64) bool { - c := data.(*ppNodeInfo) + c = data.(*ppNodeInfo) pp.markForChange(c) maxActivePriority = priority - if c.capacity == pp.minCap { + if c.capacity == pp.minCap || pp.activeCount > pp.maxCount { pp.setCapacity(c, 0) } else { sub := c.capacity / pp.capacityStepDiv @@ -400,7 +415,7 @@ func (pp *PriorityPool) enforceLimits() int64 { } return pp.activeCap > pp.maxCap || pp.activeCount > pp.maxCount }) - return invertPriority(maxActivePriority) + return c, invertPriority(maxActivePriority) } // finalizeChanges either commits or reverts temporary changes. The necessary capacity @@ -430,6 +445,9 @@ func (pp *PriorityPool) finalizeChanges(commit bool) (updates []capUpdate) { c.origCap = 0 } pp.changed = nil + if commit { + pp.ccUpdateForced = true + } return } @@ -472,6 +490,7 @@ func (pp *PriorityPool) tryActivate() []capUpdate { break } } + pp.ccUpdateForced = true return pp.finalizeChanges(commit) } @@ -500,3 +519,150 @@ func (pp *PriorityPool) updatePriority(node *enode.Node) { } updates = pp.tryActivate() } + +// CapacityCurve is a snapshot of the priority pool contents in a format that can efficiently +// estimate how much capacity could be granted to a given node at a given priority level. +type CapacityCurve struct { + points []curvePoint // curve points sorted in descending order of priority + index map[enode.ID][]int // curve point indexes belonging to each node + exclude []int // curve point indexes of excluded node + excludeFirst bool // true if activeCount == maxCount +} + +type curvePoint struct { + freeCap uint64 // available capacity and node count at the current priority level + nextPri int64 // next priority level where more capacity will be available +} + +// GetCapacityCurve returns a new or recently cached CapacityCurve based on the contents of the pool +func (pp *PriorityPool) GetCapacityCurve() *CapacityCurve { + pp.lock.Lock() + defer pp.lock.Unlock() + + now := pp.clock.Now() + dt := time.Duration(now - pp.ccUpdatedAt) + if !pp.ccUpdateForced && pp.cachedCurve != nil && dt < time.Second*10 { + return pp.cachedCurve + } + + pp.ccUpdateForced = false + pp.ccUpdatedAt = now + curve := &CapacityCurve{ + index: make(map[enode.ID][]int), + } + pp.cachedCurve = curve + + var excludeID enode.ID + excludeFirst := pp.maxCount == pp.activeCount + // reduce node capacities or remove nodes until nothing is left in the queue; + // record the available capacity and the necessary priority after each step + for pp.activeCap > 0 { + cp := curvePoint{} + if pp.activeCap > pp.maxCap { + log.Error("Active capacity is greater than allowed maximum", "active", pp.activeCap, "maximum", pp.maxCap) + } else { + cp.freeCap = pp.maxCap - pp.activeCap + } + // temporarily increase activeCap to enforce reducing or removing a node capacity + tempCap := cp.freeCap + 1 + pp.activeCap += tempCap + var next *ppNodeInfo + // enforceLimits removes the lowest priority node if it has minimal capacity, + // otherwise reduces its capacity + next, cp.nextPri = pp.enforceLimits() + pp.activeCap -= tempCap + if next == nil { + log.Error("GetCapacityCurve: cannot remove next element from the priority queue") + break + } + id := next.node.ID() + if excludeFirst { + // if the node count limit is already reached then mark the node with the + // lowest priority for exclusion + curve.excludeFirst = true + excludeID = id + excludeFirst = false + } + // multiple curve points and therefore multiple indexes may belong to a node + // if it was removed in multiple steps (if its capacity was more than the minimum) + curve.index[id] = append(curve.index[id], len(curve.points)) + curve.points = append(curve.points, cp) + } + // restore original state of the queue + pp.finalizeChanges(false) + curve.points = append(curve.points, curvePoint{ + freeCap: pp.maxCap, + nextPri: math.MaxInt64, + }) + if curve.excludeFirst { + curve.exclude = curve.index[excludeID] + } + return curve +} + +// Exclude returns a CapacityCurve with the given node excluded from the original curve +func (cc *CapacityCurve) Exclude(id enode.ID) *CapacityCurve { + if exclude, ok := cc.index[id]; ok { + // return a new version of the curve (only one excluded node can be selected) + // Note: if the first node was excluded by default (excludeFirst == true) then + // we can forget about that and exclude the node with the given id instead. + return &CapacityCurve{ + points: cc.points, + index: cc.index, + exclude: exclude, + } + } + return cc +} + +func (cc *CapacityCurve) getPoint(i int) curvePoint { + cp := cc.points[i] + if i == 0 && cc.excludeFirst { + cp.freeCap = 0 + return cp + } + for ii := len(cc.exclude) - 1; ii >= 0; ii-- { + ei := cc.exclude[ii] + if ei < i { + break + } + e1, e2 := cc.points[ei], cc.points[ei+1] + cp.freeCap += e2.freeCap - e1.freeCap + } + return cp +} + +// MaxCapacity calculates the maximum capacity available for a node with a given +// (monotonically decreasing) priority vs. capacity function. Note that if the requesting +// node is already in the pool then it should be excluded from the curve in order to get +// the correct result. +func (cc *CapacityCurve) MaxCapacity(priority func(cap uint64) int64) uint64 { + min, max := 0, len(cc.points)-1 // the curve always has at least one point + for min < max { + mid := (min + max) / 2 + cp := cc.getPoint(mid) + if cp.freeCap == 0 || priority(cp.freeCap) > cp.nextPri { + min = mid + 1 + } else { + max = mid + } + } + cp2 := cc.getPoint(min) + if cp2.freeCap == 0 || min == 0 { + return cp2.freeCap + } + cp1 := cc.getPoint(min - 1) + if priority(cp2.freeCap) > cp1.nextPri { + return cp2.freeCap + } + minc, maxc := cp1.freeCap, cp2.freeCap-1 + for minc < maxc { + midc := (minc + maxc + 1) / 2 + if midc == 0 || priority(midc) > cp1.nextPri { + minc = midc + } else { + maxc = midc - 1 + } + } + return maxc +} diff --git a/les/vflux/server/prioritypool_test.go b/les/vflux/server/prioritypool_test.go index cbb3f5b37..d83ddc176 100644 --- a/les/vflux/server/prioritypool_test.go +++ b/les/vflux/server/prioritypool_test.go @@ -20,6 +20,7 @@ import ( "math/rand" "reflect" "testing" + "time" "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/p2p/enode" @@ -42,6 +43,7 @@ func init() { const ( testCapacityStepDiv = 100 testCapacityToleranceDiv = 10 + testMinCap = 100 ) type ppTestClient struct { @@ -49,11 +51,11 @@ type ppTestClient struct { balance, cap uint64 } -func (c *ppTestClient) Priority(now mclock.AbsTime, cap uint64) int64 { +func (c *ppTestClient) Priority(cap uint64) int64 { return int64(c.balance / cap) } -func (c *ppTestClient) EstMinPriority(until mclock.AbsTime, cap uint64, update bool) int64 { +func (c *ppTestClient) EstimatePriority(cap uint64, addBalance int64, future, bias time.Duration, update bool) int64 { return int64(c.balance / cap) } @@ -67,7 +69,7 @@ func TestPriorityPool(t *testing.T) { c.cap = newValue.(uint64) } }) - pp := NewPriorityPool(ns, ppTestSetup, clock, 100, 0, testCapacityStepDiv) + pp := NewPriorityPool(ns, ppTestSetup, clock, testMinCap, 0, testCapacityStepDiv) ns.Start() pp.SetLimits(100, 1000000) clients := make([]*ppTestClient, 100) @@ -94,7 +96,7 @@ func TestPriorityPool(t *testing.T) { for i := range clients { c := &ppTestClient{ node: enode.SignNull(&enr.Record{}, enode.ID{byte(i)}), - balance: 1000000000, + balance: 100000000000, cap: 1000, } sumBalance += c.balance @@ -109,7 +111,7 @@ func TestPriorityPool(t *testing.T) { for count := 0; count < 100; count++ { c := clients[rand.Intn(len(clients))] oldBalance := c.balance - c.balance = uint64(rand.Int63n(1000000000) + 1000000000) + c.balance = uint64(rand.Int63n(100000000000) + 100000000000) sumBalance += c.balance - oldBalance pp.ns.SetState(c.node, ppUpdateFlag, nodestate.Flags{}, 0) pp.ns.SetState(c.node, nodestate.Flags{}, ppUpdateFlag, 0) @@ -120,10 +122,124 @@ func TestPriorityPool(t *testing.T) { raise(c) } } + // check whether capacities are proportional to balances for _, c := range clients { check(c) } + if count%10 == 0 { + // test available capacity calculation with capacity curve + c = clients[rand.Intn(len(clients))] + curve := pp.GetCapacityCurve().Exclude(c.node.ID()) + + add := uint64(rand.Int63n(10000000000000)) + c.balance += add + sumBalance += add + expCap := curve.MaxCapacity(func(cap uint64) int64 { + return int64(c.balance / cap) + }) + //fmt.Println(expCap, c.balance, sumBalance) + /*for i, cp := range curve.points { + fmt.Println("cp", i, cp, "ex", curve.getPoint(i)) + }*/ + var ok bool + expFail := expCap + 1 + if expFail < testMinCap { + expFail = testMinCap + } + ns.Operation(func() { + _, ok = pp.RequestCapacity(c.node, expFail, 0, true) + }) + if ok { + t.Errorf("Request for more than expected available capacity succeeded") + } + if expCap >= testMinCap { + ns.Operation(func() { + _, ok = pp.RequestCapacity(c.node, expCap, 0, true) + }) + if !ok { + t.Errorf("Request for expected available capacity failed") + } + } + c.balance -= add + sumBalance -= add + pp.ns.SetState(c.node, ppUpdateFlag, nodestate.Flags{}, 0) + pp.ns.SetState(c.node, nodestate.Flags{}, ppUpdateFlag, 0) + for _, c := range clients { + raise(c) + } + } } ns.Stop() } + +func TestCapacityCurve(t *testing.T) { + clock := &mclock.Simulated{} + ns := nodestate.NewNodeStateMachine(nil, nil, clock, testSetup) + pp := NewPriorityPool(ns, ppTestSetup, clock, 400000, 0, 2) + ns.Start() + pp.SetLimits(10, 10000000) + clients := make([]*ppTestClient, 10) + + for i := range clients { + c := &ppTestClient{ + node: enode.SignNull(&enr.Record{}, enode.ID{byte(i)}), + balance: 100000000000 * uint64(i+1), + cap: 1000000, + } + clients[i] = c + ns.SetState(c.node, ppTestClientFlag, nodestate.Flags{}, 0) + ns.SetField(c.node, ppTestSetup.priorityField, c) + ns.SetState(c.node, ppTestSetup.InactiveFlag, nodestate.Flags{}, 0) + ns.Operation(func() { + pp.RequestCapacity(c.node, c.cap, 0, true) + }) + } + + curve := pp.GetCapacityCurve() + check := func(balance, expCap uint64) { + cap := curve.MaxCapacity(func(cap uint64) int64 { + return int64(balance / cap) + }) + var fail bool + if cap == 0 || expCap == 0 { + fail = cap != expCap + } else { + pri := balance / cap + expPri := balance / expCap + fail = pri != expPri && pri != expPri+1 + } + if fail { + t.Errorf("Incorrect capacity for %d balance (got %d, expected %d)", balance, cap, expCap) + } + } + + check(0, 0) + check(10000000000, 100000) + check(50000000000, 500000) + check(100000000000, 1000000) + check(200000000000, 1000000) + check(300000000000, 1500000) + check(450000000000, 1500000) + check(600000000000, 2000000) + check(800000000000, 2000000) + check(1000000000000, 2500000) + + pp.SetLimits(11, 10000000) + curve = pp.GetCapacityCurve() + + check(0, 0) + check(10000000000, 100000) + check(50000000000, 500000) + check(150000000000, 750000) + check(200000000000, 1000000) + check(220000000000, 1100000) + check(275000000000, 1100000) + check(375000000000, 1500000) + check(450000000000, 1500000) + check(600000000000, 2000000) + check(800000000000, 2000000) + check(1000000000000, 2500000) + + ns.Stop() +} diff --git a/les/vflux/server/service.go b/les/vflux/server/service.go new file mode 100644 index 000000000..ab759ae44 --- /dev/null +++ b/les/vflux/server/service.go @@ -0,0 +1,122 @@ +// Copyright 2020 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package server + +import ( + "net" + "strings" + "sync" + "time" + + "github.com/ethereum/go-ethereum/les/utils" + "github.com/ethereum/go-ethereum/les/vflux" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/rlp" +) + +type ( + // Server serves vflux requests + Server struct { + limiter *utils.Limiter + lock sync.Mutex + services map[string]*serviceEntry + delayPerRequest time.Duration + } + + // Service is a service registered at the Server and identified by a string id + Service interface { + ServiceInfo() (id, desc string) // only called during registration + Handle(id enode.ID, address string, name string, data []byte) []byte // never called concurrently + } + + serviceEntry struct { + id, desc string + backend Service + } +) + +// NewServer creates a new Server +func NewServer(delayPerRequest time.Duration) *Server { + return &Server{ + limiter: utils.NewLimiter(1000), + delayPerRequest: delayPerRequest, + services: make(map[string]*serviceEntry), + } +} + +// Register registers a Service +func (s *Server) Register(b Service) { + srv := &serviceEntry{backend: b} + srv.id, srv.desc = b.ServiceInfo() + if strings.Contains(srv.id, ":") { + // srv.id + ":" will be used as a service database prefix + log.Error("Service ID contains ':'", "id", srv.id) + return + } + s.lock.Lock() + s.services[srv.id] = srv + s.lock.Unlock() +} + +// Serve serves a vflux request batch +// Note: requests are served by the Handle functions of the registered services. Serve +// may be called concurrently but the Handle functions are called sequentially and +// therefore thread safety is guaranteed. +func (s *Server) Serve(id enode.ID, address string, requests vflux.Requests) vflux.Replies { + reqLen := uint(len(requests)) + if reqLen == 0 || reqLen > vflux.MaxRequestLength { + return nil + } + // Note: the value parameter will be supplied by the token sale module (total amount paid) + ch := <-s.limiter.Add(id, address, 0, reqLen) + if ch == nil { + return nil + } + // Note: the limiter ensures that the following section is not running concurrently, + // the lock only protects against contention caused by new service registration + s.lock.Lock() + results := make(vflux.Replies, len(requests)) + for i, req := range requests { + if service := s.services[req.Service]; service != nil { + results[i] = service.backend.Handle(id, address, req.Name, req.Params) + } + } + s.lock.Unlock() + time.Sleep(s.delayPerRequest * time.Duration(reqLen)) + close(ch) + return results +} + +// ServeEncoded serves an encoded vflux request batch and returns the encoded replies +func (s *Server) ServeEncoded(id enode.ID, addr *net.UDPAddr, req []byte) []byte { + var requests vflux.Requests + if err := rlp.DecodeBytes(req, &requests); err != nil { + return nil + } + results := s.Serve(id, addr.String(), requests) + if results == nil { + return nil + } + res, _ := rlp.EncodeToBytes(&results) + return res +} + +// Stop shuts down the server +func (s *Server) Stop() { + s.limiter.Stop() +} diff --git a/p2p/discover/v5_udp.go b/p2p/discover/v5_udp.go index 9dd2b3173..eb01d95e9 100644 --- a/p2p/discover/v5_udp.go +++ b/p2p/discover/v5_udp.go @@ -74,7 +74,7 @@ type UDPv5 struct { // talkreq handler registry trlock sync.Mutex - trhandlers map[string]func([]byte) []byte + trhandlers map[string]TalkRequestHandler // channels into dispatch packetInCh chan ReadPacket @@ -96,6 +96,9 @@ type UDPv5 struct { wg sync.WaitGroup } +// TalkRequestHandler callback processes a talk request and optionally returns a reply +type TalkRequestHandler func(enode.ID, *net.UDPAddr, []byte) []byte + // callV5 represents a remote procedure call against another node. type callV5 struct { node *enode.Node @@ -145,7 +148,7 @@ func newUDPv5(conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) { log: cfg.Log, validSchemes: cfg.ValidSchemes, clock: cfg.Clock, - trhandlers: make(map[string]func([]byte) []byte), + trhandlers: make(map[string]TalkRequestHandler), // channels into dispatch packetInCh: make(chan ReadPacket, 1), readNextCh: make(chan struct{}, 1), @@ -233,7 +236,7 @@ func (t *UDPv5) LocalNode() *enode.LocalNode { // RegisterTalkHandler adds a handler for 'talk requests'. The handler function is called // whenever a request for the given protocol is received and should return the response // data or nil. -func (t *UDPv5) RegisterTalkHandler(protocol string, handler func([]byte) []byte) { +func (t *UDPv5) RegisterTalkHandler(protocol string, handler TalkRequestHandler) { t.trlock.Lock() defer t.trlock.Unlock() t.trhandlers[protocol] = handler @@ -841,7 +844,7 @@ func (t *UDPv5) handleTalkRequest(p *v5wire.TalkRequest, fromID enode.ID, fromAd var response []byte if handler != nil { - response = handler(p.Message) + response = handler(fromID, fromAddr, p.Message) } resp := &v5wire.TalkResponse{ReqID: p.ReqID, Message: response} t.sendResponse(fromID, fromAddr, resp) diff --git a/p2p/discover/v5_udp_test.go b/p2p/discover/v5_udp_test.go index d91a2097d..292785bd5 100644 --- a/p2p/discover/v5_udp_test.go +++ b/p2p/discover/v5_udp_test.go @@ -435,7 +435,7 @@ func TestUDPv5_talkHandling(t *testing.T) { defer test.close() var recvMessage []byte - test.udp.RegisterTalkHandler("test", func(message []byte) []byte { + test.udp.RegisterTalkHandler("test", func(id enode.ID, addr *net.UDPAddr, message []byte) []byte { recvMessage = message return []byte("test response") }) diff --git a/p2p/nodestate/nodestate.go b/p2p/nodestate/nodestate.go index def93bac4..d3166f1d8 100644 --- a/p2p/nodestate/nodestate.go +++ b/p2p/nodestate/nodestate.go @@ -599,6 +599,7 @@ func (ns *NodeStateMachine) updateEnode(n *enode.Node) (enode.ID, *nodeInfo) { node := ns.nodes[id] if node != nil && n.Seq() > node.node.Seq() { node.node = n + node.dirty = true } return id, node }