les: multiple server bugfixes (#20079)
* les: detailed relative cost metrics * les: filter txpool relative request statistic * les: initialize price factors * les: increased connected bias to lower churn rate * les: fixed clientPool.setLimits * core: do not use mutex in GetAncestor * les: bump factor db version again * les: add metrics * les, light: minor fixes
This commit is contained in:
parent
d4dce43bff
commit
0ac9bbba6c
@ -2151,9 +2151,6 @@ func (bc *BlockChain) GetBlockHashesFromHash(hash common.Hash, max uint64) []com
|
|||||||
//
|
//
|
||||||
// Note: ancestor == 0 returns the same block, 1 returns its parent and so on.
|
// Note: ancestor == 0 returns the same block, 1 returns its parent and so on.
|
||||||
func (bc *BlockChain) GetAncestor(hash common.Hash, number, ancestor uint64, maxNonCanonical *uint64) (common.Hash, uint64) {
|
func (bc *BlockChain) GetAncestor(hash common.Hash, number, ancestor uint64, maxNonCanonical *uint64) (common.Hash, uint64) {
|
||||||
bc.chainmu.RLock()
|
|
||||||
defer bc.chainmu.RUnlock()
|
|
||||||
|
|
||||||
return bc.hc.GetAncestor(hash, number, ancestor, maxNonCanonical)
|
return bc.hc.GetAncestor(hash, number, ancestor, maxNonCanonical)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -348,9 +348,12 @@ func (hc *HeaderChain) GetAncestor(hash common.Hash, number, ancestor uint64, ma
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
for ancestor != 0 {
|
for ancestor != 0 {
|
||||||
|
if rawdb.ReadCanonicalHash(hc.chainDb, number) == hash {
|
||||||
|
ancestorHash := rawdb.ReadCanonicalHash(hc.chainDb, number-ancestor)
|
||||||
if rawdb.ReadCanonicalHash(hc.chainDb, number) == hash {
|
if rawdb.ReadCanonicalHash(hc.chainDb, number) == hash {
|
||||||
number -= ancestor
|
number -= ancestor
|
||||||
return rawdb.ReadCanonicalHash(hc.chainDb, number), number
|
return ancestorHash, number
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if *maxNonCanonical == 0 {
|
if *maxNonCanonical == 0 {
|
||||||
return common.Hash{}, 0
|
return common.Hash{}, 0
|
||||||
|
@ -33,7 +33,7 @@ import (
|
|||||||
const (
|
const (
|
||||||
negBalanceExpTC = time.Hour // time constant for exponentially reducing negative balance
|
negBalanceExpTC = time.Hour // time constant for exponentially reducing negative balance
|
||||||
fixedPointMultiplier = 0x1000000 // constant to convert logarithms to fixed point format
|
fixedPointMultiplier = 0x1000000 // constant to convert logarithms to fixed point format
|
||||||
connectedBias = time.Minute // this bias is applied in favor of already connected clients in order to avoid kicking them out very soon
|
connectedBias = time.Minute * 5 // this bias is applied in favor of already connected clients in order to avoid kicking them out very soon
|
||||||
lazyQueueRefresh = time.Second * 10 // refresh period of the connected queue
|
lazyQueueRefresh = time.Second * 10 // refresh period of the connected queue
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -366,12 +366,14 @@ func (f *clientPool) setLimits(count int, totalCap uint64) {
|
|||||||
|
|
||||||
f.countLimit = count
|
f.countLimit = count
|
||||||
f.capacityLimit = totalCap
|
f.capacityLimit = totalCap
|
||||||
|
if f.connectedCapacity > f.capacityLimit || f.connectedQueue.Size() > f.countLimit {
|
||||||
now := mclock.Now()
|
now := mclock.Now()
|
||||||
f.connectedQueue.MultiPop(func(data interface{}, priority int64) bool {
|
f.connectedQueue.MultiPop(func(data interface{}, priority int64) bool {
|
||||||
c := data.(*clientInfo)
|
c := data.(*clientInfo)
|
||||||
f.dropClient(c, now, true)
|
f.dropClient(c, now, true)
|
||||||
return f.connectedCapacity > f.capacityLimit || f.connectedQueue.Size() > f.countLimit
|
return f.connectedCapacity > f.capacityLimit || f.connectedQueue.Size() > f.countLimit
|
||||||
})
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// requestCost feeds request cost after serving a request from the given peer.
|
// requestCost feeds request cost after serving a request from the given peer.
|
||||||
|
@ -28,6 +28,7 @@ import (
|
|||||||
"github.com/ethereum/go-ethereum/ethdb"
|
"github.com/ethereum/go-ethereum/ethdb"
|
||||||
"github.com/ethereum/go-ethereum/les/flowcontrol"
|
"github.com/ethereum/go-ethereum/les/flowcontrol"
|
||||||
"github.com/ethereum/go-ethereum/log"
|
"github.com/ethereum/go-ethereum/log"
|
||||||
|
"github.com/ethereum/go-ethereum/metrics"
|
||||||
)
|
)
|
||||||
|
|
||||||
const makeCostStats = false // make request cost statistics during operation
|
const makeCostStats = false // make request cost statistics during operation
|
||||||
@ -87,7 +88,7 @@ const (
|
|||||||
gfUsageTC = time.Second
|
gfUsageTC = time.Second
|
||||||
gfRaiseTC = time.Second * 200
|
gfRaiseTC = time.Second * 200
|
||||||
gfDropTC = time.Second * 50
|
gfDropTC = time.Second * 50
|
||||||
gfDbKey = "_globalCostFactorV3"
|
gfDbKey = "_globalCostFactorV6"
|
||||||
)
|
)
|
||||||
|
|
||||||
// costTracker is responsible for calculating costs and cost estimates on the
|
// costTracker is responsible for calculating costs and cost estimates on the
|
||||||
@ -226,6 +227,9 @@ type reqInfo struct {
|
|||||||
// servingTime is the CPU time corresponding to the actual processing of
|
// servingTime is the CPU time corresponding to the actual processing of
|
||||||
// the request.
|
// the request.
|
||||||
servingTime float64
|
servingTime float64
|
||||||
|
|
||||||
|
// msgCode indicates the type of request.
|
||||||
|
msgCode uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
// gfLoop starts an event loop which updates the global cost factor which is
|
// gfLoop starts an event loop which updates the global cost factor which is
|
||||||
@ -269,11 +273,43 @@ func (ct *costTracker) gfLoop() {
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case r := <-ct.reqInfoCh:
|
case r := <-ct.reqInfoCh:
|
||||||
|
relCost := int64(factor * r.servingTime * 100 / r.avgTimeCost) // Convert the value to a percentage form
|
||||||
|
|
||||||
|
// Record more metrics if we are debugging
|
||||||
|
if metrics.EnabledExpensive {
|
||||||
|
switch r.msgCode {
|
||||||
|
case GetBlockHeadersMsg:
|
||||||
|
relativeCostHeaderHistogram.Update(relCost)
|
||||||
|
case GetBlockBodiesMsg:
|
||||||
|
relativeCostBodyHistogram.Update(relCost)
|
||||||
|
case GetReceiptsMsg:
|
||||||
|
relativeCostReceiptHistogram.Update(relCost)
|
||||||
|
case GetCodeMsg:
|
||||||
|
relativeCostCodeHistogram.Update(relCost)
|
||||||
|
case GetProofsV2Msg:
|
||||||
|
relativeCostProofHistogram.Update(relCost)
|
||||||
|
case GetHelperTrieProofsMsg:
|
||||||
|
relativeCostHelperProofHistogram.Update(relCost)
|
||||||
|
case SendTxV2Msg:
|
||||||
|
relativeCostSendTxHistogram.Update(relCost)
|
||||||
|
case GetTxStatusMsg:
|
||||||
|
relativeCostTxStatusHistogram.Update(relCost)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// SendTxV2 and GetTxStatus requests are two special cases.
|
||||||
|
// All other requests will only put pressure on the database, and
|
||||||
|
// the corresponding delay is relatively stable. While these two
|
||||||
|
// requests involve txpool query, which is usually unstable.
|
||||||
|
//
|
||||||
|
// TODO(rjl493456442) fixes this.
|
||||||
|
if r.msgCode == SendTxV2Msg || r.msgCode == GetTxStatusMsg {
|
||||||
|
continue
|
||||||
|
}
|
||||||
requestServedMeter.Mark(int64(r.servingTime))
|
requestServedMeter.Mark(int64(r.servingTime))
|
||||||
requestServedTimer.Update(time.Duration(r.servingTime))
|
requestServedTimer.Update(time.Duration(r.servingTime))
|
||||||
requestEstimatedMeter.Mark(int64(r.avgTimeCost / factor))
|
requestEstimatedMeter.Mark(int64(r.avgTimeCost / factor))
|
||||||
requestEstimatedTimer.Update(time.Duration(r.avgTimeCost / factor))
|
requestEstimatedTimer.Update(time.Duration(r.avgTimeCost / factor))
|
||||||
relativeCostHistogram.Update(int64(r.avgTimeCost / factor / r.servingTime))
|
relativeCostHistogram.Update(relCost)
|
||||||
|
|
||||||
now := mclock.Now()
|
now := mclock.Now()
|
||||||
dt := float64(now - expUpdate)
|
dt := float64(now - expUpdate)
|
||||||
@ -324,6 +360,7 @@ func (ct *costTracker) gfLoop() {
|
|||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
globalFactorGauge.Update(int64(1000 * factor))
|
||||||
log.Debug("global cost factor updated", "factor", factor)
|
log.Debug("global cost factor updated", "factor", factor)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -375,7 +412,7 @@ func (ct *costTracker) updateStats(code, amount, servingTime, realCost uint64) {
|
|||||||
avg := reqAvgTimeCost[code]
|
avg := reqAvgTimeCost[code]
|
||||||
avgTimeCost := avg.baseCost + amount*avg.reqCost
|
avgTimeCost := avg.baseCost + amount*avg.reqCost
|
||||||
select {
|
select {
|
||||||
case ct.reqInfoCh <- reqInfo{float64(avgTimeCost), float64(servingTime)}:
|
case ct.reqInfoCh <- reqInfo{float64(avgTimeCost), float64(servingTime), code}:
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
if makeCostStats {
|
if makeCostStats {
|
||||||
|
@ -60,6 +60,15 @@ var (
|
|||||||
miscOutTxStatusPacketsMeter = metrics.NewRegisteredMeter("les/misc/out/packets/txStatus", nil)
|
miscOutTxStatusPacketsMeter = metrics.NewRegisteredMeter("les/misc/out/packets/txStatus", nil)
|
||||||
miscOutTxStatusTrafficMeter = metrics.NewRegisteredMeter("les/misc/out/traffic/txStatus", nil)
|
miscOutTxStatusTrafficMeter = metrics.NewRegisteredMeter("les/misc/out/traffic/txStatus", nil)
|
||||||
|
|
||||||
|
miscServingTimeHeaderTimer = metrics.NewRegisteredTimer("les/misc/serve/header", nil)
|
||||||
|
miscServingTimeBodyTimer = metrics.NewRegisteredTimer("les/misc/serve/body", nil)
|
||||||
|
miscServingTimeCodeTimer = metrics.NewRegisteredTimer("les/misc/serve/code", nil)
|
||||||
|
miscServingTimeReceiptTimer = metrics.NewRegisteredTimer("les/misc/serve/receipt", nil)
|
||||||
|
miscServingTimeTrieProofTimer = metrics.NewRegisteredTimer("les/misc/serve/proof", nil)
|
||||||
|
miscServingTimeHelperTrieTimer = metrics.NewRegisteredTimer("les/misc/serve/helperTrie", nil)
|
||||||
|
miscServingTimeTxTimer = metrics.NewRegisteredTimer("les/misc/serve/txs", nil)
|
||||||
|
miscServingTimeTxStatusTimer = metrics.NewRegisteredTimer("les/misc/serve/txStatus", nil)
|
||||||
|
|
||||||
connectionTimer = metrics.NewRegisteredTimer("les/connection/duration", nil)
|
connectionTimer = metrics.NewRegisteredTimer("les/connection/duration", nil)
|
||||||
serverConnectionGauge = metrics.NewRegisteredGauge("les/connection/server", nil)
|
serverConnectionGauge = metrics.NewRegisteredGauge("les/connection/server", nil)
|
||||||
clientConnectionGauge = metrics.NewRegisteredGauge("les/connection/client", nil)
|
clientConnectionGauge = metrics.NewRegisteredGauge("les/connection/client", nil)
|
||||||
@ -74,7 +83,16 @@ var (
|
|||||||
requestEstimatedMeter = metrics.NewRegisteredMeter("les/server/req/avgEstimatedTime", nil)
|
requestEstimatedMeter = metrics.NewRegisteredMeter("les/server/req/avgEstimatedTime", nil)
|
||||||
requestEstimatedTimer = metrics.NewRegisteredTimer("les/server/req/estimatedTime", nil)
|
requestEstimatedTimer = metrics.NewRegisteredTimer("les/server/req/estimatedTime", nil)
|
||||||
relativeCostHistogram = metrics.NewRegisteredHistogram("les/server/req/relative", nil, metrics.NewExpDecaySample(1028, 0.015))
|
relativeCostHistogram = metrics.NewRegisteredHistogram("les/server/req/relative", nil, metrics.NewExpDecaySample(1028, 0.015))
|
||||||
|
relativeCostHeaderHistogram = metrics.NewRegisteredHistogram("les/server/req/relative/header", nil, metrics.NewExpDecaySample(1028, 0.015))
|
||||||
|
relativeCostBodyHistogram = metrics.NewRegisteredHistogram("les/server/req/relative/body", nil, metrics.NewExpDecaySample(1028, 0.015))
|
||||||
|
relativeCostReceiptHistogram = metrics.NewRegisteredHistogram("les/server/req/relative/receipt", nil, metrics.NewExpDecaySample(1028, 0.015))
|
||||||
|
relativeCostCodeHistogram = metrics.NewRegisteredHistogram("les/server/req/relative/code", nil, metrics.NewExpDecaySample(1028, 0.015))
|
||||||
|
relativeCostProofHistogram = metrics.NewRegisteredHistogram("les/server/req/relative/proof", nil, metrics.NewExpDecaySample(1028, 0.015))
|
||||||
|
relativeCostHelperProofHistogram = metrics.NewRegisteredHistogram("les/server/req/relative/helperTrie", nil, metrics.NewExpDecaySample(1028, 0.015))
|
||||||
|
relativeCostSendTxHistogram = metrics.NewRegisteredHistogram("les/server/req/relative/txs", nil, metrics.NewExpDecaySample(1028, 0.015))
|
||||||
|
relativeCostTxStatusHistogram = metrics.NewRegisteredHistogram("les/server/req/relative/txStatus", nil, metrics.NewExpDecaySample(1028, 0.015))
|
||||||
|
|
||||||
|
globalFactorGauge = metrics.NewRegisteredGauge("les/server/globalFactor", nil)
|
||||||
recentServedGauge = metrics.NewRegisteredGauge("les/server/recentRequestServed", nil)
|
recentServedGauge = metrics.NewRegisteredGauge("les/server/recentRequestServed", nil)
|
||||||
recentEstimatedGauge = metrics.NewRegisteredGauge("les/server/recentRequestEstimated", nil)
|
recentEstimatedGauge = metrics.NewRegisteredGauge("les/server/recentRequestEstimated", nil)
|
||||||
sqServedGauge = metrics.NewRegisteredGauge("les/server/servingQueue/served", nil)
|
sqServedGauge = metrics.NewRegisteredGauge("les/server/servingQueue/served", nil)
|
||||||
|
@ -113,6 +113,7 @@ func NewLesServer(e *eth.Ethereum, config *eth.Config) (*LesServer, error) {
|
|||||||
}
|
}
|
||||||
srv.fcManager.SetCapacityLimits(srv.freeCapacity, maxCapacity, srv.freeCapacity*2)
|
srv.fcManager.SetCapacityLimits(srv.freeCapacity, maxCapacity, srv.freeCapacity*2)
|
||||||
srv.clientPool = newClientPool(srv.chainDb, srv.freeCapacity, 10000, mclock.System{}, func(id enode.ID) { go srv.peers.Unregister(peerIdToString(id)) })
|
srv.clientPool = newClientPool(srv.chainDb, srv.freeCapacity, 10000, mclock.System{}, func(id enode.ID) { go srv.peers.Unregister(peerIdToString(id)) })
|
||||||
|
srv.clientPool.setPriceFactors(priceFactors{0, 1, 1}, priceFactors{0, 1, 1})
|
||||||
|
|
||||||
checkpoint := srv.latestLocalCheckpoint()
|
checkpoint := srv.latestLocalCheckpoint()
|
||||||
if !checkpoint.Empty() {
|
if !checkpoint.Empty() {
|
||||||
|
@ -268,6 +268,7 @@ func (h *serverHandler) handleMsg(p *peer, wg *sync.WaitGroup) error {
|
|||||||
if metrics.EnabledExpensive {
|
if metrics.EnabledExpensive {
|
||||||
miscInHeaderPacketsMeter.Mark(1)
|
miscInHeaderPacketsMeter.Mark(1)
|
||||||
miscInHeaderTrafficMeter.Mark(int64(msg.Size))
|
miscInHeaderTrafficMeter.Mark(int64(msg.Size))
|
||||||
|
defer func(start time.Time) { miscServingTimeHeaderTimer.UpdateSince(start) }(time.Now())
|
||||||
}
|
}
|
||||||
var req struct {
|
var req struct {
|
||||||
ReqID uint64
|
ReqID uint64
|
||||||
@ -380,6 +381,7 @@ func (h *serverHandler) handleMsg(p *peer, wg *sync.WaitGroup) error {
|
|||||||
if metrics.EnabledExpensive {
|
if metrics.EnabledExpensive {
|
||||||
miscInBodyPacketsMeter.Mark(1)
|
miscInBodyPacketsMeter.Mark(1)
|
||||||
miscInBodyTrafficMeter.Mark(int64(msg.Size))
|
miscInBodyTrafficMeter.Mark(int64(msg.Size))
|
||||||
|
defer func(start time.Time) { miscServingTimeBodyTimer.UpdateSince(start) }(time.Now())
|
||||||
}
|
}
|
||||||
var req struct {
|
var req struct {
|
||||||
ReqID uint64
|
ReqID uint64
|
||||||
@ -428,6 +430,7 @@ func (h *serverHandler) handleMsg(p *peer, wg *sync.WaitGroup) error {
|
|||||||
if metrics.EnabledExpensive {
|
if metrics.EnabledExpensive {
|
||||||
miscInCodePacketsMeter.Mark(1)
|
miscInCodePacketsMeter.Mark(1)
|
||||||
miscInCodeTrafficMeter.Mark(int64(msg.Size))
|
miscInCodeTrafficMeter.Mark(int64(msg.Size))
|
||||||
|
defer func(start time.Time) { miscServingTimeCodeTimer.UpdateSince(start) }(time.Now())
|
||||||
}
|
}
|
||||||
var req struct {
|
var req struct {
|
||||||
ReqID uint64
|
ReqID uint64
|
||||||
@ -499,6 +502,7 @@ func (h *serverHandler) handleMsg(p *peer, wg *sync.WaitGroup) error {
|
|||||||
if metrics.EnabledExpensive {
|
if metrics.EnabledExpensive {
|
||||||
miscInReceiptPacketsMeter.Mark(1)
|
miscInReceiptPacketsMeter.Mark(1)
|
||||||
miscInReceiptTrafficMeter.Mark(int64(msg.Size))
|
miscInReceiptTrafficMeter.Mark(int64(msg.Size))
|
||||||
|
defer func(start time.Time) { miscServingTimeReceiptTimer.UpdateSince(start) }(time.Now())
|
||||||
}
|
}
|
||||||
var req struct {
|
var req struct {
|
||||||
ReqID uint64
|
ReqID uint64
|
||||||
@ -555,6 +559,7 @@ func (h *serverHandler) handleMsg(p *peer, wg *sync.WaitGroup) error {
|
|||||||
if metrics.EnabledExpensive {
|
if metrics.EnabledExpensive {
|
||||||
miscInTrieProofPacketsMeter.Mark(1)
|
miscInTrieProofPacketsMeter.Mark(1)
|
||||||
miscInTrieProofTrafficMeter.Mark(int64(msg.Size))
|
miscInTrieProofTrafficMeter.Mark(int64(msg.Size))
|
||||||
|
defer func(start time.Time) { miscServingTimeTrieProofTimer.UpdateSince(start) }(time.Now())
|
||||||
}
|
}
|
||||||
var req struct {
|
var req struct {
|
||||||
ReqID uint64
|
ReqID uint64
|
||||||
@ -657,6 +662,7 @@ func (h *serverHandler) handleMsg(p *peer, wg *sync.WaitGroup) error {
|
|||||||
if metrics.EnabledExpensive {
|
if metrics.EnabledExpensive {
|
||||||
miscInHelperTriePacketsMeter.Mark(1)
|
miscInHelperTriePacketsMeter.Mark(1)
|
||||||
miscInHelperTrieTrafficMeter.Mark(int64(msg.Size))
|
miscInHelperTrieTrafficMeter.Mark(int64(msg.Size))
|
||||||
|
defer func(start time.Time) { miscServingTimeHelperTrieTimer.UpdateSince(start) }(time.Now())
|
||||||
}
|
}
|
||||||
var req struct {
|
var req struct {
|
||||||
ReqID uint64
|
ReqID uint64
|
||||||
@ -731,6 +737,7 @@ func (h *serverHandler) handleMsg(p *peer, wg *sync.WaitGroup) error {
|
|||||||
if metrics.EnabledExpensive {
|
if metrics.EnabledExpensive {
|
||||||
miscInTxsPacketsMeter.Mark(1)
|
miscInTxsPacketsMeter.Mark(1)
|
||||||
miscInTxsTrafficMeter.Mark(int64(msg.Size))
|
miscInTxsTrafficMeter.Mark(int64(msg.Size))
|
||||||
|
defer func(start time.Time) { miscServingTimeTxTimer.UpdateSince(start) }(time.Now())
|
||||||
}
|
}
|
||||||
var req struct {
|
var req struct {
|
||||||
ReqID uint64
|
ReqID uint64
|
||||||
@ -779,6 +786,7 @@ func (h *serverHandler) handleMsg(p *peer, wg *sync.WaitGroup) error {
|
|||||||
if metrics.EnabledExpensive {
|
if metrics.EnabledExpensive {
|
||||||
miscInTxStatusPacketsMeter.Mark(1)
|
miscInTxStatusPacketsMeter.Mark(1)
|
||||||
miscInTxStatusTrafficMeter.Mark(int64(msg.Size))
|
miscInTxStatusTrafficMeter.Mark(int64(msg.Size))
|
||||||
|
defer func(start time.Time) { miscServingTimeTxStatusTimer.UpdateSince(start) }(time.Now())
|
||||||
}
|
}
|
||||||
var req struct {
|
var req struct {
|
||||||
ReqID uint64
|
ReqID uint64
|
||||||
|
@ -438,9 +438,6 @@ func (lc *LightChain) GetBlockHashesFromHash(hash common.Hash, max uint64) []com
|
|||||||
//
|
//
|
||||||
// Note: ancestor == 0 returns the same block, 1 returns its parent and so on.
|
// Note: ancestor == 0 returns the same block, 1 returns its parent and so on.
|
||||||
func (lc *LightChain) GetAncestor(hash common.Hash, number, ancestor uint64, maxNonCanonical *uint64) (common.Hash, uint64) {
|
func (lc *LightChain) GetAncestor(hash common.Hash, number, ancestor uint64, maxNonCanonical *uint64) (common.Hash, uint64) {
|
||||||
lc.chainmu.RLock()
|
|
||||||
defer lc.chainmu.RUnlock()
|
|
||||||
|
|
||||||
return lc.hc.GetAncestor(hash, number, ancestor, maxNonCanonical)
|
return lc.hc.GetAncestor(hash, number, ancestor, maxNonCanonical)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user