les: polish code (#22625)

* les: polish code

* les/vflux/server: fixes

* les: fix lint
gary rong authored 2021-04-27 15:44:59 +08:00, committed by GitHub
commit 854f068ed6, parent 9b99e3dfe0
4 changed files with 83 additions and 63 deletions

les/metrics.go

@@ -71,7 +71,6 @@ var (
     connectionTimer       = metrics.NewRegisteredTimer("les/connection/duration", nil)
     serverConnectionGauge = metrics.NewRegisteredGauge("les/connection/server", nil)
-    clientConnectionGauge = metrics.NewRegisteredGauge("les/connection/client", nil)
     totalCapacityGauge    = metrics.NewRegisteredGauge("les/server/totalCapacity", nil)
     totalRechargeGauge    = metrics.NewRegisteredGauge("les/server/totalRecharge", nil)
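
The clientConnectionGauge registration is dropped here because its only Update calls are removed from serverHandler.handle further down. For context, a small standalone sketch of how such a gauge is registered and updated with the go-ethereum metrics package (API as of this commit; the metric name is reused for illustration only):

package main

import (
    "fmt"

    "github.com/ethereum/go-ethereum/metrics"
)

func main() {
    metrics.Enabled = true // gauges are no-ops unless metrics collection is enabled
    g := metrics.NewRegisteredGauge("les/connection/client", nil) // nil registry = default registry
    g.Update(3)                                                   // e.g. current number of client connections
    fmt.Println(g.Snapshot().Value())                             // -> 3
}
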

les/peer.go

@@ -1099,7 +1099,6 @@ func (p *clientPeer) Handshake(td *big.Int, head common.Hash, headNum uint64, ge
             // set default announceType on server side
             p.announceType = announceTypeSimple
         }
-        p.fcClient = flowcontrol.NewClientNode(server.fcManager, p.fcParams)
     }
     return nil
 })

les/server_handler.go

@@ -30,6 +30,7 @@ import (
     "github.com/ethereum/go-ethereum/core/state"
     "github.com/ethereum/go-ethereum/core/types"
     "github.com/ethereum/go-ethereum/ethdb"
+    "github.com/ethereum/go-ethereum/les/flowcontrol"
     "github.com/ethereum/go-ethereum/light"
     "github.com/ethereum/go-ethereum/log"
     "github.com/ethereum/go-ethereum/metrics"
@@ -122,26 +123,27 @@ func (h *serverHandler) handle(p *clientPeer) error {
         p.Log().Debug("Light Ethereum handshake failed", "err", err)
         return err
     }
-    // Connected to another server, no messages expected, just wait for disconnection
     if p.server {
         if err := h.server.serverset.register(p); err != nil {
             return err
         }
+        // connected to another server, no messages expected, just wait for disconnection
         _, err := p.rw.ReadMsg()
         h.server.serverset.unregister(p)
         return err
     }
-    defer p.fcClient.Disconnect() // set by handshake if it's not another server
+    // Setup flow control mechanism for the peer
+    p.fcClient = flowcontrol.NewClientNode(h.server.fcManager, p.fcParams)
+    defer p.fcClient.Disconnect()

-    // Reject light clients if server is not synced.
-    //
-    // Put this checking here, so that "non-synced" les-server peers are still allowed
-    // to keep the connection.
+    // Reject light clients if server is not synced. Put this checking here, so
+    // that "non-synced" les-server peers are still allowed to keep the connection.
     if !h.synced() {
         p.Log().Debug("Light server not synced, rejecting peer")
         return p2p.DiscRequested
     }

+    // Register the peer into the peerset and clientpool
     if err := h.server.peers.register(p); err != nil {
         return err
     }
@@ -150,19 +152,14 @@ func (h *serverHandler) handle(p *clientPeer) error {
         p.Log().Debug("Client pool already closed")
         return p2p.DiscRequested
     }
-    activeCount, _ := h.server.clientPool.Active()
-    clientConnectionGauge.Update(int64(activeCount))
     p.connectedAt = mclock.Now()

     var wg sync.WaitGroup // Wait group used to track all in-flight task routines.
     defer func() {
         wg.Wait() // Ensure all background task routines have exited.
         h.server.clientPool.Unregister(p)
         h.server.peers.unregister(p.ID())
         p.balance = nil
-        activeCount, _ := h.server.clientPool.Active()
-        clientConnectionGauge.Update(int64(activeCount))
         connectionTimer.Update(time.Duration(mclock.Now() - p.connectedAt))
     }()
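
With this change the flow-control client node is created in serverHandler.handle itself, right after the handshake and only for genuine light clients; connections from other servers return early and never allocate one. A minimal, self-contained sketch of that ordering, using hypothetical stand-in types rather than the real les API:

package main

import (
    "errors"
    "fmt"
)

type peer struct{ server bool }

type fcNode struct{} // stand-in for *flowcontrol.ClientNode

func (n *fcNode) Disconnect() { fmt.Println("flow control state released") }

func handle(p *peer, synced bool) error {
    if p.server {
        // Another LES server: no flow control needed, just wait for disconnection.
        return nil
    }
    fc := &fcNode{}       // corresponds to flowcontrol.NewClientNode(h.server.fcManager, p.fcParams)
    defer fc.Disconnect() // torn down whenever the handler returns

    if !synced {
        return errors.New("light server not synced, rejecting peer") // mirrors p2p.DiscRequested
    }
    // register into the peer set / client pool here, each with a matching deferred unregister
    return nil
}

func main() {
    fmt.Println(handle(&peer{}, true), handle(&peer{server: true}, true))
}
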

les/vflux/server/prioritypool.go

@@ -63,20 +63,22 @@ type priorityPool struct {
     ns                           *nodestate.NodeStateMachine
     clock                        mclock.Clock
     lock                         sync.Mutex
-    inactiveQueue                *prque.Prque
     maxCount, maxCap             uint64
     minCap                       uint64
     activeBias                   time.Duration
     capacityStepDiv, fineStepDiv uint64

+    // The snapshot of priority pool for query.
     cachedCurve    *capacityCurve
     ccUpdatedAt    mclock.AbsTime
     ccUpdateForced bool

-    tempState []*ppNodeInfo // nodes currently in temporary state
-    // the following fields represent the temporary state if tempState is not empty
+    // Runtime status of prioritypool, represents the
+    // temporary state if tempState is not empty
+    tempState              []*ppNodeInfo
     activeCount, activeCap uint64
     activeQueue            *prque.LazyQueue
+    inactiveQueue          *prque.Prque
 }

 // ppNodeInfo is the internal node descriptor of priorityPool
@@ -89,8 +91,9 @@ type ppNodeInfo struct {
     tempState    bool   // should only be true while the priorityPool lock is held
     tempCapacity uint64 // equals capacity when tempState is false

     // the following fields only affect the temporary state and they are set to their
-    // default value when entering the temp state
+    // default value when leaving the temp state
     minTarget, stepDiv uint64
     bias               time.Duration
 }
@@ -157,11 +160,6 @@ func newPriorityPool(ns *nodestate.NodeStateMachine, setup *serverSetup, clock m
 func (pp *priorityPool) requestCapacity(node *enode.Node, minTarget, maxTarget uint64, bias time.Duration) uint64 {
     pp.lock.Lock()
     pp.activeQueue.Refresh()
-    var updates []capUpdate
-    defer func() {
-        pp.lock.Unlock()
-        pp.updateFlags(updates)
-    }()
     if minTarget < pp.minCap {
         minTarget = pp.minCap
@@ -175,12 +173,13 @@ func (pp *priorityPool) requestCapacity(node *enode.Node, minTarget, maxTarget u
     c, _ := pp.ns.GetField(node, pp.setup.queueField).(*ppNodeInfo)
     if c == nil {
         log.Error("requestCapacity called for unknown node", "id", node.ID())
+        pp.lock.Unlock()
         return 0
     }
     pp.setTempState(c)
     if maxTarget > c.capacity {
-        c.bias = bias
-        c.stepDiv = pp.fineStepDiv
+        pp.setTempStepDiv(c, pp.fineStepDiv)
+        pp.setTempBias(c, bias)
     }
     pp.setTempCapacity(c, maxTarget)
     c.minTarget = minTarget
@@ -188,7 +187,9 @@ func (pp *priorityPool) requestCapacity(node *enode.Node, minTarget, maxTarget u
     pp.inactiveQueue.Remove(c.inactiveIndex)
     pp.activeQueue.Push(c)
     pp.enforceLimits()
-    updates = pp.finalizeChanges(c.tempCapacity >= minTarget && c.tempCapacity <= maxTarget && c.tempCapacity != c.capacity)
+    updates := pp.finalizeChanges(c.tempCapacity >= minTarget && c.tempCapacity <= maxTarget && c.tempCapacity != c.capacity)
+    pp.lock.Unlock()
+    pp.updateFlags(updates)
     return c.capacity
 }
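
The deferred closure that unlocked pp.lock and then applied updateFlags is replaced by explicit unlocks on every return path, so flag updates are computed under the lock but published only after it is released (SetLimits additionally wraps the publish in pp.ns.Operation, as shown in the next hunk). A minimal sketch of that pattern with hypothetical names, not the real priorityPool types:

package main

import (
    "fmt"
    "sync"
)

type pool struct {
    mu  sync.Mutex
    cap uint64
}

// publish stands in for pp.updateFlags: it must run without the lock held.
func publish(updates []string) { fmt.Println("flags updated:", updates) }

func (p *pool) request(known bool, target uint64) uint64 {
    p.mu.Lock()
    if !known {
        p.mu.Unlock() // early return path: unlock explicitly, nothing to publish
        return 0
    }
    p.cap = target
    updates := []string{"capacity-changed"} // computed while holding the lock
    granted := p.cap
    p.mu.Unlock()    // release before notifying other subsystems
    publish(updates) // runs outside the lock, like pp.updateFlags(updates)
    return granted
}

func main() {
    p := &pool{}
    fmt.Println(p.request(false, 10), p.request(true, 10))
}
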
@@ -196,15 +197,11 @@ func (pp *priorityPool) requestCapacity(node *enode.Node, minTarget, maxTarget u
 func (pp *priorityPool) SetLimits(maxCount, maxCap uint64) {
     pp.lock.Lock()
     pp.activeQueue.Refresh()
-    var updates []capUpdate
-    defer func() {
-        pp.lock.Unlock()
-        pp.ns.Operation(func() { pp.updateFlags(updates) })
-    }()
     inc := (maxCount > pp.maxCount) || (maxCap > pp.maxCap)
     dec := (maxCount < pp.maxCount) || (maxCap < pp.maxCap)
     pp.maxCount, pp.maxCap = maxCount, maxCap
+
+    var updates []capUpdate
     if dec {
         pp.enforceLimits()
         updates = pp.finalizeChanges(true)
@@ -212,6 +209,8 @@ func (pp *priorityPool) SetLimits(maxCount, maxCap uint64) {
     if inc {
         updates = append(updates, pp.tryActivate(false)...)
     }
+    pp.lock.Unlock()
+    pp.ns.Operation(func() { pp.updateFlags(updates) })
 }

 // setActiveBias sets the bias applied when trying to activate inactive nodes
@@ -291,18 +290,15 @@ func (pp *priorityPool) inactivePriority(p *ppNodeInfo) int64 {
 func (pp *priorityPool) connectedNode(c *ppNodeInfo) {
     pp.lock.Lock()
     pp.activeQueue.Refresh()
-    var updates []capUpdate
-    defer func() {
-        pp.lock.Unlock()
-        pp.updateFlags(updates)
-    }()
     if c.connected {
+        pp.lock.Unlock()
         return
     }
     c.connected = true
     pp.inactiveQueue.Push(c, pp.inactivePriority(c))
-    updates = pp.tryActivate(false)
+    updates := pp.tryActivate(false)
+    pp.lock.Unlock()
+    pp.updateFlags(updates)
 }

 // disconnectedNode is called when a node has been removed from the pool (both inactiveFlag
@@ -311,23 +307,22 @@ func (pp *priorityPool) connectedNode(c *ppNodeInfo) {
 func (pp *priorityPool) disconnectedNode(c *ppNodeInfo) {
     pp.lock.Lock()
     pp.activeQueue.Refresh()
-    var updates []capUpdate
-    defer func() {
-        pp.lock.Unlock()
-        pp.updateFlags(updates)
-    }()
     if !c.connected {
+        pp.lock.Unlock()
         return
     }
     c.connected = false
     pp.activeQueue.Remove(c.activeIndex)
     pp.inactiveQueue.Remove(c.inactiveIndex)
+
+    var updates []capUpdate
     if c.capacity != 0 {
         pp.setTempState(c)
         pp.setTempCapacity(c, 0)
         updates = pp.tryActivate(true)
     }
+    pp.lock.Unlock()
+    pp.updateFlags(updates)
 }

 // setTempState internally puts a node in a temporary state that can either be reverted
@@ -342,27 +337,62 @@ func (pp *priorityPool) setTempState(c *ppNodeInfo) {
     if c.tempCapacity != c.capacity { // should never happen
         log.Error("tempCapacity != capacity when entering tempState")
     }
+    // Assign all the defaults to the temp state.
     c.minTarget = pp.minCap
     c.stepDiv = pp.capacityStepDiv
+    c.bias = 0
     pp.tempState = append(pp.tempState, c)
 }

+// unsetTempState revokes the temp status of the node and reset all internal
+// fields to the default value.
+func (pp *priorityPool) unsetTempState(c *ppNodeInfo) {
+    if !c.tempState {
+        return
+    }
+    c.tempState = false
+    if c.tempCapacity != c.capacity { // should never happen
+        log.Error("tempCapacity != capacity when leaving tempState")
+    }
+    c.minTarget = pp.minCap
+    c.stepDiv = pp.capacityStepDiv
+    c.bias = 0
+}
+
 // setTempCapacity changes the capacity of a node in the temporary state and adjusts
 // activeCap and activeCount accordingly. Since this change is performed in the temporary
 // state it should be called after setTempState and before finalizeChanges.
-func (pp *priorityPool) setTempCapacity(n *ppNodeInfo, cap uint64) {
-    if !n.tempState { // should never happen
+func (pp *priorityPool) setTempCapacity(c *ppNodeInfo, cap uint64) {
+    if !c.tempState { // should never happen
         log.Error("Node is not in temporary state")
         return
     }
-    pp.activeCap += cap - n.tempCapacity
-    if n.tempCapacity == 0 {
+    pp.activeCap += cap - c.tempCapacity
+    if c.tempCapacity == 0 {
         pp.activeCount++
     }
     if cap == 0 {
         pp.activeCount--
     }
-    n.tempCapacity = cap
+    c.tempCapacity = cap
+}
+
+// setTempBias changes the connection bias of a node in the temporary state.
+func (pp *priorityPool) setTempBias(c *ppNodeInfo, bias time.Duration) {
+    if !c.tempState { // should never happen
+        log.Error("Node is not in temporary state")
+        return
+    }
+    c.bias = bias
+}
+
+// setTempStepDiv changes the capacity divisor of a node in the temporary state.
+func (pp *priorityPool) setTempStepDiv(c *ppNodeInfo, stepDiv uint64) {
+    if !c.tempState { // should never happen
+        log.Error("Node is not in temporary state")
+        return
+    }
+    c.stepDiv = stepDiv
 }
 // enforceLimits enforces active node count and total capacity limits. It returns the
@@ -412,10 +442,8 @@ func (pp *priorityPool) finalizeChanges(commit bool) (updates []capUpdate) {
         } else {
             pp.setTempCapacity(c, c.capacity) // revert activeCount/activeCap
         }
-        c.tempState = false
-        c.bias = 0
-        c.stepDiv = pp.capacityStepDiv
-        c.minTarget = pp.minCap
+        pp.unsetTempState(c)
+
         if c.connected {
             if c.capacity != 0 {
                 pp.activeQueue.Push(c)
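
The new setTempBias/setTempStepDiv setters and unsetTempState keep every temporary-state field behind a guard that checks tempState, and finalizeChanges now restores the defaults through unsetTempState instead of resetting the fields by hand. A heavily simplified sketch of how these helpers compose (hypothetical types, not the real implementation):

package main

import (
    "fmt"
    "time"
)

type nodeInfo struct {
    tempState          bool
    capacity, tempCap  uint64
    bias               time.Duration
    minTarget, stepDiv uint64
}

type pool struct {
    minCap, capacityStepDiv uint64
    activeCap               uint64
}

// setTempState enters the temporary state and assigns the defaults, as the real helper does.
func (p *pool) setTempState(n *nodeInfo) {
    if n.tempState {
        return
    }
    n.tempState = true
    n.minTarget, n.stepDiv, n.bias = p.minCap, p.capacityStepDiv, 0
}

// setTempCapacity adjusts the pool-wide counter together with the node's temporary capacity.
func (p *pool) setTempCapacity(n *nodeInfo, c uint64) {
    if !n.tempState {
        return // the real helper logs an error instead
    }
    p.activeCap += c - n.tempCap
    n.tempCap = c
}

// unsetTempState leaves the temporary state and restores the defaults, as finalizeChanges now does.
func (p *pool) unsetTempState(n *nodeInfo) {
    if !n.tempState {
        return
    }
    n.tempState = false
    n.minTarget, n.stepDiv, n.bias = p.minCap, p.capacityStepDiv, 0
}

func main() {
    p := &pool{minCap: 1, capacityStepDiv: 8}
    n := &nodeInfo{capacity: 10, tempCap: 10}
    p.setTempState(n)
    p.setTempCapacity(n, 20) // raise capacity in the temporary state only
    n.capacity = n.tempCap   // "commit" the change, as finalizeChanges would
    p.unsetTempState(n)
    fmt.Println(p.activeCap, n.capacity) // -> 10 20
}
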
@@ -462,13 +490,13 @@ func (pp *priorityPool) tryActivate(commit bool) []capUpdate {
     for pp.inactiveQueue.Size() > 0 {
         c := pp.inactiveQueue.PopItem().(*ppNodeInfo)
         pp.setTempState(c)
+        pp.setTempBias(c, pp.activeBias)
         pp.setTempCapacity(c, pp.minCap)
-        c.bias = pp.activeBias
         pp.activeQueue.Push(c)
         pp.enforceLimits()
         if c.tempCapacity > 0 {
             commit = true
-            c.bias = 0
+            pp.setTempBias(c, 0)
         } else {
             break
         }
@@ -483,14 +511,9 @@ func (pp *priorityPool) updatePriority(node *enode.Node) {
 func (pp *priorityPool) updatePriority(node *enode.Node) {
     pp.lock.Lock()
     pp.activeQueue.Refresh()
-    var updates []capUpdate
-    defer func() {
-        pp.lock.Unlock()
-        pp.updateFlags(updates)
-    }()
     c, _ := pp.ns.GetField(node, pp.setup.queueField).(*ppNodeInfo)
     if c == nil || !c.connected {
+        pp.lock.Unlock()
         return
     }
     pp.activeQueue.Remove(c.activeIndex)
@@ -500,7 +523,9 @@ func (pp *priorityPool) updatePriority(node *enode.Node) {
     } else {
         pp.inactiveQueue.Push(c, pp.inactivePriority(c))
     }
-    updates = pp.tryActivate(false)
+    updates := pp.tryActivate(false)
+    pp.lock.Unlock()
+    pp.updateFlags(updates)
 }

 // capacityCurve is a snapshot of the priority pool contents in a format that can efficiently