les/vflux: fixed panic and data races (#23865)

* les/vflux/server: fix BalanceOperation

* les/vflux/client: fixed data races
This commit is contained in:
Felföldi Zsolt 2021-11-08 10:29:59 +01:00 committed by GitHub
parent 476fb565ce
commit 8be8ba450e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 23 additions and 12 deletions

View File

@ -19,6 +19,7 @@ package client
import ( import (
"math/rand" "math/rand"
"strconv" "strconv"
"sync"
"sync/atomic" "sync/atomic"
"testing" "testing"
"time" "time"
@ -52,7 +53,7 @@ func testNodeIndex(id enode.ID) int {
type ServerPoolTest struct { type ServerPoolTest struct {
db ethdb.KeyValueStore db ethdb.KeyValueStore
clock *mclock.Simulated clock *mclock.Simulated
quit chan struct{} quit chan chan struct{}
preNeg, preNegFail bool preNeg, preNegFail bool
vt *ValueTracker vt *ValueTracker
sp *ServerPool sp *ServerPool
@ -62,6 +63,8 @@ type ServerPoolTest struct {
trusted []string trusted []string
waitCount, waitEnded int32 waitCount, waitEnded int32
lock sync.Mutex
cycle, conn, servedConn int cycle, conn, servedConn int
serviceCycles, dialCount int serviceCycles, dialCount int
disconnect map[int][]int disconnect map[int][]int
@ -112,7 +115,9 @@ func (s *ServerPoolTest) start() {
testQuery = func(node *enode.Node) int { testQuery = func(node *enode.Node) int {
idx := testNodeIndex(node.ID()) idx := testNodeIndex(node.ID())
n := &s.testNodes[idx] n := &s.testNodes[idx]
s.lock.Lock()
canConnect := !n.connected && n.connectCycles != 0 && s.cycle >= n.nextConnCycle canConnect := !n.connected && n.connectCycles != 0 && s.cycle >= n.nextConnCycle
s.lock.Unlock()
if s.preNegFail { if s.preNegFail {
// simulate a scenario where UDP queries never work // simulate a scenario where UDP queries never work
s.beginWait() s.beginWait()
@ -155,7 +160,7 @@ func (s *ServerPoolTest) start() {
s.sp.unixTime = func() int64 { return int64(s.clock.Now()) / int64(time.Second) } s.sp.unixTime = func() int64 { return int64(s.clock.Now()) / int64(time.Second) }
s.disconnect = make(map[int][]int) s.disconnect = make(map[int][]int)
s.sp.Start() s.sp.Start()
s.quit = make(chan struct{}) s.quit = make(chan chan struct{})
go func() { go func() {
last := int32(-1) last := int32(-1)
for { for {
@ -167,7 +172,8 @@ func (s *ServerPoolTest) start() {
s.clock.Run(time.Second) s.clock.Run(time.Second)
} }
last = c last = c
case <-s.quit: case quit := <-s.quit:
close(quit)
return return
} }
} }
@ -175,7 +181,9 @@ func (s *ServerPoolTest) start() {
} }
func (s *ServerPoolTest) stop() { func (s *ServerPoolTest) stop() {
close(s.quit) quit := make(chan struct{})
s.quit <- quit
<-quit
s.sp.Stop() s.sp.Stop()
s.spi.Close() s.spi.Close()
for i := range s.testNodes { for i := range s.testNodes {
@ -234,7 +242,9 @@ func (s *ServerPoolTest) run() {
} }
s.serviceCycles += s.servedConn s.serviceCycles += s.servedConn
s.clock.Run(time.Second) s.clock.Run(time.Second)
s.lock.Lock()
s.cycle++ s.cycle++
s.lock.Unlock()
} }
} }

View File

@ -50,7 +50,7 @@ type NodeValueTracker struct {
lastTransfer mclock.AbsTime lastTransfer mclock.AbsTime
basket serverBasket basket serverBasket
reqCosts []uint64 reqCosts []uint64
reqValues *[]float64 reqValues []float64
} }
// UpdateCosts updates the node value tracker's request cost table // UpdateCosts updates the node value tracker's request cost table
@ -58,14 +58,14 @@ func (nv *NodeValueTracker) UpdateCosts(reqCosts []uint64) {
nv.vt.lock.Lock() nv.vt.lock.Lock()
defer nv.vt.lock.Unlock() defer nv.vt.lock.Unlock()
nv.updateCosts(reqCosts, &nv.vt.refBasket.reqValues, nv.vt.refBasket.reqValueFactor(reqCosts)) nv.updateCosts(reqCosts, nv.vt.refBasket.reqValues, nv.vt.refBasket.reqValueFactor(reqCosts))
} }
// updateCosts updates the request cost table of the server. The request value factor // updateCosts updates the request cost table of the server. The request value factor
// is also updated based on the given cost table and the current reference basket. // is also updated based on the given cost table and the current reference basket.
// Note that the contents of the referenced reqValues slice will not change; a new // Note that the contents of the referenced reqValues slice will not change; a new
// reference is passed if the values are updated by ValueTracker. // reference is passed if the values are updated by ValueTracker.
func (nv *NodeValueTracker) updateCosts(reqCosts []uint64, reqValues *[]float64, rvFactor float64) { func (nv *NodeValueTracker) updateCosts(reqCosts []uint64, reqValues []float64, rvFactor float64) {
nv.lock.Lock() nv.lock.Lock()
defer nv.lock.Unlock() defer nv.lock.Unlock()
@ -112,7 +112,7 @@ func (nv *NodeValueTracker) Served(reqs []ServedRequest, respTime time.Duration)
var value float64 var value float64
for _, r := range reqs { for _, r := range reqs {
nv.basket.add(r.ReqType, r.Amount, nv.reqCosts[r.ReqType]*uint64(r.Amount), expFactor) nv.basket.add(r.ReqType, r.Amount, nv.reqCosts[r.ReqType]*uint64(r.Amount), expFactor)
value += (*nv.reqValues)[r.ReqType] * float64(r.Amount) value += nv.reqValues[r.ReqType] * float64(r.Amount)
} }
nv.rtStats.Add(respTime, value, expFactor) nv.rtStats.Add(respTime, value, expFactor)
} }
@ -356,7 +356,7 @@ func (vt *ValueTracker) Register(id enode.ID) *NodeValueTracker {
reqTypeCount := len(vt.refBasket.reqValues) reqTypeCount := len(vt.refBasket.reqValues)
nv.reqCosts = make([]uint64, reqTypeCount) nv.reqCosts = make([]uint64, reqTypeCount)
nv.lastTransfer = vt.clock.Now() nv.lastTransfer = vt.clock.Now()
nv.reqValues = &vt.refBasket.reqValues nv.reqValues = vt.refBasket.reqValues
nv.basket.init(reqTypeCount) nv.basket.init(reqTypeCount)
vt.connected[id] = nv vt.connected[id] = nv
@ -476,7 +476,7 @@ func (vt *ValueTracker) periodicUpdate() {
vt.refBasket.normalize() vt.refBasket.normalize()
vt.refBasket.updateReqValues() vt.refBasket.updateReqValues()
for _, nv := range vt.connected { for _, nv := range vt.connected {
nv.updateCosts(nv.reqCosts, &vt.refBasket.reqValues, vt.refBasket.reqValueFactor(nv.reqCosts)) nv.updateCosts(nv.reqCosts, vt.refBasket.reqValues, vt.refBasket.reqValueFactor(nv.reqCosts))
} }
vt.saveToDb() vt.saveToDb()
} }

View File

@ -223,8 +223,9 @@ func (bt *balanceTracker) BalanceOperation(id enode.ID, connAddress string, cb f
var nb *nodeBalance var nb *nodeBalance
if node := bt.ns.GetNode(id); node != nil { if node := bt.ns.GetNode(id); node != nil {
nb, _ = bt.ns.GetField(node, bt.setup.balanceField).(*nodeBalance) nb, _ = bt.ns.GetField(node, bt.setup.balanceField).(*nodeBalance)
} else { }
node = enode.SignNull(&enr.Record{}, id) if nb == nil {
node := enode.SignNull(&enr.Record{}, id)
nb = bt.newNodeBalance(node, connAddress, false) nb = bt.newNodeBalance(node, connAddress, false)
} }
cb(nb) cb(nb)