plugeth/les/vflux/client/serverpool_test.go
2023-11-22 14:00:30 +01:00

425 lines
10 KiB
Go

// Copyright 2020 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package client
import (
"math/rand"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/ethdb/memorydb"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
)
const (
spTestNodes = 1000
spTestTarget = 5
spTestLength = 10000
spMinTotal = 40000
spMaxTotal = 50000
)
func testNodeID(i int) enode.ID {
return enode.ID{42, byte(i % 256), byte(i / 256)}
}
func testNodeIndex(id enode.ID) int {
if id[0] != 42 {
return -1
}
return int(id[1]) + int(id[2])*256
}
type ServerPoolTest struct {
db ethdb.KeyValueStore
clock *mclock.Simulated
quit chan chan struct{}
preNeg, preNegFail bool
sp *ServerPool
spi enode.Iterator
input enode.Iterator
testNodes []spTestNode
trusted []string
waitCount, waitEnded int32
// preNegLock protects the cycle counter, testNodes list and its connected field
// (accessed from both the main thread and the preNeg callback)
preNegLock sync.Mutex
queryWg *sync.WaitGroup // a new wait group is created each time the simulation is started
stopping bool // stopping avoid calling queryWg.Add after queryWg.Wait
cycle, conn, servedConn int
serviceCycles, dialCount int
disconnect map[int][]int
}
type spTestNode struct {
connectCycles, waitCycles int
nextConnCycle, totalConn int
connected, service bool
node *enode.Node
}
func newServerPoolTest(preNeg, preNegFail bool) *ServerPoolTest {
nodes := make([]*enode.Node, spTestNodes)
for i := range nodes {
nodes[i] = enode.SignNull(&enr.Record{}, testNodeID(i))
}
return &ServerPoolTest{
clock: &mclock.Simulated{},
db: memorydb.New(),
input: enode.CycleNodes(nodes),
testNodes: make([]spTestNode, spTestNodes),
preNeg: preNeg,
preNegFail: preNegFail,
}
}
func (s *ServerPoolTest) beginWait() {
// ensure that dialIterator and the maximal number of pre-neg queries are not all stuck in a waiting state
for atomic.AddInt32(&s.waitCount, 1) > preNegLimit {
atomic.AddInt32(&s.waitCount, -1)
s.clock.Run(time.Second)
}
}
func (s *ServerPoolTest) endWait() {
atomic.AddInt32(&s.waitCount, -1)
atomic.AddInt32(&s.waitEnded, 1)
}
func (s *ServerPoolTest) addTrusted(i int) {
s.trusted = append(s.trusted, enode.SignNull(&enr.Record{}, testNodeID(i)).String())
}
func (s *ServerPoolTest) start() {
var testQuery QueryFunc
s.queryWg = new(sync.WaitGroup)
if s.preNeg {
testQuery = func(node *enode.Node) int {
s.preNegLock.Lock()
if s.stopping {
s.preNegLock.Unlock()
return 0
}
s.queryWg.Add(1)
idx := testNodeIndex(node.ID())
n := &s.testNodes[idx]
canConnect := !n.connected && n.connectCycles != 0 && s.cycle >= n.nextConnCycle
s.preNegLock.Unlock()
defer s.queryWg.Done()
if s.preNegFail {
// simulate a scenario where UDP queries never work
s.beginWait()
s.clock.Sleep(time.Second * 5)
s.endWait()
return -1
}
switch idx % 3 {
case 0:
// pre-neg returns true only if connection is possible
if canConnect {
return 1
}
return 0
case 1:
// pre-neg returns true but connection might still fail
return 1
case 2:
// pre-neg returns true if connection is possible, otherwise timeout (node unresponsive)
if canConnect {
return 1
}
s.beginWait()
s.clock.Sleep(time.Second * 5)
s.endWait()
return -1
}
return -1
}
}
requestList := make([]RequestInfo, testReqTypes)
for i := range requestList {
requestList[i] = RequestInfo{Name: "testreq" + strconv.Itoa(i), InitAmount: 1, InitValue: 1}
}
s.sp, s.spi = NewServerPool(s.db, []byte("sp:"), 0, testQuery, s.clock, s.trusted, requestList)
s.sp.AddSource(s.input)
s.sp.validSchemes = enode.ValidSchemesForTesting
s.sp.unixTime = func() int64 { return int64(s.clock.Now()) / int64(time.Second) }
s.disconnect = make(map[int][]int)
s.sp.Start()
s.quit = make(chan chan struct{})
go func() {
last := int32(-1)
for {
select {
case <-time.After(time.Millisecond * 100):
c := atomic.LoadInt32(&s.waitEnded)
if c == last {
// advance clock if test is stuck (might happen in rare cases)
s.clock.Run(time.Second)
}
last = c
case quit := <-s.quit:
close(quit)
return
}
}
}()
}
func (s *ServerPoolTest) stop() {
// disable further queries and wait if one is currently running
s.preNegLock.Lock()
s.stopping = true
s.preNegLock.Unlock()
s.queryWg.Wait()
quit := make(chan struct{})
s.quit <- quit
<-quit
s.sp.Stop()
s.spi.Close()
s.preNegLock.Lock()
s.stopping = false
s.preNegLock.Unlock()
for i := range s.testNodes {
n := &s.testNodes[i]
if n.connected {
n.totalConn += s.cycle
}
n.connected = false
n.node = nil
n.nextConnCycle = 0
}
s.conn, s.servedConn = 0, 0
}
func (s *ServerPoolTest) run() {
for count := spTestLength; count > 0; count-- {
if dcList := s.disconnect[s.cycle]; dcList != nil {
for _, idx := range dcList {
n := &s.testNodes[idx]
s.sp.UnregisterNode(n.node)
n.totalConn += s.cycle
s.preNegLock.Lock()
n.connected = false
s.preNegLock.Unlock()
n.node = nil
s.conn--
if n.service {
s.servedConn--
}
n.nextConnCycle = s.cycle + n.waitCycles
}
delete(s.disconnect, s.cycle)
}
if s.conn < spTestTarget {
s.dialCount++
s.beginWait()
s.spi.Next()
s.endWait()
dial := s.spi.Node()
id := dial.ID()
idx := testNodeIndex(id)
n := &s.testNodes[idx]
if !n.connected && n.connectCycles != 0 && s.cycle >= n.nextConnCycle {
s.conn++
if n.service {
s.servedConn++
}
n.totalConn -= s.cycle
s.preNegLock.Lock()
n.connected = true
s.preNegLock.Unlock()
dc := s.cycle + n.connectCycles
s.disconnect[dc] = append(s.disconnect[dc], idx)
n.node = dial
nv, _ := s.sp.RegisterNode(n.node)
if n.service {
nv.Served([]ServedRequest{{ReqType: 0, Amount: 100}}, 0)
}
}
}
s.serviceCycles += s.servedConn
s.clock.Run(time.Second)
s.preNegLock.Lock()
s.cycle++
s.preNegLock.Unlock()
}
}
func (s *ServerPoolTest) setNodes(count, conn, wait int, service, trusted bool) (res []int) {
for ; count > 0; count-- {
idx := rand.Intn(spTestNodes)
for s.testNodes[idx].connectCycles != 0 || s.testNodes[idx].connected {
idx = rand.Intn(spTestNodes)
}
res = append(res, idx)
s.preNegLock.Lock()
s.testNodes[idx] = spTestNode{
connectCycles: conn,
waitCycles: wait,
service: service,
}
s.preNegLock.Unlock()
if trusted {
s.addTrusted(idx)
}
}
return
}
func (s *ServerPoolTest) resetNodes() {
for i, n := range s.testNodes {
if n.connected {
n.totalConn += s.cycle
s.sp.UnregisterNode(n.node)
}
s.preNegLock.Lock()
s.testNodes[i] = spTestNode{totalConn: n.totalConn}
s.preNegLock.Unlock()
}
s.conn, s.servedConn = 0, 0
s.disconnect = make(map[int][]int)
s.trusted = nil
}
func (s *ServerPoolTest) checkNodes(t *testing.T, nodes []int) {
var sum int
for _, idx := range nodes {
n := &s.testNodes[idx]
if n.connected {
n.totalConn += s.cycle
}
sum += n.totalConn
n.totalConn = 0
if n.connected {
n.totalConn -= s.cycle
}
}
if sum < spMinTotal || sum > spMaxTotal {
t.Errorf("Total connection amount %d outside expected range %d to %d", sum, spMinTotal, spMaxTotal)
}
}
func TestServerPool(t *testing.T) {
t.Parallel()
testServerPool(t, false, false)
}
func TestServerPoolWithPreNeg(t *testing.T) {
t.Parallel()
testServerPool(t, true, false)
}
func TestServerPoolWithPreNegFail(t *testing.T) {
t.Parallel()
testServerPool(t, true, true)
}
func testServerPool(t *testing.T, preNeg, fail bool) {
s := newServerPoolTest(preNeg, fail)
nodes := s.setNodes(100, 200, 200, true, false)
s.setNodes(100, 20, 20, false, false)
s.start()
s.run()
s.stop()
s.checkNodes(t, nodes)
}
func TestServerPoolChangedNodes(t *testing.T) {
t.Parallel()
testServerPoolChangedNodes(t, false)
}
func TestServerPoolChangedNodesWithPreNeg(t *testing.T) {
t.Parallel()
testServerPoolChangedNodes(t, true)
}
func testServerPoolChangedNodes(t *testing.T, preNeg bool) {
s := newServerPoolTest(preNeg, false)
nodes := s.setNodes(100, 200, 200, true, false)
s.setNodes(100, 20, 20, false, false)
s.start()
s.run()
s.checkNodes(t, nodes)
for i := 0; i < 3; i++ {
s.resetNodes()
nodes := s.setNodes(100, 200, 200, true, false)
s.setNodes(100, 20, 20, false, false)
s.run()
s.checkNodes(t, nodes)
}
s.stop()
}
func TestServerPoolRestartNoDiscovery(t *testing.T) {
t.Parallel()
testServerPoolRestartNoDiscovery(t, false)
}
func TestServerPoolRestartNoDiscoveryWithPreNeg(t *testing.T) {
t.Parallel()
testServerPoolRestartNoDiscovery(t, true)
}
func testServerPoolRestartNoDiscovery(t *testing.T, preNeg bool) {
s := newServerPoolTest(preNeg, false)
nodes := s.setNodes(100, 200, 200, true, false)
s.setNodes(100, 20, 20, false, false)
s.start()
s.run()
s.stop()
s.checkNodes(t, nodes)
s.input = nil
s.start()
s.run()
s.stop()
s.checkNodes(t, nodes)
}
func TestServerPoolTrustedNoDiscovery(t *testing.T) {
t.Parallel()
testServerPoolTrustedNoDiscovery(t, false)
}
func TestServerPoolTrustedNoDiscoveryWithPreNeg(t *testing.T) {
t.Parallel()
testServerPoolTrustedNoDiscovery(t, true)
}
func testServerPoolTrustedNoDiscovery(t *testing.T, preNeg bool) {
s := newServerPoolTest(preNeg, false)
trusted := s.setNodes(200, 200, 200, true, true)
s.input = nil
s.start()
s.run()
s.stop()
s.checkNodes(t, trusted)
}