p2p/discover: add Table configuration and Nodes method (#27387)

* p2p/discover: remove ReadRandomNodes

Even though it's public, this method is not callable by code outside of
package p2p/discover because one can't get a valid instance of Table.

* p2p/discover: add Table.Nodes

* p2p/discover: make Table settings configurable

In unit tests and externally developed cmd/devp2p test runs, it can be
useful to tune the timer intervals used by Table.
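
As an illustration (not part of this change), a hedged sketch of how an external test harness might use the new knobs when starting a discv4 listener. The interval values, the package/function names of the sketch, and the setup boilerplate are assumptions; only the Config fields and the ListenV4 signature come from this patch.

package disctest // hypothetical test-harness package

import (
	"net"
	"time"

	"github.com/ethereum/go-ethereum/crypto"
	"github.com/ethereum/go-ethereum/p2p/discover"
	"github.com/ethereum/go-ethereum/p2p/enode"
)

// startTestDiscovery is a sketch: it brings up a discv4 listener with
// shortened table timers, as a test run might. Error handling is elided.
func startTestDiscovery(bootnodes []*enode.Node) (*discover.UDPv4, error) {
	key, _ := crypto.GenerateKey()
	db, _ := enode.OpenDB("") // in-memory node database
	ln := enode.NewLocalNode(db, key)

	conn, err := net.ListenUDP("udp4", &net.UDPAddr{IP: net.IP{127, 0, 0, 1}})
	if err != nil {
		return nil, err
	}
	cfg := discover.Config{
		PrivateKey:      key,
		Bootnodes:       bootnodes,
		PingInterval:    500 * time.Millisecond, // liveness checks fire sooner
		RefreshInterval: 5 * time.Second,        // bucket refresh runs frequently
	}
	return discover.ListenV4(conn, ln, cfg)
}

Leaving both intervals at zero keeps the previous behaviour, since withDefaults falls back to 10s and 30m.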
Felix Lange authored on 2023-05-31 13:37:10 +02:00, committed by GitHub
parent 008086f935
commit ac86547b01
6 changed files with 72 additions and 85 deletions


@@ -19,6 +19,7 @@ package discover
 import (
 	"crypto/ecdsa"
 	"net"
+	"time"
 
 	"github.com/ethereum/go-ethereum/common/mclock"
 	"github.com/ethereum/go-ethereum/log"
@@ -35,29 +36,39 @@ type UDPConn interface {
 	LocalAddr() net.Addr
 }
 
-type V5Config struct {
-	ProtocolID *[6]byte
-}
-
 // Config holds settings for the discovery listener.
 type Config struct {
 	// These settings are required and configure the UDP listener:
 	PrivateKey *ecdsa.PrivateKey
 
-	// These settings are optional:
+	// All remaining settings are optional.
+
+	// Packet handling configuration:
 	NetRestrict *netutil.Netlist  // list of allowed IP networks
-	Bootnodes   []*enode.Node     // list of bootstrap nodes
 	Unhandled   chan<- ReadPacket // unhandled packets are sent on this channel
-	Log         log.Logger        // if set, log messages go here
 
-	// V5ProtocolID configures the discv5 protocol identifier.
+	// Node table configuration:
+	Bootnodes       []*enode.Node // list of bootstrap nodes
+	PingInterval    time.Duration // speed of node liveness check
+	RefreshInterval time.Duration // used in bucket refresh
+
+	// The options below are useful in very specific cases, like in unit tests.
 	V5ProtocolID *[6]byte
-
+	Log          log.Logger         // if set, log messages go here
 	ValidSchemes enr.IdentityScheme // allowed identity schemes
 	Clock        mclock.Clock
 }
 
 func (cfg Config) withDefaults() Config {
+	// Node table configuration:
+	if cfg.PingInterval == 0 {
+		cfg.PingInterval = 10 * time.Second
+	}
+	if cfg.RefreshInterval == 0 {
+		cfg.RefreshInterval = 30 * time.Minute
+	}
+
+	// Debug/test settings:
 	if cfg.Log == nil {
 		cfg.Log = log.Root()
 	}
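
The zero-value defaults match the constants that used to be hard-coded in table.go, so existing callers keep the old 10s/30m behaviour. A minimal in-package test sketch (hypothetical, not part of this commit) that would pin that down:

package discover

import (
	"testing"
	"time"
)

// TestConfigDefaults is hypothetical: it checks that a zero Config picks up
// the previous hard-coded intervals via withDefaults.
func TestConfigDefaults(t *testing.T) {
	cfg := Config{}.withDefaults()
	if cfg.PingInterval != 10*time.Second {
		t.Errorf("PingInterval default = %v, want 10s", cfg.PingInterval)
	}
	if cfg.RefreshInterval != 30*time.Minute {
		t.Errorf("RefreshInterval default = %v, want 30m", cfg.RefreshInterval)
	}
}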


@@ -53,12 +53,10 @@ const (
 	bucketIPLimit, bucketSubnet = 2, 24 // at most 2 addresses from the same /24
 	tableIPLimit, tableSubnet   = 10, 24
 
-	refreshInterval    = 30 * time.Minute
-	revalidateInterval = 10 * time.Second
-	copyNodesInterval  = 30 * time.Second
-	seedMinTableTime   = 5 * time.Minute
-	seedCount          = 30
-	seedMaxAge         = 5 * 24 * time.Hour
+	copyNodesInterval = 30 * time.Second
+	seedMinTableTime  = 5 * time.Minute
+	seedCount         = 30
+	seedMaxAge        = 5 * 24 * time.Hour
 )
 
 // Table is the 'node table', a Kademlia-like index of neighbor nodes. The table keeps
@@ -71,9 +69,12 @@ type Table struct {
 	rand    *mrand.Rand // source of randomness, periodically reseeded
 	ips     netutil.DistinctNetSet
-	log     log.Logger
-	db      *enode.DB // database of known nodes
-	net     transport
 
+	db  *enode.DB // database of known nodes
+	net transport
+	cfg Config
+	log log.Logger
+
+	// loop channels
 	refreshReq chan chan struct{}
 	initDone   chan struct{}
 	closeReq   chan struct{}
@@ -99,19 +100,21 @@ type bucket struct {
 	ips          netutil.DistinctNetSet
 }
 
-func newTable(t transport, db *enode.DB, bootnodes []*enode.Node, log log.Logger) (*Table, error) {
+func newTable(t transport, db *enode.DB, cfg Config) (*Table, error) {
+	cfg = cfg.withDefaults()
 	tab := &Table{
 		net:        t,
 		db:         db,
+		cfg:        cfg,
+		log:        cfg.Log,
 		refreshReq: make(chan chan struct{}),
 		initDone:   make(chan struct{}),
 		closeReq:   make(chan struct{}),
 		closed:     make(chan struct{}),
 		rand:       mrand.New(mrand.NewSource(0)),
 		ips:        netutil.DistinctNetSet{Subnet: tableSubnet, Limit: tableIPLimit},
-		log:        log,
 	}
-	if err := tab.setFallbackNodes(bootnodes); err != nil {
+	if err := tab.setFallbackNodes(cfg.Bootnodes); err != nil {
 		return nil, err
 	}
 	for i := range tab.buckets {
@@ -125,6 +128,24 @@ func newTable(t transport, db *enode.DB, bootnodes []*enode.Node, log log.Logger
 	return tab, nil
 }
 
+// Nodes returns all nodes contained in the table.
+func (tab *Table) Nodes() []*enode.Node {
+	if !tab.isInitDone() {
+		return nil
+	}
+
+	tab.mutex.Lock()
+	defer tab.mutex.Unlock()
+
+	var nodes []*enode.Node
+	for _, b := range &tab.buckets {
+		for _, n := range b.entries {
+			nodes = append(nodes, unwrapNode(n))
+		}
+	}
+	return nodes
+}
+
 func (tab *Table) self() *enode.Node {
 	return tab.net.Self()
 }
@@ -138,29 +159,6 @@ func (tab *Table) seedRand() {
 	tab.mutex.Unlock()
 }
 
-// ReadRandomNodes fills the given slice with random nodes from the table. The results
-// are guaranteed to be unique for a single invocation, no node will appear twice.
-func (tab *Table) ReadRandomNodes(buf []*enode.Node) (n int) {
-	if !tab.isInitDone() {
-		return 0
-	}
-	tab.mutex.Lock()
-	defer tab.mutex.Unlock()
-
-	var nodes []*enode.Node
-	for _, b := range &tab.buckets {
-		for _, n := range b.entries {
-			nodes = append(nodes, unwrapNode(n))
-		}
-	}
-	// Shuffle.
-	for i := 0; i < len(nodes); i++ {
-		j := tab.rand.Intn(len(nodes))
-		nodes[i], nodes[j] = nodes[j], nodes[i]
-	}
-	return copy(buf, nodes)
-}
-
 // getNode returns the node with the given ID or nil if it isn't in the table.
 func (tab *Table) getNode(id enode.ID) *enode.Node {
 	tab.mutex.Lock()
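
Callers that still want a random sample can rebuild the old behaviour on top of the new snapshot accessor. A hedged in-package sketch; the helper below is hypothetical, only Table.Nodes comes from this patch.

package discover

import (
	mrand "math/rand"

	"github.com/ethereum/go-ethereum/p2p/enode"
)

// readRandomNodes is a hypothetical replacement for the removed method:
// shuffle the snapshot returned by Nodes and copy a prefix into buf.
func readRandomNodes(tab *Table, buf []*enode.Node) int {
	nodes := tab.Nodes() // nil until table initialization is done
	mrand.Shuffle(len(nodes), func(i, j int) {
		nodes[i], nodes[j] = nodes[j], nodes[i]
	})
	return copy(buf, nodes)
}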
@@ -218,7 +216,7 @@ func (tab *Table) refresh() <-chan struct{} {
 func (tab *Table) loop() {
 	var (
 		revalidate     = time.NewTimer(tab.nextRevalidateTime())
-		refresh        = time.NewTicker(refreshInterval)
+		refresh        = time.NewTimer(tab.nextRefreshTime())
 		copyNodes      = time.NewTicker(copyNodesInterval)
 		refreshDone    = make(chan struct{}) // where doRefresh reports completion
 		revalidateDone chan struct{}         // where doRevalidate reports completion
@@ -251,6 +249,7 @@ loop:
 				close(ch)
 			}
 			waiting, refreshDone = nil, nil
+			refresh.Reset(tab.nextRefreshTime())
 		case <-revalidate.C:
 			revalidateDone = make(chan struct{})
 			go tab.doRevalidate(revalidateDone)
@@ -373,7 +372,15 @@ func (tab *Table) nextRevalidateTime() time.Duration {
 	tab.mutex.Lock()
 	defer tab.mutex.Unlock()
 
-	return time.Duration(tab.rand.Int63n(int64(revalidateInterval)))
+	return time.Duration(tab.rand.Int63n(int64(tab.cfg.PingInterval)))
+}
+
+func (tab *Table) nextRefreshTime() time.Duration {
+	tab.mutex.Lock()
+	defer tab.mutex.Unlock()
+
+	half := tab.cfg.RefreshInterval / 2
+	return half + time.Duration(tab.rand.Int63n(int64(half)))
 }
 
 // copyLiveNodes adds nodes from the table to the database if they have been in the table
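
For reference, the rearm value is uniform in [RefreshInterval/2, RefreshInterval). A standalone illustration of that jitter (example values and helper name are assumptions, not code from the patch):

package main

import (
	"fmt"
	mrand "math/rand"
	"time"
)

// jitteredRefresh mirrors the computation in nextRefreshTime: the result
// always lands in [interval/2, interval).
func jitteredRefresh(r *mrand.Rand, interval time.Duration) time.Duration {
	half := interval / 2
	return half + time.Duration(r.Int63n(int64(half)))
}

func main() {
	r := mrand.New(mrand.NewSource(1))
	for i := 0; i < 3; i++ {
		fmt.Println(jitteredRefresh(r, 30*time.Minute))
	}
}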
@@ -481,10 +488,12 @@ func (tab *Table) addSeenNode(n *node) {
 		// Can't add: IP limit reached.
 		return
 	}
 
+	// Add to end of bucket:
 	b.entries = append(b.entries, n)
 	b.replacements = deleteNode(b.replacements, n)
 	n.addedAt = time.Now()
+
 	if tab.nodeAddedHook != nil {
 		tab.nodeAddedHook(n)
 	}
@@ -523,10 +532,12 @@ func (tab *Table) addVerifiedNode(n *node) {
 		// Can't add: IP limit reached.
 		return
 	}
+
 	// Add to front of bucket.
 	b.entries, _ = pushNode(b.entries, n, bucketSize)
 	b.replacements = deleteNode(b.replacements, n)
 	n.addedAt = time.Now()
+
 	if tab.nodeAddedHook != nil {
 		tab.nodeAddedHook(n)
 	}


@@ -247,41 +247,6 @@ func TestTable_findnodeByID(t *testing.T) {
 	}
 }
 
-func TestTable_ReadRandomNodesGetAll(t *testing.T) {
-	cfg := &quick.Config{
-		MaxCount: 200,
-		Rand:     rand.New(rand.NewSource(time.Now().Unix())),
-		Values: func(args []reflect.Value, rand *rand.Rand) {
-			args[0] = reflect.ValueOf(make([]*enode.Node, rand.Intn(1000)))
-		},
-	}
-	test := func(buf []*enode.Node) bool {
-		transport := newPingRecorder()
-		tab, db := newTestTable(transport)
-		defer db.Close()
-		defer tab.close()
-		<-tab.initDone
-
-		for i := 0; i < len(buf); i++ {
-			ld := cfg.Rand.Intn(len(tab.buckets))
-			fillTable(tab, []*node{nodeAtDistance(tab.self().ID(), ld, intIP(ld))})
-		}
-		gotN := tab.ReadRandomNodes(buf)
-		if gotN != tab.len() {
-			t.Errorf("wrong number of nodes, got %d, want %d", gotN, tab.len())
-			return false
-		}
-		if hasDuplicates(wrapNodes(buf[:gotN])) {
-			t.Errorf("result contains duplicates")
-			return false
-		}
-		return true
-	}
-	if err := quick.Check(test, cfg); err != nil {
-		t.Error(err)
-	}
-}
-
 type closeTest struct {
 	Self   enode.ID
 	Target enode.ID


@@ -28,7 +28,6 @@ import (
 	"sync"
 
 	"github.com/ethereum/go-ethereum/crypto"
-	"github.com/ethereum/go-ethereum/log"
 	"github.com/ethereum/go-ethereum/p2p/enode"
 	"github.com/ethereum/go-ethereum/p2p/enr"
 )
@@ -42,8 +41,9 @@ func init() {
 }
 
 func newTestTable(t transport) (*Table, *enode.DB) {
+	cfg := Config{}
 	db, _ := enode.OpenDB("")
-	tab, _ := newTable(t, db, nil, log.Root())
+	tab, _ := newTable(t, db, cfg)
 	go tab.loop()
 	return tab, db
 }
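
With the new newTable signature, in-package tests can supply their own intervals instead of relying on package constants. A sketch of an assumed helper (not part of this commit):

package discover

import (
	"testing"
	"time"

	"github.com/ethereum/go-ethereum/p2p/enode"
)

// newFastTestTable is an assumed helper: it builds a table whose liveness
// checks and refreshes fire quickly enough for short-running tests.
func newFastTestTable(t *testing.T, tr transport) (*Table, *enode.DB) {
	t.Helper()
	cfg := Config{
		PingInterval:    50 * time.Millisecond,
		RefreshInterval: 200 * time.Millisecond,
	}
	db, _ := enode.OpenDB("")
	tab, err := newTable(tr, db, cfg)
	if err != nil {
		t.Fatal(err)
	}
	go tab.loop()
	return tab, db
}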


@@ -142,7 +142,7 @@ func ListenV4(c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) {
 		log:             cfg.Log,
 	}
 
-	tab, err := newTable(t, ln.Database(), cfg.Bootnodes, t.log)
+	tab, err := newTable(t, ln.Database(), cfg)
 	if err != nil {
 		return nil, err
 	}


@@ -174,7 +174,7 @@ func newUDPv5(conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) {
 		cancelCloseCtx: cancelCloseCtx,
 	}
 	t.talk = newTalkSystem(t)
-	tab, err := newTable(t, t.db, cfg.Bootnodes, cfg.Log)
+	tab, err := newTable(t, t.db, cfg)
 	if err != nil {
 		return nil, err
 	}
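
The discv5 listener consumes the same Config, so the table knobs apply there as well. A hedged sketch mirroring the v4 example above; package name, interval values, and setup boilerplate are assumptions.

package disctest // hypothetical test-harness package

import (
	"net"
	"time"

	"github.com/ethereum/go-ethereum/crypto"
	"github.com/ethereum/go-ethereum/p2p/discover"
	"github.com/ethereum/go-ethereum/p2p/enode"
)

// startTestDiscV5 is a sketch: same Config type, same tuning knobs, but for
// the discv5 listener. Error handling is elided.
func startTestDiscV5(bootnodes []*enode.Node) (*discover.UDPv5, error) {
	key, _ := crypto.GenerateKey()
	db, _ := enode.OpenDB("")
	ln := enode.NewLocalNode(db, key)

	conn, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.IP{127, 0, 0, 1}})
	if err != nil {
		return nil, err
	}
	return discover.ListenV5(conn, ln, discover.Config{
		PrivateKey:      key,
		Bootnodes:       bootnodes,
		PingInterval:    time.Second,
		RefreshInterval: 30 * time.Second,
	})
}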