les, les/lespay/client: add service value statistics and API (#20837)

This PR adds service value measurement statistics to the light client. It
also adds a private API that makes these statistics accessible. A follow-up
PR will add the new server pool which uses these statistics to select
servers with good performance.

This document describes the function of the new components:
https://gist.github.com/zsfelfoldi/3c7ace895234b7b345ab4f71dab102d4

Co-authored-by: rjl493456442 <garyrong0905@gmail.com>
Co-authored-by: rjl493456442 <garyrong0905@gmail.com>
This commit is contained in:
Felföldi Zsolt 2020-04-09 11:55:32 +02:00 committed by GitHub
parent 15540ae992
commit 0851646e48
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 2142 additions and 38 deletions

View File

@ -33,6 +33,7 @@ var Modules = map[string]string{
"swarmfs": SwarmfsJs,
"txpool": TxpoolJs,
"les": LESJs,
"lespay": LESPayJs,
}
const ChequebookJs = `
@ -856,3 +857,34 @@ web3._extend({
]
});
`
const LESPayJs = `
web3._extend({
property: 'lespay',
methods:
[
new web3._extend.Method({
name: 'distribution',
call: 'lespay_distribution',
params: 2
}),
new web3._extend.Method({
name: 'timeout',
call: 'lespay_timeout',
params: 2
}),
new web3._extend.Method({
name: 'value',
call: 'lespay_value',
params: 2
}),
],
properties:
[
new web3._extend.Property({
name: 'requestStats',
getter: 'lespay_requestStats'
}),
]
});
`

View File

@ -191,7 +191,7 @@ func (b *benchmarkTxSend) init(h *serverHandler, count int) error {
func (b *benchmarkTxSend) request(peer *serverPeer, index int) error {
enc, _ := rlp.EncodeToBytes(types.Transactions{b.txs[index]})
return peer.sendTxs(0, enc)
return peer.sendTxs(0, 1, enc)
}
// benchmarkTxStatus implements requestBenchmark

View File

@ -19,6 +19,7 @@ package les
import (
"fmt"
"time"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
@ -37,6 +38,7 @@ import (
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/ethereum/go-ethereum/les/checkpointoracle"
lpc "github.com/ethereum/go-ethereum/les/lespay/client"
"github.com/ethereum/go-ethereum/light"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
@ -58,6 +60,7 @@ type LightEthereum struct {
txPool *light.TxPool
blockchain *light.LightChain
serverPool *serverPool
valueTracker *lpc.ValueTracker
bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests
bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports
@ -74,6 +77,10 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
if err != nil {
return nil, err
}
lespayDb, err := ctx.OpenDatabase("lespay", 0, 0, "eth/db/lespay")
if err != nil {
return nil, err
}
chainConfig, genesisHash, genesisErr := core.SetupGenesisBlockWithOverride(chainDb, config.Genesis,
config.OverrideIstanbul, config.OverrideMuirGlacier)
if _, isCompat := genesisErr.(*params.ConfigCompatError); genesisErr != nil && !isCompat {
@ -99,7 +106,9 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
bloomRequests: make(chan chan *bloombits.Retrieval),
bloomIndexer: eth.NewBloomIndexer(chainDb, params.BloomBitsBlocksClient, params.HelperTrieConfirmations),
serverPool: newServerPool(chainDb, config.UltraLightServers),
valueTracker: lpc.NewValueTracker(lespayDb, &mclock.System{}, requestList, time.Minute, 1/float64(time.Hour), 1/float64(time.Hour*100), 1/float64(time.Hour*1000)),
}
peers.subscribe((*vtSubscription)(leth.valueTracker))
leth.retriever = newRetrieveManager(peers, leth.reqDist, leth.serverPool)
leth.relay = newLesTxRelay(peers, leth.retriever)
@ -154,6 +163,23 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
return leth, nil
}
// vtSubscription implements serverPeerSubscriber
type vtSubscription lpc.ValueTracker
// registerPeer implements serverPeerSubscriber
func (v *vtSubscription) registerPeer(p *serverPeer) {
vt := (*lpc.ValueTracker)(v)
p.setValueTracker(vt, vt.Register(p.ID()))
p.updateVtParams()
}
// unregisterPeer implements serverPeerSubscriber
func (v *vtSubscription) unregisterPeer(p *serverPeer) {
vt := (*lpc.ValueTracker)(v)
vt.Unregister(p.ID())
p.setValueTracker(nil, nil)
}
type LightDummyAPI struct{}
// Etherbase is the address that mining rewards will be send to
@ -207,6 +233,11 @@ func (s *LightEthereum) APIs() []rpc.API {
Version: "1.0",
Service: NewPrivateLightAPI(&s.lesCommons),
Public: false,
}, {
Namespace: "lespay",
Version: "1.0",
Service: lpc.NewPrivateClientAPI(s.valueTracker),
Public: false,
},
}...)
}
@ -266,6 +297,7 @@ func (s *LightEthereum) Stop() error {
s.engine.Close()
s.eventMux.Stop()
s.serverPool.stop()
s.valueTracker.Stop()
s.chainDb.Close()
s.wg.Wait()
log.Info("Light ethereum stopped")

View File

@ -180,6 +180,7 @@ func (h *clientHandler) handleMsg(p *serverPeer) error {
return errResp(ErrRequestRejected, "")
}
p.updateFlowControl(update)
p.updateVtParams()
if req.Hash != (common.Hash{}) {
if p.announceType == announceTypeNone {
@ -205,6 +206,7 @@ func (h *clientHandler) handleMsg(p *serverPeer) error {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
p.fcServer.ReceivedReply(resp.ReqID, resp.BV)
p.answeredRequest(resp.ReqID)
if h.fetcher.requestedID(resp.ReqID) {
h.fetcher.deliverHeaders(p, resp.ReqID, resp.Headers)
} else {
@ -222,6 +224,7 @@ func (h *clientHandler) handleMsg(p *serverPeer) error {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
p.fcServer.ReceivedReply(resp.ReqID, resp.BV)
p.answeredRequest(resp.ReqID)
deliverMsg = &Msg{
MsgType: MsgBlockBodies,
ReqID: resp.ReqID,
@ -237,6 +240,7 @@ func (h *clientHandler) handleMsg(p *serverPeer) error {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
p.fcServer.ReceivedReply(resp.ReqID, resp.BV)
p.answeredRequest(resp.ReqID)
deliverMsg = &Msg{
MsgType: MsgCode,
ReqID: resp.ReqID,
@ -252,6 +256,7 @@ func (h *clientHandler) handleMsg(p *serverPeer) error {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
p.fcServer.ReceivedReply(resp.ReqID, resp.BV)
p.answeredRequest(resp.ReqID)
deliverMsg = &Msg{
MsgType: MsgReceipts,
ReqID: resp.ReqID,
@ -267,6 +272,7 @@ func (h *clientHandler) handleMsg(p *serverPeer) error {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
p.fcServer.ReceivedReply(resp.ReqID, resp.BV)
p.answeredRequest(resp.ReqID)
deliverMsg = &Msg{
MsgType: MsgProofsV2,
ReqID: resp.ReqID,
@ -282,6 +288,7 @@ func (h *clientHandler) handleMsg(p *serverPeer) error {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
p.fcServer.ReceivedReply(resp.ReqID, resp.BV)
p.answeredRequest(resp.ReqID)
deliverMsg = &Msg{
MsgType: MsgHelperTrieProofs,
ReqID: resp.ReqID,
@ -297,6 +304,7 @@ func (h *clientHandler) handleMsg(p *serverPeer) error {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
p.fcServer.ReceivedReply(resp.ReqID, resp.BV)
p.answeredRequest(resp.ReqID)
deliverMsg = &Msg{
MsgType: MsgTxStatus,
ReqID: resp.ReqID,

107
les/lespay/client/api.go Normal file
View File

@ -0,0 +1,107 @@
// Copyright 2020 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package client
import (
"time"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/les/utils"
"github.com/ethereum/go-ethereum/p2p/enode"
)
// PrivateClientAPI implements the lespay client side API
type PrivateClientAPI struct {
vt *ValueTracker
}
// NewPrivateClientAPI creates a PrivateClientAPI
func NewPrivateClientAPI(vt *ValueTracker) *PrivateClientAPI {
return &PrivateClientAPI{vt}
}
// parseNodeStr converts either an enode address or a plain hex node id to enode.ID
func parseNodeStr(nodeStr string) (enode.ID, error) {
if id, err := enode.ParseID(nodeStr); err == nil {
return id, nil
}
if node, err := enode.Parse(enode.ValidSchemes, nodeStr); err == nil {
return node.ID(), nil
} else {
return enode.ID{}, err
}
}
// RequestStats returns the current contents of the reference request basket, with
// request values meaning average per request rather than total.
func (api *PrivateClientAPI) RequestStats() []RequestStatsItem {
return api.vt.RequestStats()
}
// Distribution returns a distribution as a series of (X, Y) chart coordinates,
// where the X axis is the response time in seconds while the Y axis is the amount of
// service value received with a response time close to the X coordinate.
// The distribution is optionally normalized to a sum of 1.
// If nodeStr == "" then the global distribution is returned, otherwise the individual
// distribution of the specified server node.
func (api *PrivateClientAPI) Distribution(nodeStr string, normalized bool) (RtDistribution, error) {
var expFactor utils.ExpirationFactor
if !normalized {
expFactor = utils.ExpFactor(api.vt.StatsExpirer().LogOffset(mclock.Now()))
}
if nodeStr == "" {
return api.vt.RtStats().Distribution(normalized, expFactor), nil
}
if id, err := parseNodeStr(nodeStr); err == nil {
return api.vt.GetNode(id).RtStats().Distribution(normalized, expFactor), nil
} else {
return RtDistribution{}, err
}
}
// Timeout suggests a timeout value based on either the global distribution or the
// distribution of the specified node. The parameter is the desired rate of timeouts
// assuming a similar distribution in the future.
// Note that the actual timeout should have a sensible minimum bound so that operating
// under ideal working conditions for a long time (for example, using a local server
// with very low response times) will not make it very hard for the system to accommodate
// longer response times in the future.
func (api *PrivateClientAPI) Timeout(nodeStr string, failRate float64) (float64, error) {
if nodeStr == "" {
return float64(api.vt.RtStats().Timeout(failRate)) / float64(time.Second), nil
}
if id, err := parseNodeStr(nodeStr); err == nil {
return float64(api.vt.GetNode(id).RtStats().Timeout(failRate)) / float64(time.Second), nil
} else {
return 0, err
}
}
// Value calculates the total service value provided either globally or by the specified
// server node, using a weight function based on the given timeout.
func (api *PrivateClientAPI) Value(nodeStr string, timeout float64) (float64, error) {
wt := TimeoutWeights(time.Duration(timeout * float64(time.Second)))
expFactor := utils.ExpFactor(api.vt.StatsExpirer().LogOffset(mclock.Now()))
if nodeStr == "" {
return api.vt.RtStats().Value(wt, expFactor), nil
}
if id, err := parseNodeStr(nodeStr); err == nil {
return api.vt.GetNode(id).RtStats().Value(wt, expFactor), nil
} else {
return 0, err
}
}

View File

@ -0,0 +1,285 @@
// Copyright 2020 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package client
import (
"io"
"github.com/ethereum/go-ethereum/les/utils"
"github.com/ethereum/go-ethereum/rlp"
)
const basketFactor = 1000000 // reference basket amount and value scale factor
// referenceBasket keeps track of global request usage statistics and the usual prices
// of each used request type relative to each other. The amounts in the basket are scaled
// up by basketFactor because of the exponential expiration of long-term statistical data.
// Values are scaled so that the sum of all amounts and the sum of all values are equal.
//
// reqValues represent the internal relative value estimates for each request type and are
// calculated as value / amount. The average reqValue of all used requests is 1.
// In other words: SUM(refBasket[type].amount * reqValue[type]) = SUM(refBasket[type].amount)
type referenceBasket struct {
basket requestBasket
reqValues []float64 // contents are read only, new slice is created for each update
}
// serverBasket collects served request amount and value statistics for a single server.
//
// Values are gradually transferred to the global reference basket with a long time
// constant so that each server basket represents long term usage and price statistics.
// When the transferred part is added to the reference basket the values are scaled so
// that their sum equals the total value calculated according to the previous reqValues.
// The ratio of request values coming from the server basket represent the pricing of
// the specific server and modify the global estimates with a weight proportional to
// the amount of service provided by the server.
type serverBasket struct {
basket requestBasket
rvFactor float64
}
type (
// requestBasket holds amounts and values for each request type.
// These values are exponentially expired (see utils.ExpiredValue). The power of 2
// exponent is applicable to all values within.
requestBasket struct {
items []basketItem
exp uint64
}
// basketItem holds amount and value for a single request type. Value is the total
// relative request value accumulated for served requests while amount is the counter
// for each request type.
// Note that these values are both scaled up by basketFactor because of the exponential
// expiration.
basketItem struct {
amount, value uint64
}
)
// setExp sets the power of 2 exponent of the structure, scaling base values (the amounts
// and request values) up or down if necessary.
func (b *requestBasket) setExp(exp uint64) {
if exp > b.exp {
shift := exp - b.exp
for i, item := range b.items {
item.amount >>= shift
item.value >>= shift
b.items[i] = item
}
b.exp = exp
}
if exp < b.exp {
shift := b.exp - exp
for i, item := range b.items {
item.amount <<= shift
item.value <<= shift
b.items[i] = item
}
b.exp = exp
}
}
// init initializes a new server basket with the given service vector size (number of
// different request types)
func (s *serverBasket) init(size int) {
if s.basket.items == nil {
s.basket.items = make([]basketItem, size)
}
}
// add adds the give type and amount of requests to the basket. Cost is calculated
// according to the server's own cost table.
func (s *serverBasket) add(reqType, reqAmount uint32, reqCost uint64, expFactor utils.ExpirationFactor) {
s.basket.setExp(expFactor.Exp)
i := &s.basket.items[reqType]
i.amount += uint64(float64(uint64(reqAmount)*basketFactor) * expFactor.Factor)
i.value += uint64(float64(reqCost) * s.rvFactor * expFactor.Factor)
}
// updateRvFactor updates the request value factor that scales server costs into the
// local value dimensions.
func (s *serverBasket) updateRvFactor(rvFactor float64) {
s.rvFactor = rvFactor
}
// transfer decreases amounts and values in the basket with the given ratio and
// moves the removed amounts into a new basket which is returned and can be added
// to the global reference basket.
func (s *serverBasket) transfer(ratio float64) requestBasket {
res := requestBasket{
items: make([]basketItem, len(s.basket.items)),
exp: s.basket.exp,
}
for i, v := range s.basket.items {
ta := uint64(float64(v.amount) * ratio)
tv := uint64(float64(v.value) * ratio)
if ta > v.amount {
ta = v.amount
}
if tv > v.value {
tv = v.value
}
s.basket.items[i] = basketItem{v.amount - ta, v.value - tv}
res.items[i] = basketItem{ta, tv}
}
return res
}
// init initializes the reference basket with the given service vector size (number of
// different request types)
func (r *referenceBasket) init(size int) {
r.reqValues = make([]float64, size)
r.normalize()
r.updateReqValues()
}
// add adds the transferred part of a server basket to the reference basket while scaling
// value amounts so that their sum equals the total value calculated according to the
// previous reqValues.
func (r *referenceBasket) add(newBasket requestBasket) {
r.basket.setExp(newBasket.exp)
// scale newBasket to match service unit value
var (
totalCost uint64
totalValue float64
)
for i, v := range newBasket.items {
totalCost += v.value
totalValue += float64(v.amount) * r.reqValues[i]
}
if totalCost > 0 {
// add to reference with scaled values
scaleValues := totalValue / float64(totalCost)
for i, v := range newBasket.items {
r.basket.items[i].amount += v.amount
r.basket.items[i].value += uint64(float64(v.value) * scaleValues)
}
}
r.updateReqValues()
}
// updateReqValues recalculates reqValues after adding transferred baskets. Note that
// values should be normalized first.
func (r *referenceBasket) updateReqValues() {
r.reqValues = make([]float64, len(r.reqValues))
for i, b := range r.basket.items {
if b.amount > 0 {
r.reqValues[i] = float64(b.value) / float64(b.amount)
} else {
r.reqValues[i] = 0
}
}
}
// normalize ensures that the sum of values equal the sum of amounts in the basket.
func (r *referenceBasket) normalize() {
var sumAmount, sumValue uint64
for _, b := range r.basket.items {
sumAmount += b.amount
sumValue += b.value
}
add := float64(int64(sumAmount-sumValue)) / float64(sumValue)
for i, b := range r.basket.items {
b.value += uint64(int64(float64(b.value) * add))
r.basket.items[i] = b
}
}
// reqValueFactor calculates the request value factor applicable to the server with
// the given announced request cost list
func (r *referenceBasket) reqValueFactor(costList []uint64) float64 {
var (
totalCost float64
totalValue uint64
)
for i, b := range r.basket.items {
totalCost += float64(costList[i]) * float64(b.amount) // use floats to avoid overflow
totalValue += b.value
}
if totalCost < 1 {
return 0
}
return float64(totalValue) * basketFactor / totalCost
}
// EncodeRLP implements rlp.Encoder
func (b *basketItem) EncodeRLP(w io.Writer) error {
return rlp.Encode(w, []interface{}{b.amount, b.value})
}
// DecodeRLP implements rlp.Decoder
func (b *basketItem) DecodeRLP(s *rlp.Stream) error {
var item struct {
Amount, Value uint64
}
if err := s.Decode(&item); err != nil {
return err
}
b.amount, b.value = item.Amount, item.Value
return nil
}
// EncodeRLP implements rlp.Encoder
func (r *requestBasket) EncodeRLP(w io.Writer) error {
return rlp.Encode(w, []interface{}{r.items, r.exp})
}
// DecodeRLP implements rlp.Decoder
func (r *requestBasket) DecodeRLP(s *rlp.Stream) error {
var enc struct {
Items []basketItem
Exp uint64
}
if err := s.Decode(&enc); err != nil {
return err
}
r.items, r.exp = enc.Items, enc.Exp
return nil
}
// convertMapping converts a basket loaded from the database into the current format.
// If the available request types and their mapping into the service vector differ from
// the one used when saving the basket then this function reorders old fields and fills
// in previously unknown fields by scaling up amounts and values taken from the
// initialization basket.
func (r requestBasket) convertMapping(oldMapping, newMapping []string, initBasket requestBasket) requestBasket {
nameMap := make(map[string]int)
for i, name := range oldMapping {
nameMap[name] = i
}
rc := requestBasket{items: make([]basketItem, len(newMapping))}
var scale, oldScale, newScale float64
for i, name := range newMapping {
if ii, ok := nameMap[name]; ok {
rc.items[i] = r.items[ii]
oldScale += float64(initBasket.items[i].amount) * float64(initBasket.items[i].amount)
newScale += float64(rc.items[i].amount) * float64(initBasket.items[i].amount)
}
}
if oldScale > 1e-10 {
scale = newScale / oldScale
} else {
scale = 1
}
for i, name := range newMapping {
if _, ok := nameMap[name]; !ok {
rc.items[i].amount = uint64(float64(initBasket.items[i].amount) * scale)
rc.items[i].value = uint64(float64(initBasket.items[i].value) * scale)
}
}
return rc
}

View File

@ -0,0 +1,161 @@
// Copyright 2020 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package client
import (
"math/rand"
"testing"
"github.com/ethereum/go-ethereum/les/utils"
)
func checkU64(t *testing.T, name string, value, exp uint64) {
if value != exp {
t.Errorf("Incorrect value for %s: got %d, expected %d", name, value, exp)
}
}
func checkF64(t *testing.T, name string, value, exp, tol float64) {
if value < exp-tol || value > exp+tol {
t.Errorf("Incorrect value for %s: got %f, expected %f", name, value, exp)
}
}
func TestServerBasket(t *testing.T) {
var s serverBasket
s.init(2)
// add some requests with different request value factors
s.updateRvFactor(1)
noexp := utils.ExpirationFactor{Factor: 1}
s.add(0, 1000, 10000, noexp)
s.add(1, 3000, 60000, noexp)
s.updateRvFactor(10)
s.add(0, 4000, 4000, noexp)
s.add(1, 2000, 4000, noexp)
s.updateRvFactor(10)
// check basket contents directly
checkU64(t, "s.basket[0].amount", s.basket.items[0].amount, 5000*basketFactor)
checkU64(t, "s.basket[0].value", s.basket.items[0].value, 50000)
checkU64(t, "s.basket[1].amount", s.basket.items[1].amount, 5000*basketFactor)
checkU64(t, "s.basket[1].value", s.basket.items[1].value, 100000)
// transfer 50% of the contents of the basket
transfer1 := s.transfer(0.5)
checkU64(t, "transfer1[0].amount", transfer1.items[0].amount, 2500*basketFactor)
checkU64(t, "transfer1[0].value", transfer1.items[0].value, 25000)
checkU64(t, "transfer1[1].amount", transfer1.items[1].amount, 2500*basketFactor)
checkU64(t, "transfer1[1].value", transfer1.items[1].value, 50000)
// add more requests
s.updateRvFactor(100)
s.add(0, 1000, 100, noexp)
// transfer 25% of the contents of the basket
transfer2 := s.transfer(0.25)
checkU64(t, "transfer2[0].amount", transfer2.items[0].amount, (2500+1000)/4*basketFactor)
checkU64(t, "transfer2[0].value", transfer2.items[0].value, (25000+10000)/4)
checkU64(t, "transfer2[1].amount", transfer2.items[1].amount, 2500/4*basketFactor)
checkU64(t, "transfer2[1].value", transfer2.items[1].value, 50000/4)
}
func TestConvertMapping(t *testing.T) {
b := requestBasket{items: []basketItem{{3, 3}, {1, 1}, {2, 2}}}
oldMap := []string{"req3", "req1", "req2"}
newMap := []string{"req1", "req2", "req3", "req4"}
init := requestBasket{items: []basketItem{{2, 2}, {4, 4}, {6, 6}, {8, 8}}}
bc := b.convertMapping(oldMap, newMap, init)
checkU64(t, "bc[0].amount", bc.items[0].amount, 1)
checkU64(t, "bc[1].amount", bc.items[1].amount, 2)
checkU64(t, "bc[2].amount", bc.items[2].amount, 3)
checkU64(t, "bc[3].amount", bc.items[3].amount, 4) // 8 should be scaled down to 4
}
func TestReqValueFactor(t *testing.T) {
var ref referenceBasket
ref.basket = requestBasket{items: make([]basketItem, 4)}
for i := range ref.basket.items {
ref.basket.items[i].amount = uint64(i+1) * basketFactor
ref.basket.items[i].value = uint64(i+1) * basketFactor
}
ref.init(4)
rvf := ref.reqValueFactor([]uint64{1000, 2000, 3000, 4000})
// expected value is (1000000+2000000+3000000+4000000) / (1*1000+2*2000+3*3000+4*4000) = 10000000/30000 = 333.333
checkF64(t, "reqValueFactor", rvf, 333.333, 1)
}
func TestNormalize(t *testing.T) {
for cycle := 0; cycle < 100; cycle += 1 {
// Initialize data for testing
valueRange, lower := 1000000, 1000000
ref := referenceBasket{basket: requestBasket{items: make([]basketItem, 10)}}
for i := 0; i < 10; i++ {
ref.basket.items[i].amount = uint64(rand.Intn(valueRange) + lower)
ref.basket.items[i].value = uint64(rand.Intn(valueRange) + lower)
}
ref.normalize()
// Check whether SUM(amount) ~= SUM(value)
var sumAmount, sumValue uint64
for i := 0; i < 10; i++ {
sumAmount += ref.basket.items[i].amount
sumValue += ref.basket.items[i].value
}
var epsilon = 0.01
if float64(sumAmount)*(1+epsilon) < float64(sumValue) || float64(sumAmount)*(1-epsilon) > float64(sumValue) {
t.Fatalf("Failed to normalize sumAmount: %d sumValue: %d", sumAmount, sumValue)
}
}
}
func TestReqValueAdjustment(t *testing.T) {
var s1, s2 serverBasket
s1.init(3)
s2.init(3)
cost1 := []uint64{30000, 60000, 90000}
cost2 := []uint64{100000, 200000, 300000}
var ref referenceBasket
ref.basket = requestBasket{items: make([]basketItem, 3)}
for i := range ref.basket.items {
ref.basket.items[i].amount = 123 * basketFactor
ref.basket.items[i].value = 123 * basketFactor
}
ref.init(3)
// initial reqValues are expected to be {1, 1, 1}
checkF64(t, "reqValues[0]", ref.reqValues[0], 1, 0.01)
checkF64(t, "reqValues[1]", ref.reqValues[1], 1, 0.01)
checkF64(t, "reqValues[2]", ref.reqValues[2], 1, 0.01)
var logOffset utils.Fixed64
for period := 0; period < 1000; period++ {
exp := utils.ExpFactor(logOffset)
s1.updateRvFactor(ref.reqValueFactor(cost1))
s2.updateRvFactor(ref.reqValueFactor(cost2))
// throw in random requests into each basket using their internal pricing
for i := 0; i < 1000; i++ {
reqType, reqAmount := uint32(rand.Intn(3)), uint32(rand.Intn(10)+1)
reqCost := uint64(reqAmount) * cost1[reqType]
s1.add(reqType, reqAmount, reqCost, exp)
reqType, reqAmount = uint32(rand.Intn(3)), uint32(rand.Intn(10)+1)
reqCost = uint64(reqAmount) * cost2[reqType]
s2.add(reqType, reqAmount, reqCost, exp)
}
ref.add(s1.transfer(0.1))
ref.add(s2.transfer(0.1))
ref.normalize()
ref.updateReqValues()
logOffset += utils.Float64ToFixed64(0.1)
}
checkF64(t, "reqValues[0]", ref.reqValues[0], 0.5, 0.01)
checkF64(t, "reqValues[1]", ref.reqValues[1], 1, 0.01)
checkF64(t, "reqValues[2]", ref.reqValues[2], 1.5, 0.01)
}

View File

@ -0,0 +1,237 @@
// Copyright 2020 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package client
import (
"io"
"math"
"time"
"github.com/ethereum/go-ethereum/les/utils"
"github.com/ethereum/go-ethereum/rlp"
)
const (
minResponseTime = time.Millisecond * 50
maxResponseTime = time.Second * 10
timeStatLength = 32
weightScaleFactor = 1000000
)
// ResponseTimeStats is the response time distribution of a set of answered requests,
// weighted with request value, either served by a single server or aggregated for
// multiple servers.
// It it a fixed length (timeStatLength) distribution vector with linear interpolation.
// The X axis (the time values) are not linear, they should be transformed with
// TimeToStatScale and StatScaleToTime.
type (
ResponseTimeStats struct {
stats [timeStatLength]uint64
exp uint64
}
ResponseTimeWeights [timeStatLength]float64
)
var timeStatsLogFactor = (timeStatLength - 1) / (math.Log(float64(maxResponseTime)/float64(minResponseTime)) + 1)
// TimeToStatScale converts a response time to a distribution vector index. The index
// is represented by a float64 so that linear interpolation can be applied.
func TimeToStatScale(d time.Duration) float64 {
if d < 0 {
return 0
}
r := float64(d) / float64(minResponseTime)
if r > 1 {
r = math.Log(r) + 1
}
r *= timeStatsLogFactor
if r > timeStatLength-1 {
return timeStatLength - 1
}
return r
}
// StatScaleToTime converts a distribution vector index to a response time. The index
// is represented by a float64 so that linear interpolation can be applied.
func StatScaleToTime(r float64) time.Duration {
r /= timeStatsLogFactor
if r > 1 {
r = math.Exp(r - 1)
}
return time.Duration(r * float64(minResponseTime))
}
// TimeoutWeights calculates the weight function used for calculating service value
// based on the response time distribution of the received service.
// It is based on the request timeout value of the system. It consists of a half cosine
// function starting with 1, crossing zero at timeout and reaching -1 at 2*timeout.
// After 2*timeout the weight is constant -1.
func TimeoutWeights(timeout time.Duration) (res ResponseTimeWeights) {
for i := range res {
t := StatScaleToTime(float64(i))
if t < 2*timeout {
res[i] = math.Cos(math.Pi / 2 * float64(t) / float64(timeout))
} else {
res[i] = -1
}
}
return
}
// EncodeRLP implements rlp.Encoder
func (rt *ResponseTimeStats) EncodeRLP(w io.Writer) error {
enc := struct {
Stats [timeStatLength]uint64
Exp uint64
}{rt.stats, rt.exp}
return rlp.Encode(w, &enc)
}
// DecodeRLP implements rlp.Decoder
func (rt *ResponseTimeStats) DecodeRLP(s *rlp.Stream) error {
var enc struct {
Stats [timeStatLength]uint64
Exp uint64
}
if err := s.Decode(&enc); err != nil {
return err
}
rt.stats, rt.exp = enc.Stats, enc.Exp
return nil
}
// Add adds a new response time with the given weight to the distribution.
func (rt *ResponseTimeStats) Add(respTime time.Duration, weight float64, expFactor utils.ExpirationFactor) {
rt.setExp(expFactor.Exp)
weight *= expFactor.Factor * weightScaleFactor
r := TimeToStatScale(respTime)
i := int(r)
r -= float64(i)
rt.stats[i] += uint64(weight * (1 - r))
if i < timeStatLength-1 {
rt.stats[i+1] += uint64(weight * r)
}
}
// setExp sets the power of 2 exponent of the structure, scaling base values (the vector
// itself) up or down if necessary.
func (rt *ResponseTimeStats) setExp(exp uint64) {
if exp > rt.exp {
shift := exp - rt.exp
for i, v := range rt.stats {
rt.stats[i] = v >> shift
}
rt.exp = exp
}
if exp < rt.exp {
shift := rt.exp - exp
for i, v := range rt.stats {
rt.stats[i] = v << shift
}
rt.exp = exp
}
}
// Value calculates the total service value based on the given distribution, using the
// specified weight function.
func (rt ResponseTimeStats) Value(weights ResponseTimeWeights, expFactor utils.ExpirationFactor) float64 {
var v float64
for i, s := range rt.stats {
v += float64(s) * weights[i]
}
if v < 0 {
return 0
}
return expFactor.Value(v, rt.exp) / weightScaleFactor
}
// AddStats adds the given ResponseTimeStats to the current one.
func (rt *ResponseTimeStats) AddStats(s *ResponseTimeStats) {
rt.setExp(s.exp)
for i, v := range s.stats {
rt.stats[i] += v
}
}
// SubStats subtracts the given ResponseTimeStats from the current one.
func (rt *ResponseTimeStats) SubStats(s *ResponseTimeStats) {
rt.setExp(s.exp)
for i, v := range s.stats {
if v < rt.stats[i] {
rt.stats[i] -= v
} else {
rt.stats[i] = 0
}
}
}
// Timeout suggests a timeout value based on the previous distribution. The parameter
// is the desired rate of timeouts assuming a similar distribution in the future.
// Note that the actual timeout should have a sensible minimum bound so that operating
// under ideal working conditions for a long time (for example, using a local server
// with very low response times) will not make it very hard for the system to accommodate
// longer response times in the future.
func (rt ResponseTimeStats) Timeout(failRatio float64) time.Duration {
var sum uint64
for _, v := range rt.stats {
sum += v
}
s := uint64(float64(sum) * failRatio)
i := timeStatLength - 1
for i > 0 && s >= rt.stats[i] {
s -= rt.stats[i]
i--
}
r := float64(i) + 0.5
if rt.stats[i] > 0 {
r -= float64(s) / float64(rt.stats[i])
}
if r < 0 {
r = 0
}
th := StatScaleToTime(r)
if th > maxResponseTime {
th = maxResponseTime
}
return th
}
// RtDistribution represents a distribution as a series of (X, Y) chart coordinates,
// where the X axis is the response time in seconds while the Y axis is the amount of
// service value received with a response time close to the X coordinate.
type RtDistribution [timeStatLength][2]float64
// Distribution returns a RtDistribution, optionally normalized to a sum of 1.
func (rt ResponseTimeStats) Distribution(normalized bool, expFactor utils.ExpirationFactor) (res RtDistribution) {
var mul float64
if normalized {
var sum uint64
for _, v := range rt.stats {
sum += v
}
if sum > 0 {
mul = 1 / float64(sum)
}
} else {
mul = expFactor.Value(float64(1)/weightScaleFactor, rt.exp)
}
for i, v := range rt.stats {
res[i][0] = float64(StatScaleToTime(float64(i))) / float64(time.Second)
res[i][1] = float64(v) * mul
}
return
}

View File

@ -0,0 +1,137 @@
// Copyright 2020 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package client
import (
"math"
"math/rand"
"testing"
"time"
"github.com/ethereum/go-ethereum/les/utils"
)
func TestTransition(t *testing.T) {
var epsilon = 0.01
var cases = []time.Duration{
time.Millisecond, minResponseTime,
time.Second, time.Second * 5, maxResponseTime,
}
for _, c := range cases {
got := StatScaleToTime(TimeToStatScale(c))
if float64(got)*(1+epsilon) < float64(c) || float64(got)*(1-epsilon) > float64(c) {
t.Fatalf("Failed to transition back")
}
}
// If the time is too large(exceeds the max response time.
got := StatScaleToTime(TimeToStatScale(2 * maxResponseTime))
if float64(got)*(1+epsilon) < float64(maxResponseTime) || float64(got)*(1-epsilon) > float64(maxResponseTime) {
t.Fatalf("Failed to transition back")
}
}
var maxResponseWeights = TimeoutWeights(maxResponseTime)
func TestValue(t *testing.T) {
noexp := utils.ExpirationFactor{Factor: 1}
for i := 0; i < 1000; i++ {
max := minResponseTime + time.Duration(rand.Int63n(int64(maxResponseTime-minResponseTime)))
min := minResponseTime + time.Duration(rand.Int63n(int64(max-minResponseTime)))
timeout := max/2 + time.Duration(rand.Int63n(int64(maxResponseTime-max/2)))
s := makeRangeStats(min, max, 1000, noexp)
value := s.Value(TimeoutWeights(timeout), noexp)
// calculate the average weight (the average of the given range of the half cosine
// weight function).
minx := math.Pi / 2 * float64(min) / float64(timeout)
maxx := math.Pi / 2 * float64(max) / float64(timeout)
avgWeight := (math.Sin(maxx) - math.Sin(minx)) / (maxx - minx)
expv := 1000 * avgWeight
if expv < 0 {
expv = 0
}
if value < expv-10 || value > expv+10 {
t.Errorf("Value failed (expected %v, got %v)", expv, value)
}
}
}
func TestAddSubExpire(t *testing.T) {
var (
sum1, sum2 ResponseTimeStats
sum1ValueExp, sum2ValueExp float64
logOffset utils.Fixed64
)
for i := 0; i < 1000; i++ {
exp := utils.ExpFactor(logOffset)
max := minResponseTime + time.Duration(rand.Int63n(int64(maxResponseTime-minResponseTime)))
min := minResponseTime + time.Duration(rand.Int63n(int64(max-minResponseTime)))
s := makeRangeStats(min, max, 1000, exp)
value := s.Value(maxResponseWeights, exp)
sum1.AddStats(&s)
sum1ValueExp += value
if rand.Intn(2) == 1 {
sum2.AddStats(&s)
sum2ValueExp += value
}
logOffset += utils.Float64ToFixed64(0.001 / math.Log(2))
sum1ValueExp -= sum1ValueExp * 0.001
sum2ValueExp -= sum2ValueExp * 0.001
}
exp := utils.ExpFactor(logOffset)
sum1Value := sum1.Value(maxResponseWeights, exp)
if sum1Value < sum1ValueExp*0.99 || sum1Value > sum1ValueExp*1.01 {
t.Errorf("sum1Value failed (expected %v, got %v)", sum1ValueExp, sum1Value)
}
sum2Value := sum2.Value(maxResponseWeights, exp)
if sum2Value < sum2ValueExp*0.99 || sum2Value > sum2ValueExp*1.01 {
t.Errorf("sum2Value failed (expected %v, got %v)", sum2ValueExp, sum2Value)
}
diff := sum1
diff.SubStats(&sum2)
diffValue := diff.Value(maxResponseWeights, exp)
diffValueExp := sum1ValueExp - sum2ValueExp
if diffValue < diffValueExp*0.99 || diffValue > diffValueExp*1.01 {
t.Errorf("diffValue failed (expected %v, got %v)", diffValueExp, diffValue)
}
}
func TestTimeout(t *testing.T) {
testTimeoutRange(t, 0, time.Second)
testTimeoutRange(t, time.Second, time.Second*2)
testTimeoutRange(t, time.Second, maxResponseTime)
}
func testTimeoutRange(t *testing.T, min, max time.Duration) {
s := makeRangeStats(min, max, 1000, utils.ExpirationFactor{Factor: 1})
for i := 2; i < 9; i++ {
to := s.Timeout(float64(i) / 10)
exp := max - (max-min)*time.Duration(i)/10
tol := (max - min) / 50
if to < exp-tol || to > exp+tol {
t.Errorf("Timeout failed (expected %v, got %v)", exp, to)
}
}
}
func makeRangeStats(min, max time.Duration, amount float64, exp utils.ExpirationFactor) ResponseTimeStats {
var s ResponseTimeStats
amount /= 1000
for i := 0; i < 1000; i++ {
s.Add(min+(max-min)*time.Duration(i)/999, amount, exp)
}
return s
}

View File

@ -0,0 +1,515 @@
// Copyright 2020 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package client
import (
"bytes"
"fmt"
"math"
"sync"
"time"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/les/utils"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rlp"
)
const (
vtVersion = 1 // database encoding format for ValueTracker
nvtVersion = 1 // database encoding format for NodeValueTracker
)
var (
vtKey = []byte("vt:")
vtNodeKey = []byte("vtNode:")
)
// NodeValueTracker collects service value statistics for a specific server node
type NodeValueTracker struct {
lock sync.Mutex
rtStats, lastRtStats ResponseTimeStats
lastTransfer mclock.AbsTime
basket serverBasket
reqCosts []uint64
reqValues *[]float64
}
// init initializes a NodeValueTracker.
// Note that the contents of the referenced reqValues slice will not change; a new
// reference is passed if the values are updated by ValueTracker.
func (nv *NodeValueTracker) init(now mclock.AbsTime, reqValues *[]float64) {
reqTypeCount := len(*reqValues)
nv.reqCosts = make([]uint64, reqTypeCount)
nv.lastTransfer = now
nv.reqValues = reqValues
nv.basket.init(reqTypeCount)
}
// updateCosts updates the request cost table of the server. The request value factor
// is also updated based on the given cost table and the current reference basket.
// Note that the contents of the referenced reqValues slice will not change; a new
// reference is passed if the values are updated by ValueTracker.
func (nv *NodeValueTracker) updateCosts(reqCosts []uint64, reqValues *[]float64, rvFactor float64) {
nv.lock.Lock()
defer nv.lock.Unlock()
nv.reqCosts = reqCosts
nv.reqValues = reqValues
nv.basket.updateRvFactor(rvFactor)
}
// transferStats returns request basket and response time statistics that should be
// added to the global statistics. The contents of the server's own request basket are
// gradually transferred to the main reference basket and removed from the server basket
// with the specified transfer rate.
// The response time statistics are retained at both places and therefore the global
// distribution is always the sum of the individual server distributions.
func (nv *NodeValueTracker) transferStats(now mclock.AbsTime, transferRate float64) (requestBasket, ResponseTimeStats) {
nv.lock.Lock()
defer nv.lock.Unlock()
dt := now - nv.lastTransfer
nv.lastTransfer = now
if dt < 0 {
dt = 0
}
recentRtStats := nv.rtStats
recentRtStats.SubStats(&nv.lastRtStats)
nv.lastRtStats = nv.rtStats
return nv.basket.transfer(-math.Expm1(-transferRate * float64(dt))), recentRtStats
}
// RtStats returns the node's own response time distribution statistics
func (nv *NodeValueTracker) RtStats() ResponseTimeStats {
nv.lock.Lock()
defer nv.lock.Unlock()
return nv.rtStats
}
// ValueTracker coordinates service value calculation for individual servers and updates
// global statistics
type ValueTracker struct {
clock mclock.Clock
lock sync.Mutex
quit chan chan struct{}
db ethdb.KeyValueStore
connected map[enode.ID]*NodeValueTracker
reqTypeCount int
refBasket referenceBasket
mappings [][]string
currentMapping int
initRefBasket requestBasket
rtStats ResponseTimeStats
transferRate float64
statsExpLock sync.RWMutex
statsExpRate, offlineExpRate float64
statsExpirer utils.Expirer
statsExpFactor utils.ExpirationFactor
}
type valueTrackerEncV1 struct {
Mappings [][]string
RefBasketMapping uint
RefBasket requestBasket
RtStats ResponseTimeStats
ExpOffset, SavedAt uint64
}
type nodeValueTrackerEncV1 struct {
RtStats ResponseTimeStats
ServerBasketMapping uint
ServerBasket requestBasket
}
// RequestInfo is an initializer structure for the service vector.
type RequestInfo struct {
// Name identifies the request type and is used for re-mapping the service vector if necessary
Name string
// InitAmount and InitValue are used to initialize the reference basket
InitAmount, InitValue float64
}
// NewValueTracker creates a new ValueTracker and loads its previously saved state from
// the database if possible.
func NewValueTracker(db ethdb.KeyValueStore, clock mclock.Clock, reqInfo []RequestInfo, updatePeriod time.Duration, transferRate, statsExpRate, offlineExpRate float64) *ValueTracker {
now := clock.Now()
initRefBasket := requestBasket{items: make([]basketItem, len(reqInfo))}
mapping := make([]string, len(reqInfo))
var sumAmount, sumValue float64
for _, req := range reqInfo {
sumAmount += req.InitAmount
sumValue += req.InitAmount * req.InitValue
}
scaleValues := sumAmount * basketFactor / sumValue
for i, req := range reqInfo {
mapping[i] = req.Name
initRefBasket.items[i].amount = uint64(req.InitAmount * basketFactor)
initRefBasket.items[i].value = uint64(req.InitAmount * req.InitValue * scaleValues)
}
vt := &ValueTracker{
clock: clock,
connected: make(map[enode.ID]*NodeValueTracker),
quit: make(chan chan struct{}),
db: db,
reqTypeCount: len(initRefBasket.items),
initRefBasket: initRefBasket,
transferRate: transferRate,
statsExpRate: statsExpRate,
offlineExpRate: offlineExpRate,
}
if vt.loadFromDb(mapping) != nil {
// previous state not saved or invalid, init with default values
vt.refBasket.basket = initRefBasket
vt.mappings = [][]string{mapping}
vt.currentMapping = 0
}
vt.statsExpirer.SetRate(now, statsExpRate)
vt.refBasket.init(vt.reqTypeCount)
vt.periodicUpdate()
go func() {
for {
select {
case <-clock.After(updatePeriod):
vt.lock.Lock()
vt.periodicUpdate()
vt.lock.Unlock()
case quit := <-vt.quit:
close(quit)
return
}
}
}()
return vt
}
// StatsExpirer returns the statistics expirer so that other values can be expired
// with the same rate as the service value statistics.
func (vt *ValueTracker) StatsExpirer() *utils.Expirer {
return &vt.statsExpirer
}
// loadFromDb loads the value tracker's state from the database and converts saved
// request basket index mapping if it does not match the specified index to name mapping.
func (vt *ValueTracker) loadFromDb(mapping []string) error {
enc, err := vt.db.Get(vtKey)
if err != nil {
return err
}
r := bytes.NewReader(enc)
var version uint
if err := rlp.Decode(r, &version); err != nil {
log.Error("Decoding value tracker state failed", "err", err)
return err
}
if version != vtVersion {
log.Error("Unknown ValueTracker version", "stored", version, "current", nvtVersion)
return fmt.Errorf("Unknown ValueTracker version %d (current version is %d)", version, vtVersion)
}
var vte valueTrackerEncV1
if err := rlp.Decode(r, &vte); err != nil {
log.Error("Decoding value tracker state failed", "err", err)
return err
}
logOffset := utils.Fixed64(vte.ExpOffset)
dt := time.Now().UnixNano() - int64(vte.SavedAt)
if dt > 0 {
logOffset += utils.Float64ToFixed64(float64(dt) * vt.offlineExpRate / math.Log(2))
}
vt.statsExpirer.SetLogOffset(vt.clock.Now(), logOffset)
vt.rtStats = vte.RtStats
vt.mappings = vte.Mappings
vt.currentMapping = -1
loop:
for i, m := range vt.mappings {
if len(m) != len(mapping) {
continue loop
}
for j, s := range mapping {
if m[j] != s {
continue loop
}
}
vt.currentMapping = i
break
}
if vt.currentMapping == -1 {
vt.currentMapping = len(vt.mappings)
vt.mappings = append(vt.mappings, mapping)
}
if int(vte.RefBasketMapping) == vt.currentMapping {
vt.refBasket.basket = vte.RefBasket
} else {
if vte.RefBasketMapping >= uint(len(vt.mappings)) {
log.Error("Unknown request basket mapping", "stored", vte.RefBasketMapping, "current", vt.currentMapping)
return fmt.Errorf("Unknown request basket mapping %d (current version is %d)", vte.RefBasketMapping, vt.currentMapping)
}
vt.refBasket.basket = vte.RefBasket.convertMapping(vt.mappings[vte.RefBasketMapping], mapping, vt.initRefBasket)
}
return nil
}
// saveToDb saves the value tracker's state to the database
func (vt *ValueTracker) saveToDb() {
vte := valueTrackerEncV1{
Mappings: vt.mappings,
RefBasketMapping: uint(vt.currentMapping),
RefBasket: vt.refBasket.basket,
RtStats: vt.rtStats,
ExpOffset: uint64(vt.statsExpirer.LogOffset(vt.clock.Now())),
SavedAt: uint64(time.Now().UnixNano()),
}
enc1, err := rlp.EncodeToBytes(uint(vtVersion))
if err != nil {
log.Error("Encoding value tracker state failed", "err", err)
return
}
enc2, err := rlp.EncodeToBytes(&vte)
if err != nil {
log.Error("Encoding value tracker state failed", "err", err)
return
}
if err := vt.db.Put(vtKey, append(enc1, enc2...)); err != nil {
log.Error("Saving value tracker state failed", "err", err)
}
}
// Stop saves the value tracker's state and each loaded node's individual state and
// returns after shutting the internal goroutines down.
func (vt *ValueTracker) Stop() {
quit := make(chan struct{})
vt.quit <- quit
<-quit
vt.lock.Lock()
vt.periodicUpdate()
for id, nv := range vt.connected {
vt.saveNode(id, nv)
}
vt.connected = nil
vt.saveToDb()
vt.lock.Unlock()
}
// Register adds a server node to the value tracker
func (vt *ValueTracker) Register(id enode.ID) *NodeValueTracker {
vt.lock.Lock()
defer vt.lock.Unlock()
if vt.connected == nil {
// ValueTracker has already been stopped
return nil
}
nv := vt.loadOrNewNode(id)
nv.init(vt.clock.Now(), &vt.refBasket.reqValues)
vt.connected[id] = nv
return nv
}
// Unregister removes a server node from the value tracker
func (vt *ValueTracker) Unregister(id enode.ID) {
vt.lock.Lock()
defer vt.lock.Unlock()
if nv := vt.connected[id]; nv != nil {
vt.saveNode(id, nv)
delete(vt.connected, id)
}
}
// GetNode returns an individual server node's value tracker. If it did not exist before
// then a new node is created.
func (vt *ValueTracker) GetNode(id enode.ID) *NodeValueTracker {
vt.lock.Lock()
defer vt.lock.Unlock()
return vt.loadOrNewNode(id)
}
// loadOrNewNode returns an individual server node's value tracker. If it did not exist before
// then a new node is created.
func (vt *ValueTracker) loadOrNewNode(id enode.ID) *NodeValueTracker {
if nv, ok := vt.connected[id]; ok {
return nv
}
nv := &NodeValueTracker{lastTransfer: vt.clock.Now()}
enc, err := vt.db.Get(append(vtNodeKey, id[:]...))
if err != nil {
return nv
}
r := bytes.NewReader(enc)
var version uint
if err := rlp.Decode(r, &version); err != nil {
log.Error("Failed to decode node value tracker", "id", id, "err", err)
return nv
}
if version != nvtVersion {
log.Error("Unknown NodeValueTracker version", "stored", version, "current", nvtVersion)
return nv
}
var nve nodeValueTrackerEncV1
if err := rlp.Decode(r, &nve); err != nil {
log.Error("Failed to decode node value tracker", "id", id, "err", err)
return nv
}
nv.rtStats = nve.RtStats
nv.lastRtStats = nve.RtStats
if int(nve.ServerBasketMapping) == vt.currentMapping {
nv.basket.basket = nve.ServerBasket
} else {
if nve.ServerBasketMapping >= uint(len(vt.mappings)) {
log.Error("Unknown request basket mapping", "stored", nve.ServerBasketMapping, "current", vt.currentMapping)
return nv
}
nv.basket.basket = nve.ServerBasket.convertMapping(vt.mappings[nve.ServerBasketMapping], vt.mappings[vt.currentMapping], vt.initRefBasket)
}
return nv
}
// saveNode saves a server node's value tracker to the database
func (vt *ValueTracker) saveNode(id enode.ID, nv *NodeValueTracker) {
recentRtStats := nv.rtStats
recentRtStats.SubStats(&nv.lastRtStats)
vt.rtStats.AddStats(&recentRtStats)
nv.lastRtStats = nv.rtStats
nve := nodeValueTrackerEncV1{
RtStats: nv.rtStats,
ServerBasketMapping: uint(vt.currentMapping),
ServerBasket: nv.basket.basket,
}
enc1, err := rlp.EncodeToBytes(uint(nvtVersion))
if err != nil {
log.Error("Failed to encode service value information", "id", id, "err", err)
return
}
enc2, err := rlp.EncodeToBytes(&nve)
if err != nil {
log.Error("Failed to encode service value information", "id", id, "err", err)
return
}
if err := vt.db.Put(append(vtNodeKey, id[:]...), append(enc1, enc2...)); err != nil {
log.Error("Failed to save service value information", "id", id, "err", err)
}
}
// UpdateCosts updates the node value tracker's request cost table
func (vt *ValueTracker) UpdateCosts(nv *NodeValueTracker, reqCosts []uint64) {
vt.lock.Lock()
defer vt.lock.Unlock()
nv.updateCosts(reqCosts, &vt.refBasket.reqValues, vt.refBasket.reqValueFactor(reqCosts))
}
// RtStats returns the global response time distribution statistics
func (vt *ValueTracker) RtStats() ResponseTimeStats {
vt.lock.Lock()
defer vt.lock.Unlock()
vt.periodicUpdate()
return vt.rtStats
}
// periodicUpdate transfers individual node data to the global statistics, normalizes
// the reference basket and updates request values. The global state is also saved to
// the database with each update.
func (vt *ValueTracker) periodicUpdate() {
now := vt.clock.Now()
vt.statsExpLock.Lock()
vt.statsExpFactor = utils.ExpFactor(vt.statsExpirer.LogOffset(now))
vt.statsExpLock.Unlock()
for _, nv := range vt.connected {
basket, rtStats := nv.transferStats(now, vt.transferRate)
vt.refBasket.add(basket)
vt.rtStats.AddStats(&rtStats)
}
vt.refBasket.normalize()
vt.refBasket.updateReqValues()
for _, nv := range vt.connected {
nv.updateCosts(nv.reqCosts, &vt.refBasket.reqValues, vt.refBasket.reqValueFactor(nv.reqCosts))
}
vt.saveToDb()
}
type ServedRequest struct {
ReqType, Amount uint32
}
// Served adds a served request to the node's statistics. An actual request may be composed
// of one or more request types (service vector indices).
func (vt *ValueTracker) Served(nv *NodeValueTracker, reqs []ServedRequest, respTime time.Duration) {
vt.statsExpLock.RLock()
expFactor := vt.statsExpFactor
vt.statsExpLock.RUnlock()
nv.lock.Lock()
defer nv.lock.Unlock()
var value float64
for _, r := range reqs {
nv.basket.add(r.ReqType, r.Amount, nv.reqCosts[r.ReqType]*uint64(r.Amount), expFactor)
value += (*nv.reqValues)[r.ReqType] * float64(r.Amount)
}
nv.rtStats.Add(respTime, value, vt.statsExpFactor)
}
type RequestStatsItem struct {
Name string
ReqAmount, ReqValue float64
}
// RequestStats returns the current contents of the reference request basket, with
// request values meaning average per request rather than total.
func (vt *ValueTracker) RequestStats() []RequestStatsItem {
vt.statsExpLock.RLock()
expFactor := vt.statsExpFactor
vt.statsExpLock.RUnlock()
vt.lock.Lock()
defer vt.lock.Unlock()
vt.periodicUpdate()
res := make([]RequestStatsItem, len(vt.refBasket.basket.items))
for i, item := range vt.refBasket.basket.items {
res[i].Name = vt.mappings[vt.currentMapping][i]
res[i].ReqAmount = expFactor.Value(float64(item.amount)/basketFactor, vt.refBasket.basket.exp)
res[i].ReqValue = vt.refBasket.reqValues[i]
}
return res
}
// TotalServiceValue returns the total service value provided by the given node (as
// a function of the weights which are calculated from the request timeout value).
func (vt *ValueTracker) TotalServiceValue(nv *NodeValueTracker, weights ResponseTimeWeights) float64 {
vt.statsExpLock.RLock()
expFactor := vt.statsExpFactor
vt.statsExpLock.RUnlock()
nv.lock.Lock()
defer nv.lock.Unlock()
return nv.rtStats.Value(weights, expFactor)
}

View File

@ -0,0 +1,135 @@
// Copyright 2020 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package client
import (
"math"
"math/rand"
"strconv"
"testing"
"time"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/ethdb/memorydb"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/les/utils"
)
const (
testReqTypes = 3
testNodeCount = 5
testReqCount = 10000
testRounds = 10
)
func TestValueTracker(t *testing.T) {
db := memorydb.New()
clock := &mclock.Simulated{}
requestList := make([]RequestInfo, testReqTypes)
relPrices := make([]float64, testReqTypes)
totalAmount := make([]uint64, testReqTypes)
for i := range requestList {
requestList[i] = RequestInfo{Name: "testreq" + strconv.Itoa(i), InitAmount: 1, InitValue: 1}
totalAmount[i] = 1
relPrices[i] = rand.Float64() + 0.1
}
nodes := make([]*NodeValueTracker, testNodeCount)
for round := 0; round < testRounds; round++ {
makeRequests := round < testRounds-2
useExpiration := round == testRounds-1
var expRate float64
if useExpiration {
expRate = math.Log(2) / float64(time.Hour*100)
}
vt := NewValueTracker(db, clock, requestList, time.Minute, 1/float64(time.Hour), expRate, expRate)
updateCosts := func(i int) {
costList := make([]uint64, testReqTypes)
baseCost := rand.Float64()*10000000 + 100000
for j := range costList {
costList[j] = uint64(baseCost * relPrices[j])
}
vt.UpdateCosts(nodes[i], costList)
}
for i := range nodes {
nodes[i] = vt.Register(enode.ID{byte(i)})
updateCosts(i)
}
if makeRequests {
for i := 0; i < testReqCount; i++ {
reqType := rand.Intn(testReqTypes)
reqAmount := rand.Intn(10) + 1
node := rand.Intn(testNodeCount)
respTime := time.Duration((rand.Float64() + 1) * float64(time.Second) * float64(node+1) / testNodeCount)
totalAmount[reqType] += uint64(reqAmount)
vt.Served(nodes[node], []ServedRequest{{uint32(reqType), uint32(reqAmount)}}, respTime)
clock.Run(time.Second)
}
} else {
clock.Run(time.Hour * 100)
if useExpiration {
for i, a := range totalAmount {
totalAmount[i] = a / 2
}
}
}
vt.Stop()
var sumrp, sumrv float64
for i, rp := range relPrices {
sumrp += rp
sumrv += vt.refBasket.reqValues[i]
}
for i, rp := range relPrices {
ratio := vt.refBasket.reqValues[i] * sumrp / (rp * sumrv)
if ratio < 0.99 || ratio > 1.01 {
t.Errorf("reqValues (%v) does not match relPrices (%v)", vt.refBasket.reqValues, relPrices)
break
}
}
exp := utils.ExpFactor(vt.StatsExpirer().LogOffset(clock.Now()))
basketAmount := make([]uint64, testReqTypes)
for i, bi := range vt.refBasket.basket.items {
basketAmount[i] += uint64(exp.Value(float64(bi.amount), vt.refBasket.basket.exp))
}
if makeRequests {
// if we did not make requests in this round then we expect all amounts to be
// in the reference basket
for _, node := range nodes {
for i, bi := range node.basket.basket.items {
basketAmount[i] += uint64(exp.Value(float64(bi.amount), node.basket.basket.exp))
}
}
}
for i, a := range basketAmount {
amount := a / basketFactor
if amount+10 < totalAmount[i] || amount > totalAmount[i]+10 {
t.Errorf("totalAmount[%d] mismatch in round %d (expected %d, got %d)", i, round, totalAmount[i], amount)
}
}
var sumValue float64
for _, node := range nodes {
s := node.RtStats()
sumValue += s.Value(maxResponseWeights, exp)
}
s := vt.RtStats()
mainValue := s.Value(maxResponseWeights, exp)
if sumValue < mainValue-10 || sumValue > mainValue+10 {
t.Errorf("Main rtStats value does not match sum of node rtStats values in round %d (main %v, sum %v)", round, mainValue, sumValue)
}
}
}

View File

@ -32,6 +32,7 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/les/flowcontrol"
lpc "github.com/ethereum/go-ethereum/les/lespay/client"
"github.com/ethereum/go-ethereum/les/utils"
"github.com/ethereum/go-ethereum/light"
"github.com/ethereum/go-ethereum/p2p"
@ -358,6 +359,10 @@ type serverPeer struct {
poolEntry *poolEntry // Statistic for server peer.
fcServer *flowcontrol.ServerNode // Client side mirror token bucket.
vtLock sync.Mutex
valueTracker *lpc.ValueTracker
nodeValueTracker *lpc.NodeValueTracker
sentReqs map[uint64]sentReqEntry
// Statistics
errCount int // Counter the invalid responses server has replied
@ -428,62 +433,71 @@ func sendRequest(w p2p.MsgWriter, msgcode, reqID uint64, data interface{}) error
return p2p.Send(w, msgcode, req{reqID, data})
}
func (p *serverPeer) sendRequest(msgcode, reqID uint64, data interface{}, amount int) error {
p.sentRequest(reqID, uint32(msgcode), uint32(amount))
return sendRequest(p.rw, msgcode, reqID, data)
}
// requestHeadersByHash fetches a batch of blocks' headers corresponding to the
// specified header query, based on the hash of an origin block.
func (p *serverPeer) requestHeadersByHash(reqID uint64, origin common.Hash, amount int, skip int, reverse bool) error {
p.Log().Debug("Fetching batch of headers", "count", amount, "fromhash", origin, "skip", skip, "reverse", reverse)
return sendRequest(p.rw, GetBlockHeadersMsg, reqID, &getBlockHeadersData{Origin: hashOrNumber{Hash: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse})
return p.sendRequest(GetBlockHeadersMsg, reqID, &getBlockHeadersData{Origin: hashOrNumber{Hash: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse}, amount)
}
// requestHeadersByNumber fetches a batch of blocks' headers corresponding to the
// specified header query, based on the number of an origin block.
func (p *serverPeer) requestHeadersByNumber(reqID, origin uint64, amount int, skip int, reverse bool) error {
p.Log().Debug("Fetching batch of headers", "count", amount, "fromnum", origin, "skip", skip, "reverse", reverse)
return sendRequest(p.rw, GetBlockHeadersMsg, reqID, &getBlockHeadersData{Origin: hashOrNumber{Number: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse})
return p.sendRequest(GetBlockHeadersMsg, reqID, &getBlockHeadersData{Origin: hashOrNumber{Number: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse}, amount)
}
// requestBodies fetches a batch of blocks' bodies corresponding to the hashes
// specified.
func (p *serverPeer) requestBodies(reqID uint64, hashes []common.Hash) error {
p.Log().Debug("Fetching batch of block bodies", "count", len(hashes))
return sendRequest(p.rw, GetBlockBodiesMsg, reqID, hashes)
return p.sendRequest(GetBlockBodiesMsg, reqID, hashes, len(hashes))
}
// requestCode fetches a batch of arbitrary data from a node's known state
// data, corresponding to the specified hashes.
func (p *serverPeer) requestCode(reqID uint64, reqs []CodeReq) error {
p.Log().Debug("Fetching batch of codes", "count", len(reqs))
return sendRequest(p.rw, GetCodeMsg, reqID, reqs)
return p.sendRequest(GetCodeMsg, reqID, reqs, len(reqs))
}
// requestReceipts fetches a batch of transaction receipts from a remote node.
func (p *serverPeer) requestReceipts(reqID uint64, hashes []common.Hash) error {
p.Log().Debug("Fetching batch of receipts", "count", len(hashes))
return sendRequest(p.rw, GetReceiptsMsg, reqID, hashes)
return p.sendRequest(GetReceiptsMsg, reqID, hashes, len(hashes))
}
// requestProofs fetches a batch of merkle proofs from a remote node.
func (p *serverPeer) requestProofs(reqID uint64, reqs []ProofReq) error {
p.Log().Debug("Fetching batch of proofs", "count", len(reqs))
return sendRequest(p.rw, GetProofsV2Msg, reqID, reqs)
return p.sendRequest(GetProofsV2Msg, reqID, reqs, len(reqs))
}
// requestHelperTrieProofs fetches a batch of HelperTrie merkle proofs from a remote node.
func (p *serverPeer) requestHelperTrieProofs(reqID uint64, reqs []HelperTrieReq) error {
p.Log().Debug("Fetching batch of HelperTrie proofs", "count", len(reqs))
return sendRequest(p.rw, GetHelperTrieProofsMsg, reqID, reqs)
return p.sendRequest(GetHelperTrieProofsMsg, reqID, reqs, len(reqs))
}
// requestTxStatus fetches a batch of transaction status records from a remote node.
func (p *serverPeer) requestTxStatus(reqID uint64, txHashes []common.Hash) error {
p.Log().Debug("Requesting transaction status", "count", len(txHashes))
return sendRequest(p.rw, GetTxStatusMsg, reqID, txHashes)
return p.sendRequest(GetTxStatusMsg, reqID, txHashes, len(txHashes))
}
// SendTxStatus creates a reply with a batch of transactions to be added to the remote transaction pool.
func (p *serverPeer) sendTxs(reqID uint64, txs rlp.RawValue) error {
p.Log().Debug("Sending batch of transactions", "size", len(txs))
return sendRequest(p.rw, SendTxV2Msg, reqID, txs)
func (p *serverPeer) sendTxs(reqID uint64, amount int, txs rlp.RawValue) error {
p.Log().Debug("Sending batch of transactions", "amount", amount, "size", len(txs))
sizeFactor := (len(txs) + txSizeCostLimit/2) / txSizeCostLimit
if sizeFactor > amount {
amount = sizeFactor
}
return p.sendRequest(SendTxV2Msg, reqID, txs, amount)
}
// waitBefore implements distPeer interface
@ -532,6 +546,7 @@ func (p *serverPeer) getTxRelayCost(amount, size int) uint64 {
// HasBlock checks if the peer has a given block
func (p *serverPeer) HasBlock(hash common.Hash, number uint64, hasState bool) bool {
p.lock.RLock()
head := p.headInfo.Number
var since, recent uint64
if hasState {
@ -630,6 +645,87 @@ func (p *serverPeer) Handshake(td *big.Int, head common.Hash, headNum uint64, ge
})
}
// setValueTracker sets the value tracker references for connected servers. Note that the
// references should be removed upon disconnection by setValueTracker(nil, nil).
func (p *serverPeer) setValueTracker(vt *lpc.ValueTracker, nvt *lpc.NodeValueTracker) {
p.vtLock.Lock()
p.valueTracker = vt
p.nodeValueTracker = nvt
if nvt != nil {
p.sentReqs = make(map[uint64]sentReqEntry)
} else {
p.sentReqs = nil
}
p.vtLock.Unlock()
}
// updateVtParams updates the server's price table in the value tracker.
func (p *serverPeer) updateVtParams() {
p.vtLock.Lock()
defer p.vtLock.Unlock()
if p.nodeValueTracker == nil {
return
}
reqCosts := make([]uint64, len(requestList))
for code, costs := range p.fcCosts {
if m, ok := requestMapping[uint32(code)]; ok {
reqCosts[m.first] = costs.baseCost + costs.reqCost
if m.rest != -1 {
reqCosts[m.rest] = costs.reqCost
}
}
}
p.valueTracker.UpdateCosts(p.nodeValueTracker, reqCosts)
}
// sentReqEntry remembers sent requests and their sending times
type sentReqEntry struct {
reqType, amount uint32
at mclock.AbsTime
}
// sentRequest marks a request sent at the current moment to this server.
func (p *serverPeer) sentRequest(id uint64, reqType, amount uint32) {
p.vtLock.Lock()
if p.sentReqs != nil {
p.sentReqs[id] = sentReqEntry{reqType, amount, mclock.Now()}
}
p.vtLock.Unlock()
}
// answeredRequest marks a request answered at the current moment by this server.
func (p *serverPeer) answeredRequest(id uint64) {
p.vtLock.Lock()
if p.sentReqs == nil {
p.vtLock.Unlock()
return
}
e, ok := p.sentReqs[id]
delete(p.sentReqs, id)
vt := p.valueTracker
nvt := p.nodeValueTracker
p.vtLock.Unlock()
if !ok {
return
}
var (
vtReqs [2]lpc.ServedRequest
reqCount int
)
m := requestMapping[e.reqType]
if m.rest == -1 || e.amount <= 1 {
reqCount = 1
vtReqs[0] = lpc.ServedRequest{ReqType: uint32(m.first), Amount: e.amount}
} else {
reqCount = 2
vtReqs[0] = lpc.ServedRequest{ReqType: uint32(m.first), Amount: 1}
vtReqs[1] = lpc.ServedRequest{ReqType: uint32(m.rest), Amount: e.amount - 1}
}
dt := time.Duration(mclock.Now() - e.at)
vt.Served(nvt, vtReqs[:reqCount], dt)
}
// clientPeer represents each node to which the les server is connected.
// The node here refers to the light client.
type clientPeer struct {

View File

@ -25,6 +25,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
lpc "github.com/ethereum/go-ethereum/les/lespay/client"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rlp"
)
@ -79,17 +80,57 @@ const (
type requestInfo struct {
name string
maxCount uint64
refBasketFirst, refBasketRest float64
}
var requests = map[uint64]requestInfo{
GetBlockHeadersMsg: {"GetBlockHeaders", MaxHeaderFetch},
GetBlockBodiesMsg: {"GetBlockBodies", MaxBodyFetch},
GetReceiptsMsg: {"GetReceipts", MaxReceiptFetch},
GetCodeMsg: {"GetCode", MaxCodeFetch},
GetProofsV2Msg: {"GetProofsV2", MaxProofsFetch},
GetHelperTrieProofsMsg: {"GetHelperTrieProofs", MaxHelperTrieProofsFetch},
SendTxV2Msg: {"SendTxV2", MaxTxSend},
GetTxStatusMsg: {"GetTxStatus", MaxTxStatus},
// reqMapping maps an LES request to one or two lespay service vector entries.
// If rest != -1 and the request type is used with amounts larger than one then the
// first one of the multi-request is mapped to first while the rest is mapped to rest.
type reqMapping struct {
first, rest int
}
var (
// requests describes the available LES request types and their initializing amounts
// in the lespay/client.ValueTracker reference basket. Initial values are estimates
// based on the same values as the server's default cost estimates (reqAvgTimeCost).
requests = map[uint64]requestInfo{
GetBlockHeadersMsg: {"GetBlockHeaders", MaxHeaderFetch, 10, 1000},
GetBlockBodiesMsg: {"GetBlockBodies", MaxBodyFetch, 1, 0},
GetReceiptsMsg: {"GetReceipts", MaxReceiptFetch, 1, 0},
GetCodeMsg: {"GetCode", MaxCodeFetch, 1, 0},
GetProofsV2Msg: {"GetProofsV2", MaxProofsFetch, 10, 0},
GetHelperTrieProofsMsg: {"GetHelperTrieProofs", MaxHelperTrieProofsFetch, 10, 100},
SendTxV2Msg: {"SendTxV2", MaxTxSend, 1, 0},
GetTxStatusMsg: {"GetTxStatus", MaxTxStatus, 10, 0},
}
requestList []lpc.RequestInfo
requestMapping map[uint32]reqMapping
)
// init creates a request list and mapping between protocol message codes and lespay
// service vector indices.
func init() {
requestMapping = make(map[uint32]reqMapping)
for code, req := range requests {
cost := reqAvgTimeCost[code]
rm := reqMapping{len(requestList), -1}
requestList = append(requestList, lpc.RequestInfo{
Name: req.name + ".first",
InitAmount: req.refBasketFirst,
InitValue: float64(cost.baseCost + cost.reqCost),
})
if req.refBasketRest != 0 {
rm.rest = len(requestList)
requestList = append(requestList, lpc.RequestInfo{
Name: req.name + ".rest",
InitAmount: req.refBasketRest,
InitValue: float64(cost.reqCost),
})
}
requestMapping[uint32(code)] = rm
}
}
type errCode int

View File

@ -144,7 +144,7 @@ func (ltrx *lesTxRelay) send(txs types.Transactions, count int) {
peer := dp.(*serverPeer)
cost := peer.getTxRelayCost(len(ll), len(enc))
peer.fcServer.QueuedRequest(reqID, cost)
return func() { peer.sendTxs(reqID, enc) }
return func() { peer.sendTxs(reqID, len(ll), enc) }
},
}
go ltrx.retriever.retrieve(context.Background(), reqID, rq, func(p distPeer, msg *Msg) error { return nil }, ltrx.stop)

202
les/utils/expiredvalue.go Normal file
View File

@ -0,0 +1,202 @@
// Copyright 2020 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package utils
import (
"math"
"github.com/ethereum/go-ethereum/common/mclock"
)
// ExpiredValue is a scalar value that is continuously expired (decreased
// exponentially) based on the provided logarithmic expiration offset value.
//
// The formula for value calculation is: base*2^(exp-logOffset). In order to
// simplify the calculation of ExpiredValue, its value is expressed in the form
// of an exponent with a base of 2.
//
// Also here is a trick to reduce a lot of calculations. In theory, when a value X
// decays over time and then a new value Y is added, the final result should be
// X*2^(exp-logOffset)+Y. However it's very hard to represent in memory.
// So the trick is using the idea of inflation instead of exponential decay. At this
// moment the temporary value becomes: X*2^exp+Y*2^logOffset_1, apply the exponential
// decay when we actually want to calculate the value.
//
// e.g.
// t0: V = 100
// t1: add 30, inflationary value is: 100 + 30/0.3, 0.3 is the decay coefficient
// t2: get value, decay coefficient is 0.2 now, final result is: 200*0.2 = 40
type ExpiredValue struct {
Base, Exp uint64 // rlp encoding works by default
}
// ExpirationFactor is calculated from logOffset. 1 <= Factor < 2 and Factor*2^Exp
// describes the multiplier applicable for additions and the divider for readouts.
// If logOffset changes slowly then it saves some expensive operations to not calculate
// them for each addition and readout but cache this intermediate form for some time.
// It is also useful for structures where multiple values are expired with the same
// Expirer.
type ExpirationFactor struct {
Exp uint64
Factor float64
}
// ExpFactor calculates ExpirationFactor based on logOffset
func ExpFactor(logOffset Fixed64) ExpirationFactor {
return ExpirationFactor{Exp: logOffset.ToUint64(), Factor: logOffset.Fraction().Pow2()}
}
// Value calculates the expired value based on a floating point base and integer
// power-of-2 exponent. This function should be used by multi-value expired structures.
func (e ExpirationFactor) Value(base float64, exp uint64) float64 {
res := base / e.Factor
if exp > e.Exp {
res *= float64(uint64(1) << (exp - e.Exp))
}
if exp < e.Exp {
res /= float64(uint64(1) << (e.Exp - exp))
}
return res
}
// value calculates the value at the given moment.
func (e ExpiredValue) Value(logOffset Fixed64) uint64 {
offset := Uint64ToFixed64(e.Exp) - logOffset
return uint64(float64(e.Base) * offset.Pow2())
}
// add adds a signed value at the given moment
func (e *ExpiredValue) Add(amount int64, logOffset Fixed64) int64 {
integer, frac := logOffset.ToUint64(), logOffset.Fraction()
factor := frac.Pow2()
base := factor * float64(amount)
if integer < e.Exp {
base /= math.Pow(2, float64(e.Exp-integer))
}
if integer > e.Exp {
e.Base >>= (integer - e.Exp)
e.Exp = integer
}
if base >= 0 || uint64(-base) <= e.Base {
e.Base += uint64(base)
return amount
}
net := int64(-float64(e.Base) / factor)
e.Base = 0
return net
}
// addExp adds another ExpiredValue
func (e *ExpiredValue) AddExp(a ExpiredValue) {
if e.Exp > a.Exp {
a.Base >>= (e.Exp - a.Exp)
}
if e.Exp < a.Exp {
e.Base >>= (a.Exp - e.Exp)
e.Exp = a.Exp
}
e.Base += a.Base
}
// subExp subtracts another ExpiredValue
func (e *ExpiredValue) SubExp(a ExpiredValue) {
if e.Exp > a.Exp {
a.Base >>= (e.Exp - a.Exp)
}
if e.Exp < a.Exp {
e.Base >>= (a.Exp - e.Exp)
e.Exp = a.Exp
}
if e.Base > a.Base {
e.Base -= a.Base
} else {
e.Base = 0
}
}
// Expirer changes logOffset with a linear rate which can be changed during operation.
// It is not thread safe, if access by multiple goroutines is needed then it should be
// encapsulated into a locked structure.
// Note that if neither SetRate nor SetLogOffset are used during operation then LogOffset
// is thread safe.
type Expirer struct {
logOffset Fixed64
rate float64
lastUpdate mclock.AbsTime
}
// SetRate changes the expiration rate which is the inverse of the time constant in
// nanoseconds.
func (e *Expirer) SetRate(now mclock.AbsTime, rate float64) {
dt := now - e.lastUpdate
if dt > 0 {
e.logOffset += Fixed64(logToFixedFactor * float64(dt) * e.rate)
}
e.lastUpdate = now
e.rate = rate
}
// SetLogOffset sets logOffset instantly.
func (e *Expirer) SetLogOffset(now mclock.AbsTime, logOffset Fixed64) {
e.lastUpdate = now
e.logOffset = logOffset
}
// LogOffset returns the current logarithmic offset.
func (e *Expirer) LogOffset(now mclock.AbsTime) Fixed64 {
dt := now - e.lastUpdate
if dt <= 0 {
return e.logOffset
}
return e.logOffset + Fixed64(logToFixedFactor*float64(dt)*e.rate)
}
// fixedFactor is the fixed point multiplier factor used by Fixed64.
const fixedFactor = 0x1000000
// Fixed64 implements 64-bit fixed point arithmetic functions.
type Fixed64 int64
// Uint64ToFixed64 converts uint64 integer to Fixed64 format.
func Uint64ToFixed64(f uint64) Fixed64 {
return Fixed64(f * fixedFactor)
}
// float64ToFixed64 converts float64 to Fixed64 format.
func Float64ToFixed64(f float64) Fixed64 {
return Fixed64(f * fixedFactor)
}
// toUint64 converts Fixed64 format to uint64.
func (f64 Fixed64) ToUint64() uint64 {
return uint64(f64) / fixedFactor
}
// fraction returns the fractional part of a Fixed64 value.
func (f64 Fixed64) Fraction() Fixed64 {
return f64 % fixedFactor
}
var (
logToFixedFactor = float64(fixedFactor) / math.Log(2)
fixedToLogFactor = math.Log(2) / float64(fixedFactor)
)
// pow2Fixed returns the base 2 power of the fixed point value.
func (f64 Fixed64) Pow2() float64 {
return math.Exp(float64(f64) * fixedToLogFactor)
}

View File

@ -0,0 +1,116 @@
// Copyright 2020 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package utils
import "testing"
func TestValueExpiration(t *testing.T) {
var cases = []struct {
input ExpiredValue
timeOffset Fixed64
expect uint64
}{
{ExpiredValue{Base: 128, Exp: 0}, Uint64ToFixed64(0), 128},
{ExpiredValue{Base: 128, Exp: 0}, Uint64ToFixed64(1), 64},
{ExpiredValue{Base: 128, Exp: 0}, Uint64ToFixed64(2), 32},
{ExpiredValue{Base: 128, Exp: 2}, Uint64ToFixed64(2), 128},
{ExpiredValue{Base: 128, Exp: 2}, Uint64ToFixed64(3), 64},
}
for _, c := range cases {
if got := c.input.Value(c.timeOffset); got != c.expect {
t.Fatalf("Value mismatch, want=%d, got=%d", c.expect, got)
}
}
}
func TestValueAddition(t *testing.T) {
var cases = []struct {
input ExpiredValue
addend int64
timeOffset Fixed64
expect uint64
expectNet int64
}{
// Addition
{ExpiredValue{Base: 128, Exp: 0}, 128, Uint64ToFixed64(0), 256, 128},
{ExpiredValue{Base: 128, Exp: 2}, 128, Uint64ToFixed64(0), 640, 128},
// Addition with offset
{ExpiredValue{Base: 128, Exp: 0}, 128, Uint64ToFixed64(1), 192, 128},
{ExpiredValue{Base: 128, Exp: 2}, 128, Uint64ToFixed64(1), 384, 128},
{ExpiredValue{Base: 128, Exp: 2}, 128, Uint64ToFixed64(3), 192, 128},
// Subtraction
{ExpiredValue{Base: 128, Exp: 0}, -64, Uint64ToFixed64(0), 64, -64},
{ExpiredValue{Base: 128, Exp: 0}, -128, Uint64ToFixed64(0), 0, -128},
{ExpiredValue{Base: 128, Exp: 0}, -192, Uint64ToFixed64(0), 0, -128},
// Subtraction with offset
{ExpiredValue{Base: 128, Exp: 0}, -64, Uint64ToFixed64(1), 0, -64},
{ExpiredValue{Base: 128, Exp: 0}, -128, Uint64ToFixed64(1), 0, -64},
{ExpiredValue{Base: 128, Exp: 2}, -128, Uint64ToFixed64(1), 128, -128},
{ExpiredValue{Base: 128, Exp: 2}, -128, Uint64ToFixed64(2), 0, -128},
}
for _, c := range cases {
if net := c.input.Add(c.addend, c.timeOffset); net != c.expectNet {
t.Fatalf("Net amount mismatch, want=%d, got=%d", c.expectNet, net)
}
if got := c.input.Value(c.timeOffset); got != c.expect {
t.Fatalf("Value mismatch, want=%d, got=%d", c.expect, got)
}
}
}
func TestExpiredValueAddition(t *testing.T) {
var cases = []struct {
input ExpiredValue
another ExpiredValue
timeOffset Fixed64
expect uint64
}{
{ExpiredValue{Base: 128, Exp: 0}, ExpiredValue{Base: 128, Exp: 0}, Uint64ToFixed64(0), 256},
{ExpiredValue{Base: 128, Exp: 1}, ExpiredValue{Base: 128, Exp: 0}, Uint64ToFixed64(0), 384},
{ExpiredValue{Base: 128, Exp: 0}, ExpiredValue{Base: 128, Exp: 1}, Uint64ToFixed64(0), 384},
{ExpiredValue{Base: 128, Exp: 0}, ExpiredValue{Base: 128, Exp: 0}, Uint64ToFixed64(1), 128},
}
for _, c := range cases {
c.input.AddExp(c.another)
if got := c.input.Value(c.timeOffset); got != c.expect {
t.Fatalf("Value mismatch, want=%d, got=%d", c.expect, got)
}
}
}
func TestExpiredValueSubtraction(t *testing.T) {
var cases = []struct {
input ExpiredValue
another ExpiredValue
timeOffset Fixed64
expect uint64
}{
{ExpiredValue{Base: 128, Exp: 0}, ExpiredValue{Base: 128, Exp: 0}, Uint64ToFixed64(0), 0},
{ExpiredValue{Base: 128, Exp: 0}, ExpiredValue{Base: 128, Exp: 1}, Uint64ToFixed64(0), 0},
{ExpiredValue{Base: 128, Exp: 1}, ExpiredValue{Base: 128, Exp: 0}, Uint64ToFixed64(0), 128},
{ExpiredValue{Base: 128, Exp: 1}, ExpiredValue{Base: 128, Exp: 0}, Uint64ToFixed64(1), 64},
}
for _, c := range cases {
c.input.SubExp(c.another)
if got := c.input.Value(c.timeOffset); got != c.expect {
t.Fatalf("Value mismatch, want=%d, got=%d", c.expect, got)
}
}
}

View File

@ -217,7 +217,7 @@ func (n ID) MarshalText() ([]byte, error) {
// UnmarshalText implements the encoding.TextUnmarshaler interface.
func (n *ID) UnmarshalText(text []byte) error {
id, err := parseID(string(text))
id, err := ParseID(string(text))
if err != nil {
return err
}
@ -229,14 +229,14 @@ func (n *ID) UnmarshalText(text []byte) error {
// The string may be prefixed with 0x.
// It panics if the string is not a valid ID.
func HexID(in string) ID {
id, err := parseID(in)
id, err := ParseID(in)
if err != nil {
panic(err)
}
return id
}
func parseID(in string) (ID, error) {
func ParseID(in string) (ID, error) {
var id ID
b, err := hex.DecodeString(strings.TrimPrefix(in, "0x"))
if err != nil {