p2p/discover, p2p/enode: rework endpoint proof handling, packet logging (#18963)
This change resolves multiple issues around handling of endpoint proofs. The proof is now done separately for each IP and completing the proof requires a matching ping hash. Also remove waitping because it's equivalent to sleep. waitping was slightly more efficient, but that may cause issues with findnode if packets are reordered and the remote end sees findnode before pong. Logging of received packets was hitherto done after handling the packet, which meant that sent replies were logged before the packet that generated them. This change splits up packet handling into 'preverify' and 'handle'. The error from 'preverify' is logged, but 'handle' happens after the message is logged. This fixes the order. Packet logs now contain the node ID.
This commit is contained in:
parent
74c38902ec
commit
f0c6f92140
@ -34,6 +34,7 @@ import (
|
|||||||
type node struct {
|
type node struct {
|
||||||
enode.Node
|
enode.Node
|
||||||
addedAt time.Time // time when the node was added to the table
|
addedAt time.Time // time when the node was added to the table
|
||||||
|
livenessChecks uint // how often liveness was checked
|
||||||
}
|
}
|
||||||
|
|
||||||
type encPubkey [64]byte
|
type encPubkey [64]byte
|
||||||
|
@ -75,6 +75,8 @@ type Table struct {
|
|||||||
net transport
|
net transport
|
||||||
refreshReq chan chan struct{}
|
refreshReq chan chan struct{}
|
||||||
initDone chan struct{}
|
initDone chan struct{}
|
||||||
|
|
||||||
|
closeOnce sync.Once
|
||||||
closeReq chan struct{}
|
closeReq chan struct{}
|
||||||
closed chan struct{}
|
closed chan struct{}
|
||||||
|
|
||||||
@ -180,16 +182,14 @@ func (tab *Table) ReadRandomNodes(buf []*enode.Node) (n int) {
|
|||||||
|
|
||||||
// Close terminates the network listener and flushes the node database.
|
// Close terminates the network listener and flushes the node database.
|
||||||
func (tab *Table) Close() {
|
func (tab *Table) Close() {
|
||||||
|
tab.closeOnce.Do(func() {
|
||||||
if tab.net != nil {
|
if tab.net != nil {
|
||||||
tab.net.close()
|
tab.net.close()
|
||||||
}
|
}
|
||||||
|
// Wait for loop to end.
|
||||||
select {
|
close(tab.closeReq)
|
||||||
case <-tab.closed:
|
<-tab.closed
|
||||||
// already closed.
|
})
|
||||||
case tab.closeReq <- struct{}{}:
|
|
||||||
<-tab.closed // wait for refreshLoop to end.
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// setFallbackNodes sets the initial points of contact. These nodes
|
// setFallbackNodes sets the initial points of contact. These nodes
|
||||||
@ -290,31 +290,39 @@ func (tab *Table) lookup(targetKey encPubkey, refreshIfEmpty bool) []*node {
|
|||||||
// we have asked all closest nodes, stop the search
|
// we have asked all closest nodes, stop the search
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
// wait for the next reply
|
select {
|
||||||
for _, n := range <-reply {
|
case nodes := <-reply:
|
||||||
|
for _, n := range nodes {
|
||||||
if n != nil && !seen[n.ID()] {
|
if n != nil && !seen[n.ID()] {
|
||||||
seen[n.ID()] = true
|
seen[n.ID()] = true
|
||||||
result.push(n, bucketSize)
|
result.push(n, bucketSize)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
case <-tab.closeReq:
|
||||||
|
return nil // shutdown, no need to continue.
|
||||||
|
}
|
||||||
pendingQueries--
|
pendingQueries--
|
||||||
}
|
}
|
||||||
return result.entries
|
return result.entries
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tab *Table) findnode(n *node, targetKey encPubkey, reply chan<- []*node) {
|
func (tab *Table) findnode(n *node, targetKey encPubkey, reply chan<- []*node) {
|
||||||
fails := tab.db.FindFails(n.ID())
|
fails := tab.db.FindFails(n.ID(), n.IP())
|
||||||
r, err := tab.net.findnode(n.ID(), n.addr(), targetKey)
|
r, err := tab.net.findnode(n.ID(), n.addr(), targetKey)
|
||||||
if err != nil || len(r) == 0 {
|
if err == errClosed {
|
||||||
|
// Avoid recording failures on shutdown.
|
||||||
|
reply <- nil
|
||||||
|
return
|
||||||
|
} else if err != nil || len(r) == 0 {
|
||||||
fails++
|
fails++
|
||||||
tab.db.UpdateFindFails(n.ID(), fails)
|
tab.db.UpdateFindFails(n.ID(), n.IP(), fails)
|
||||||
log.Trace("Findnode failed", "id", n.ID(), "failcount", fails, "err", err)
|
log.Trace("Findnode failed", "id", n.ID(), "failcount", fails, "err", err)
|
||||||
if fails >= maxFindnodeFailures {
|
if fails >= maxFindnodeFailures {
|
||||||
log.Trace("Too many findnode failures, dropping", "id", n.ID(), "failcount", fails)
|
log.Trace("Too many findnode failures, dropping", "id", n.ID(), "failcount", fails)
|
||||||
tab.delete(n)
|
tab.delete(n)
|
||||||
}
|
}
|
||||||
} else if fails > 0 {
|
} else if fails > 0 {
|
||||||
tab.db.UpdateFindFails(n.ID(), fails-1)
|
tab.db.UpdateFindFails(n.ID(), n.IP(), fails-1)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Grab as many nodes as possible. Some of them might not be alive anymore, but we'll
|
// Grab as many nodes as possible. Some of them might not be alive anymore, but we'll
|
||||||
@ -329,7 +337,7 @@ func (tab *Table) refresh() <-chan struct{} {
|
|||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
select {
|
select {
|
||||||
case tab.refreshReq <- done:
|
case tab.refreshReq <- done:
|
||||||
case <-tab.closed:
|
case <-tab.closeReq:
|
||||||
close(done)
|
close(done)
|
||||||
}
|
}
|
||||||
return done
|
return done
|
||||||
@ -433,7 +441,7 @@ func (tab *Table) loadSeedNodes() {
|
|||||||
seeds = append(seeds, tab.nursery...)
|
seeds = append(seeds, tab.nursery...)
|
||||||
for i := range seeds {
|
for i := range seeds {
|
||||||
seed := seeds[i]
|
seed := seeds[i]
|
||||||
age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.LastPongReceived(seed.ID())) }}
|
age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.LastPongReceived(seed.ID(), seed.IP())) }}
|
||||||
log.Trace("Found seed node in database", "id", seed.ID(), "addr", seed.addr(), "age", age)
|
log.Trace("Found seed node in database", "id", seed.ID(), "addr", seed.addr(), "age", age)
|
||||||
tab.add(seed)
|
tab.add(seed)
|
||||||
}
|
}
|
||||||
@ -458,16 +466,17 @@ func (tab *Table) doRevalidate(done chan<- struct{}) {
|
|||||||
b := tab.buckets[bi]
|
b := tab.buckets[bi]
|
||||||
if err == nil {
|
if err == nil {
|
||||||
// The node responded, move it to the front.
|
// The node responded, move it to the front.
|
||||||
log.Debug("Revalidated node", "b", bi, "id", last.ID())
|
last.livenessChecks++
|
||||||
|
log.Debug("Revalidated node", "b", bi, "id", last.ID(), "checks", last.livenessChecks)
|
||||||
b.bump(last)
|
b.bump(last)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// No reply received, pick a replacement or delete the node if there aren't
|
// No reply received, pick a replacement or delete the node if there aren't
|
||||||
// any replacements.
|
// any replacements.
|
||||||
if r := tab.replace(b, last); r != nil {
|
if r := tab.replace(b, last); r != nil {
|
||||||
log.Debug("Replaced dead node", "b", bi, "id", last.ID(), "ip", last.IP(), "r", r.ID(), "rip", r.IP())
|
log.Debug("Replaced dead node", "b", bi, "id", last.ID(), "ip", last.IP(), "checks", last.livenessChecks, "r", r.ID(), "rip", r.IP())
|
||||||
} else {
|
} else {
|
||||||
log.Debug("Removed dead node", "b", bi, "id", last.ID(), "ip", last.IP())
|
log.Debug("Removed dead node", "b", bi, "id", last.ID(), "ip", last.IP(), "checks", last.livenessChecks)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -502,7 +511,7 @@ func (tab *Table) copyLiveNodes() {
|
|||||||
now := time.Now()
|
now := time.Now()
|
||||||
for _, b := range &tab.buckets {
|
for _, b := range &tab.buckets {
|
||||||
for _, n := range b.entries {
|
for _, n := range b.entries {
|
||||||
if now.Sub(n.addedAt) >= seedMinTableTime {
|
if n.livenessChecks > 0 && now.Sub(n.addedAt) >= seedMinTableTime {
|
||||||
tab.db.UpdateNode(unwrapNode(n))
|
tab.db.UpdateNode(unwrapNode(n))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -518,9 +527,11 @@ func (tab *Table) closest(target enode.ID, nresults int) *nodesByDistance {
|
|||||||
close := &nodesByDistance{target: target}
|
close := &nodesByDistance{target: target}
|
||||||
for _, b := range &tab.buckets {
|
for _, b := range &tab.buckets {
|
||||||
for _, n := range b.entries {
|
for _, n := range b.entries {
|
||||||
|
if n.livenessChecks > 0 {
|
||||||
close.push(n, nresults)
|
close.push(n, nresults)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
return close
|
return close
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -572,23 +583,6 @@ func (tab *Table) addThroughPing(n *node) {
|
|||||||
tab.add(n)
|
tab.add(n)
|
||||||
}
|
}
|
||||||
|
|
||||||
// stuff adds nodes the table to the end of their corresponding bucket
|
|
||||||
// if the bucket is not full. The caller must not hold tab.mutex.
|
|
||||||
func (tab *Table) stuff(nodes []*node) {
|
|
||||||
tab.mutex.Lock()
|
|
||||||
defer tab.mutex.Unlock()
|
|
||||||
|
|
||||||
for _, n := range nodes {
|
|
||||||
if n.ID() == tab.self().ID() {
|
|
||||||
continue // don't add self
|
|
||||||
}
|
|
||||||
b := tab.bucket(n.ID())
|
|
||||||
if len(b.entries) < bucketSize {
|
|
||||||
tab.bumpOrAdd(b, n)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// delete removes an entry from the node table. It is used to evacuate dead nodes.
|
// delete removes an entry from the node table. It is used to evacuate dead nodes.
|
||||||
func (tab *Table) delete(node *node) {
|
func (tab *Table) delete(node *node) {
|
||||||
tab.mutex.Lock()
|
tab.mutex.Lock()
|
||||||
|
@ -50,8 +50,8 @@ func TestTable_pingReplace(t *testing.T) {
|
|||||||
func testPingReplace(t *testing.T, newNodeIsResponding, lastInBucketIsResponding bool) {
|
func testPingReplace(t *testing.T, newNodeIsResponding, lastInBucketIsResponding bool) {
|
||||||
transport := newPingRecorder()
|
transport := newPingRecorder()
|
||||||
tab, db := newTestTable(transport)
|
tab, db := newTestTable(transport)
|
||||||
defer tab.Close()
|
|
||||||
defer db.Close()
|
defer db.Close()
|
||||||
|
defer tab.Close()
|
||||||
|
|
||||||
<-tab.initDone
|
<-tab.initDone
|
||||||
|
|
||||||
@ -137,8 +137,8 @@ func TestBucket_bumpNoDuplicates(t *testing.T) {
|
|||||||
func TestTable_IPLimit(t *testing.T) {
|
func TestTable_IPLimit(t *testing.T) {
|
||||||
transport := newPingRecorder()
|
transport := newPingRecorder()
|
||||||
tab, db := newTestTable(transport)
|
tab, db := newTestTable(transport)
|
||||||
defer tab.Close()
|
|
||||||
defer db.Close()
|
defer db.Close()
|
||||||
|
defer tab.Close()
|
||||||
|
|
||||||
for i := 0; i < tableIPLimit+1; i++ {
|
for i := 0; i < tableIPLimit+1; i++ {
|
||||||
n := nodeAtDistance(tab.self().ID(), i, net.IP{172, 0, 1, byte(i)})
|
n := nodeAtDistance(tab.self().ID(), i, net.IP{172, 0, 1, byte(i)})
|
||||||
@ -153,8 +153,8 @@ func TestTable_IPLimit(t *testing.T) {
|
|||||||
func TestTable_BucketIPLimit(t *testing.T) {
|
func TestTable_BucketIPLimit(t *testing.T) {
|
||||||
transport := newPingRecorder()
|
transport := newPingRecorder()
|
||||||
tab, db := newTestTable(transport)
|
tab, db := newTestTable(transport)
|
||||||
defer tab.Close()
|
|
||||||
defer db.Close()
|
defer db.Close()
|
||||||
|
defer tab.Close()
|
||||||
|
|
||||||
d := 3
|
d := 3
|
||||||
for i := 0; i < bucketIPLimit+1; i++ {
|
for i := 0; i < bucketIPLimit+1; i++ {
|
||||||
@ -173,9 +173,9 @@ func TestTable_closest(t *testing.T) {
|
|||||||
// for any node table, Target and N
|
// for any node table, Target and N
|
||||||
transport := newPingRecorder()
|
transport := newPingRecorder()
|
||||||
tab, db := newTestTable(transport)
|
tab, db := newTestTable(transport)
|
||||||
defer tab.Close()
|
|
||||||
defer db.Close()
|
defer db.Close()
|
||||||
tab.stuff(test.All)
|
defer tab.Close()
|
||||||
|
fillTable(tab, test.All)
|
||||||
|
|
||||||
// check that closest(Target, N) returns nodes
|
// check that closest(Target, N) returns nodes
|
||||||
result := tab.closest(test.Target, test.N).entries
|
result := tab.closest(test.Target, test.N).entries
|
||||||
@ -234,13 +234,13 @@ func TestTable_ReadRandomNodesGetAll(t *testing.T) {
|
|||||||
test := func(buf []*enode.Node) bool {
|
test := func(buf []*enode.Node) bool {
|
||||||
transport := newPingRecorder()
|
transport := newPingRecorder()
|
||||||
tab, db := newTestTable(transport)
|
tab, db := newTestTable(transport)
|
||||||
defer tab.Close()
|
|
||||||
defer db.Close()
|
defer db.Close()
|
||||||
|
defer tab.Close()
|
||||||
<-tab.initDone
|
<-tab.initDone
|
||||||
|
|
||||||
for i := 0; i < len(buf); i++ {
|
for i := 0; i < len(buf); i++ {
|
||||||
ld := cfg.Rand.Intn(len(tab.buckets))
|
ld := cfg.Rand.Intn(len(tab.buckets))
|
||||||
tab.stuff([]*node{nodeAtDistance(tab.self().ID(), ld, intIP(ld))})
|
fillTable(tab, []*node{nodeAtDistance(tab.self().ID(), ld, intIP(ld))})
|
||||||
}
|
}
|
||||||
gotN := tab.ReadRandomNodes(buf)
|
gotN := tab.ReadRandomNodes(buf)
|
||||||
if gotN != tab.len() {
|
if gotN != tab.len() {
|
||||||
@ -272,16 +272,19 @@ func (*closeTest) Generate(rand *rand.Rand, size int) reflect.Value {
|
|||||||
N: rand.Intn(bucketSize),
|
N: rand.Intn(bucketSize),
|
||||||
}
|
}
|
||||||
for _, id := range gen([]enode.ID{}, rand).([]enode.ID) {
|
for _, id := range gen([]enode.ID{}, rand).([]enode.ID) {
|
||||||
n := enode.SignNull(new(enr.Record), id)
|
r := new(enr.Record)
|
||||||
t.All = append(t.All, wrapNode(n))
|
r.Set(enr.IP(genIP(rand)))
|
||||||
|
n := wrapNode(enode.SignNull(r, id))
|
||||||
|
n.livenessChecks = 1
|
||||||
|
t.All = append(t.All, n)
|
||||||
}
|
}
|
||||||
return reflect.ValueOf(t)
|
return reflect.ValueOf(t)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTable_Lookup(t *testing.T) {
|
func TestTable_Lookup(t *testing.T) {
|
||||||
tab, db := newTestTable(lookupTestnet)
|
tab, db := newTestTable(lookupTestnet)
|
||||||
defer tab.Close()
|
|
||||||
defer db.Close()
|
defer db.Close()
|
||||||
|
defer tab.Close()
|
||||||
|
|
||||||
// lookup on empty table returns no nodes
|
// lookup on empty table returns no nodes
|
||||||
if results := tab.lookup(lookupTestnet.target, false); len(results) > 0 {
|
if results := tab.lookup(lookupTestnet.target, false); len(results) > 0 {
|
||||||
@ -289,8 +292,9 @@ func TestTable_Lookup(t *testing.T) {
|
|||||||
}
|
}
|
||||||
// seed table with initial node (otherwise lookup will terminate immediately)
|
// seed table with initial node (otherwise lookup will terminate immediately)
|
||||||
seedKey, _ := decodePubkey(lookupTestnet.dists[256][0])
|
seedKey, _ := decodePubkey(lookupTestnet.dists[256][0])
|
||||||
seed := wrapNode(enode.NewV4(seedKey, net.IP{}, 0, 256))
|
seed := wrapNode(enode.NewV4(seedKey, net.IP{127, 0, 0, 1}, 0, 256))
|
||||||
tab.stuff([]*node{seed})
|
seed.livenessChecks = 1
|
||||||
|
fillTable(tab, []*node{seed})
|
||||||
|
|
||||||
results := tab.lookup(lookupTestnet.target, true)
|
results := tab.lookup(lookupTestnet.target, true)
|
||||||
t.Logf("results:")
|
t.Logf("results:")
|
||||||
@ -578,6 +582,12 @@ func gen(typ interface{}, rand *rand.Rand) interface{} {
|
|||||||
return v.Interface()
|
return v.Interface()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func genIP(rand *rand.Rand) net.IP {
|
||||||
|
ip := make(net.IP, 4)
|
||||||
|
rand.Read(ip)
|
||||||
|
return ip
|
||||||
|
}
|
||||||
|
|
||||||
func quickcfg() *quick.Config {
|
func quickcfg() *quick.Config {
|
||||||
return &quick.Config{
|
return &quick.Config{
|
||||||
MaxCount: 5000,
|
MaxCount: 5000,
|
||||||
|
@ -83,6 +83,23 @@ func fillBucket(tab *Table, n *node) (last *node) {
|
|||||||
return b.entries[bucketSize-1]
|
return b.entries[bucketSize-1]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// fillTable adds nodes the table to the end of their corresponding bucket
|
||||||
|
// if the bucket is not full. The caller must not hold tab.mutex.
|
||||||
|
func fillTable(tab *Table, nodes []*node) {
|
||||||
|
tab.mutex.Lock()
|
||||||
|
defer tab.mutex.Unlock()
|
||||||
|
|
||||||
|
for _, n := range nodes {
|
||||||
|
if n.ID() == tab.self().ID() {
|
||||||
|
continue // don't add self
|
||||||
|
}
|
||||||
|
b := tab.bucket(n.ID())
|
||||||
|
if len(b.entries) < bucketSize {
|
||||||
|
tab.bumpOrAdd(b, n)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type pingRecorder struct {
|
type pingRecorder struct {
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
dead, pinged map[enode.ID]bool
|
dead, pinged map[enode.ID]bool
|
||||||
@ -109,10 +126,6 @@ func (t *pingRecorder) findnode(toid enode.ID, toaddr *net.UDPAddr, target encPu
|
|||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *pingRecorder) waitping(from enode.ID) error {
|
|
||||||
return nil // remote always pings
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *pingRecorder) ping(toid enode.ID, toaddr *net.UDPAddr) error {
|
func (t *pingRecorder) ping(toid enode.ID, toaddr *net.UDPAddr) error {
|
||||||
t.mu.Lock()
|
t.mu.Lock()
|
||||||
defer t.mu.Unlock()
|
defer t.mu.Unlock()
|
||||||
|
@ -67,6 +67,8 @@ const (
|
|||||||
// RPC request structures
|
// RPC request structures
|
||||||
type (
|
type (
|
||||||
ping struct {
|
ping struct {
|
||||||
|
senderKey *ecdsa.PublicKey // filled in by preverify
|
||||||
|
|
||||||
Version uint
|
Version uint
|
||||||
From, To rpcEndpoint
|
From, To rpcEndpoint
|
||||||
Expiration uint64
|
Expiration uint64
|
||||||
@ -155,8 +157,13 @@ func nodeToRPC(n *node) rpcNode {
|
|||||||
return rpcNode{ID: ekey, IP: n.IP(), UDP: uint16(n.UDP()), TCP: uint16(n.TCP())}
|
return rpcNode{ID: ekey, IP: n.IP(), UDP: uint16(n.UDP()), TCP: uint16(n.TCP())}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// packet is implemented by all protocol messages.
|
||||||
type packet interface {
|
type packet interface {
|
||||||
handle(t *udp, from *net.UDPAddr, fromKey encPubkey, mac []byte) error
|
// preverify checks whether the packet is valid and should be handled at all.
|
||||||
|
preverify(t *udp, from *net.UDPAddr, fromID enode.ID, fromKey encPubkey) error
|
||||||
|
// handle handles the packet.
|
||||||
|
handle(t *udp, from *net.UDPAddr, fromID enode.ID, mac []byte)
|
||||||
|
// name returns the name of the packet for logging purposes.
|
||||||
name() string
|
name() string
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -177,43 +184,48 @@ type udp struct {
|
|||||||
tab *Table
|
tab *Table
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
|
|
||||||
addpending chan *pending
|
addReplyMatcher chan *replyMatcher
|
||||||
gotreply chan reply
|
gotreply chan reply
|
||||||
closing chan struct{}
|
closing chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// pending represents a pending reply.
|
// pending represents a pending reply.
|
||||||
//
|
//
|
||||||
// some implementations of the protocol wish to send more than one
|
// Some implementations of the protocol wish to send more than one
|
||||||
// reply packet to findnode. in general, any neighbors packet cannot
|
// reply packet to findnode. In general, any neighbors packet cannot
|
||||||
// be matched up with a specific findnode packet.
|
// be matched up with a specific findnode packet.
|
||||||
//
|
//
|
||||||
// our implementation handles this by storing a callback function for
|
// Our implementation handles this by storing a callback function for
|
||||||
// each pending reply. incoming packets from a node are dispatched
|
// each pending reply. Incoming packets from a node are dispatched
|
||||||
// to all the callback functions for that node.
|
// to all callback functions for that node.
|
||||||
type pending struct {
|
type replyMatcher struct {
|
||||||
// these fields must match in the reply.
|
// these fields must match in the reply.
|
||||||
from enode.ID
|
from enode.ID
|
||||||
|
ip net.IP
|
||||||
ptype byte
|
ptype byte
|
||||||
|
|
||||||
// time when the request must complete
|
// time when the request must complete
|
||||||
deadline time.Time
|
deadline time.Time
|
||||||
|
|
||||||
// callback is called when a matching reply arrives. if it returns
|
// callback is called when a matching reply arrives. If it returns matched == true, the
|
||||||
// true, the callback is removed from the pending reply queue.
|
// reply was acceptable. The second return value indicates whether the callback should
|
||||||
// if it returns false, the reply is considered incomplete and
|
// be removed from the pending reply queue. If it returns false, the reply is considered
|
||||||
// the callback will be invoked again for the next matching reply.
|
// incomplete and the callback will be invoked again for the next matching reply.
|
||||||
callback func(resp interface{}) (done bool)
|
callback replyMatchFunc
|
||||||
|
|
||||||
// errc receives nil when the callback indicates completion or an
|
// errc receives nil when the callback indicates completion or an
|
||||||
// error if no further reply is received within the timeout.
|
// error if no further reply is received within the timeout.
|
||||||
errc chan<- error
|
errc chan<- error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type replyMatchFunc func(interface{}) (matched bool, requestDone bool)
|
||||||
|
|
||||||
type reply struct {
|
type reply struct {
|
||||||
from enode.ID
|
from enode.ID
|
||||||
|
ip net.IP
|
||||||
ptype byte
|
ptype byte
|
||||||
data interface{}
|
data packet
|
||||||
|
|
||||||
// loop indicates whether there was
|
// loop indicates whether there was
|
||||||
// a matching request by sending on this channel.
|
// a matching request by sending on this channel.
|
||||||
matched chan<- bool
|
matched chan<- bool
|
||||||
@ -254,7 +266,7 @@ func newUDP(c conn, ln *enode.LocalNode, cfg Config) (*Table, *udp, error) {
|
|||||||
db: ln.Database(),
|
db: ln.Database(),
|
||||||
closing: make(chan struct{}),
|
closing: make(chan struct{}),
|
||||||
gotreply: make(chan reply),
|
gotreply: make(chan reply),
|
||||||
addpending: make(chan *pending),
|
addReplyMatcher: make(chan *replyMatcher),
|
||||||
}
|
}
|
||||||
tab, err := newTable(udp, ln.Database(), cfg.Bootnodes)
|
tab, err := newTable(udp, ln.Database(), cfg.Bootnodes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -304,35 +316,37 @@ func (t *udp) sendPing(toid enode.ID, toaddr *net.UDPAddr, callback func()) <-ch
|
|||||||
errc <- err
|
errc <- err
|
||||||
return errc
|
return errc
|
||||||
}
|
}
|
||||||
errc := t.pending(toid, pongPacket, func(p interface{}) bool {
|
// Add a matcher for the reply to the pending reply queue. Pongs are matched if they
|
||||||
ok := bytes.Equal(p.(*pong).ReplyTok, hash)
|
// reference the ping we're about to send.
|
||||||
if ok && callback != nil {
|
errc := t.pending(toid, toaddr.IP, pongPacket, func(p interface{}) (matched bool, requestDone bool) {
|
||||||
|
matched = bytes.Equal(p.(*pong).ReplyTok, hash)
|
||||||
|
if matched && callback != nil {
|
||||||
callback()
|
callback()
|
||||||
}
|
}
|
||||||
return ok
|
return matched, matched
|
||||||
})
|
})
|
||||||
|
// Send the packet.
|
||||||
t.localNode.UDPContact(toaddr)
|
t.localNode.UDPContact(toaddr)
|
||||||
t.write(toaddr, req.name(), packet)
|
t.write(toaddr, toid, req.name(), packet)
|
||||||
return errc
|
return errc
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *udp) waitping(from enode.ID) error {
|
|
||||||
return <-t.pending(from, pingPacket, func(interface{}) bool { return true })
|
|
||||||
}
|
|
||||||
|
|
||||||
// findnode sends a findnode request to the given node and waits until
|
// findnode sends a findnode request to the given node and waits until
|
||||||
// the node has sent up to k neighbors.
|
// the node has sent up to k neighbors.
|
||||||
func (t *udp) findnode(toid enode.ID, toaddr *net.UDPAddr, target encPubkey) ([]*node, error) {
|
func (t *udp) findnode(toid enode.ID, toaddr *net.UDPAddr, target encPubkey) ([]*node, error) {
|
||||||
// If we haven't seen a ping from the destination node for a while, it won't remember
|
// If we haven't seen a ping from the destination node for a while, it won't remember
|
||||||
// our endpoint proof and reject findnode. Solicit a ping first.
|
// our endpoint proof and reject findnode. Solicit a ping first.
|
||||||
if time.Since(t.db.LastPingReceived(toid)) > bondExpiration {
|
if time.Since(t.db.LastPingReceived(toid, toaddr.IP)) > bondExpiration {
|
||||||
t.ping(toid, toaddr)
|
t.ping(toid, toaddr)
|
||||||
t.waitping(toid)
|
// Wait for them to ping back and process our pong.
|
||||||
|
time.Sleep(respTimeout)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Add a matcher for 'neighbours' replies to the pending reply queue. The matcher is
|
||||||
|
// active until enough nodes have been received.
|
||||||
nodes := make([]*node, 0, bucketSize)
|
nodes := make([]*node, 0, bucketSize)
|
||||||
nreceived := 0
|
nreceived := 0
|
||||||
errc := t.pending(toid, neighborsPacket, func(r interface{}) bool {
|
errc := t.pending(toid, toaddr.IP, neighborsPacket, func(r interface{}) (matched bool, requestDone bool) {
|
||||||
reply := r.(*neighbors)
|
reply := r.(*neighbors)
|
||||||
for _, rn := range reply.Nodes {
|
for _, rn := range reply.Nodes {
|
||||||
nreceived++
|
nreceived++
|
||||||
@ -343,22 +357,22 @@ func (t *udp) findnode(toid enode.ID, toaddr *net.UDPAddr, target encPubkey) ([]
|
|||||||
}
|
}
|
||||||
nodes = append(nodes, n)
|
nodes = append(nodes, n)
|
||||||
}
|
}
|
||||||
return nreceived >= bucketSize
|
return true, nreceived >= bucketSize
|
||||||
})
|
})
|
||||||
t.send(toaddr, findnodePacket, &findnode{
|
t.send(toaddr, toid, findnodePacket, &findnode{
|
||||||
Target: target,
|
Target: target,
|
||||||
Expiration: uint64(time.Now().Add(expiration).Unix()),
|
Expiration: uint64(time.Now().Add(expiration).Unix()),
|
||||||
})
|
})
|
||||||
return nodes, <-errc
|
return nodes, <-errc
|
||||||
}
|
}
|
||||||
|
|
||||||
// pending adds a reply callback to the pending reply queue.
|
// pending adds a reply matcher to the pending reply queue.
|
||||||
// see the documentation of type pending for a detailed explanation.
|
// see the documentation of type replyMatcher for a detailed explanation.
|
||||||
func (t *udp) pending(id enode.ID, ptype byte, callback func(interface{}) bool) <-chan error {
|
func (t *udp) pending(id enode.ID, ip net.IP, ptype byte, callback replyMatchFunc) <-chan error {
|
||||||
ch := make(chan error, 1)
|
ch := make(chan error, 1)
|
||||||
p := &pending{from: id, ptype: ptype, callback: callback, errc: ch}
|
p := &replyMatcher{from: id, ip: ip, ptype: ptype, callback: callback, errc: ch}
|
||||||
select {
|
select {
|
||||||
case t.addpending <- p:
|
case t.addReplyMatcher <- p:
|
||||||
// loop will handle it
|
// loop will handle it
|
||||||
case <-t.closing:
|
case <-t.closing:
|
||||||
ch <- errClosed
|
ch <- errClosed
|
||||||
@ -366,10 +380,12 @@ func (t *udp) pending(id enode.ID, ptype byte, callback func(interface{}) bool)
|
|||||||
return ch
|
return ch
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *udp) handleReply(from enode.ID, ptype byte, req packet) bool {
|
// handleReply dispatches a reply packet, invoking reply matchers. It returns
|
||||||
|
// whether any matcher considered the packet acceptable.
|
||||||
|
func (t *udp) handleReply(from enode.ID, fromIP net.IP, ptype byte, req packet) bool {
|
||||||
matched := make(chan bool, 1)
|
matched := make(chan bool, 1)
|
||||||
select {
|
select {
|
||||||
case t.gotreply <- reply{from, ptype, req, matched}:
|
case t.gotreply <- reply{from, fromIP, ptype, req, matched}:
|
||||||
// loop will handle it
|
// loop will handle it
|
||||||
return <-matched
|
return <-matched
|
||||||
case <-t.closing:
|
case <-t.closing:
|
||||||
@ -385,7 +401,7 @@ func (t *udp) loop() {
|
|||||||
var (
|
var (
|
||||||
plist = list.New()
|
plist = list.New()
|
||||||
timeout = time.NewTimer(0)
|
timeout = time.NewTimer(0)
|
||||||
nextTimeout *pending // head of plist when timeout was last reset
|
nextTimeout *replyMatcher // head of plist when timeout was last reset
|
||||||
contTimeouts = 0 // number of continuous timeouts to do NTP checks
|
contTimeouts = 0 // number of continuous timeouts to do NTP checks
|
||||||
ntpWarnTime = time.Unix(0, 0)
|
ntpWarnTime = time.Unix(0, 0)
|
||||||
)
|
)
|
||||||
@ -399,7 +415,7 @@ func (t *udp) loop() {
|
|||||||
// Start the timer so it fires when the next pending reply has expired.
|
// Start the timer so it fires when the next pending reply has expired.
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
for el := plist.Front(); el != nil; el = el.Next() {
|
for el := plist.Front(); el != nil; el = el.Next() {
|
||||||
nextTimeout = el.Value.(*pending)
|
nextTimeout = el.Value.(*replyMatcher)
|
||||||
if dist := nextTimeout.deadline.Sub(now); dist < 2*respTimeout {
|
if dist := nextTimeout.deadline.Sub(now); dist < 2*respTimeout {
|
||||||
timeout.Reset(dist)
|
timeout.Reset(dist)
|
||||||
return
|
return
|
||||||
@ -420,25 +436,23 @@ func (t *udp) loop() {
|
|||||||
select {
|
select {
|
||||||
case <-t.closing:
|
case <-t.closing:
|
||||||
for el := plist.Front(); el != nil; el = el.Next() {
|
for el := plist.Front(); el != nil; el = el.Next() {
|
||||||
el.Value.(*pending).errc <- errClosed
|
el.Value.(*replyMatcher).errc <- errClosed
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
|
|
||||||
case p := <-t.addpending:
|
case p := <-t.addReplyMatcher:
|
||||||
p.deadline = time.Now().Add(respTimeout)
|
p.deadline = time.Now().Add(respTimeout)
|
||||||
plist.PushBack(p)
|
plist.PushBack(p)
|
||||||
|
|
||||||
case r := <-t.gotreply:
|
case r := <-t.gotreply:
|
||||||
var matched bool
|
var matched bool // whether any replyMatcher considered the reply acceptable.
|
||||||
for el := plist.Front(); el != nil; el = el.Next() {
|
for el := plist.Front(); el != nil; el = el.Next() {
|
||||||
p := el.Value.(*pending)
|
p := el.Value.(*replyMatcher)
|
||||||
if p.from == r.from && p.ptype == r.ptype {
|
if p.from == r.from && p.ptype == r.ptype && p.ip.Equal(r.ip) {
|
||||||
matched = true
|
ok, requestDone := p.callback(r.data)
|
||||||
// Remove the matcher if its callback indicates
|
matched = matched || ok
|
||||||
// that all replies have been received. This is
|
// Remove the matcher if callback indicates that all replies have been received.
|
||||||
// required for packet types that expect multiple
|
if requestDone {
|
||||||
// reply packets.
|
|
||||||
if p.callback(r.data) {
|
|
||||||
p.errc <- nil
|
p.errc <- nil
|
||||||
plist.Remove(el)
|
plist.Remove(el)
|
||||||
}
|
}
|
||||||
@ -453,7 +467,7 @@ func (t *udp) loop() {
|
|||||||
|
|
||||||
// Notify and remove callbacks whose deadline is in the past.
|
// Notify and remove callbacks whose deadline is in the past.
|
||||||
for el := plist.Front(); el != nil; el = el.Next() {
|
for el := plist.Front(); el != nil; el = el.Next() {
|
||||||
p := el.Value.(*pending)
|
p := el.Value.(*replyMatcher)
|
||||||
if now.After(p.deadline) || now.Equal(p.deadline) {
|
if now.After(p.deadline) || now.Equal(p.deadline) {
|
||||||
p.errc <- errTimeout
|
p.errc <- errTimeout
|
||||||
plist.Remove(el)
|
plist.Remove(el)
|
||||||
@ -504,17 +518,17 @@ func init() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *udp) send(toaddr *net.UDPAddr, ptype byte, req packet) ([]byte, error) {
|
func (t *udp) send(toaddr *net.UDPAddr, toid enode.ID, ptype byte, req packet) ([]byte, error) {
|
||||||
packet, hash, err := encodePacket(t.priv, ptype, req)
|
packet, hash, err := encodePacket(t.priv, ptype, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return hash, err
|
return hash, err
|
||||||
}
|
}
|
||||||
return hash, t.write(toaddr, req.name(), packet)
|
return hash, t.write(toaddr, toid, req.name(), packet)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *udp) write(toaddr *net.UDPAddr, what string, packet []byte) error {
|
func (t *udp) write(toaddr *net.UDPAddr, toid enode.ID, what string, packet []byte) error {
|
||||||
_, err := t.conn.WriteToUDP(packet, toaddr)
|
_, err := t.conn.WriteToUDP(packet, toaddr)
|
||||||
log.Trace(">> "+what, "addr", toaddr, "err", err)
|
log.Trace(">> "+what, "id", toid, "addr", toaddr, "err", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -573,13 +587,19 @@ func (t *udp) readLoop(unhandled chan<- ReadPacket) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (t *udp) handlePacket(from *net.UDPAddr, buf []byte) error {
|
func (t *udp) handlePacket(from *net.UDPAddr, buf []byte) error {
|
||||||
packet, fromID, hash, err := decodePacket(buf)
|
packet, fromKey, hash, err := decodePacket(buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Debug("Bad discv4 packet", "addr", from, "err", err)
|
log.Debug("Bad discv4 packet", "addr", from, "err", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
err = packet.handle(t, from, fromID, hash)
|
fromID := fromKey.id()
|
||||||
log.Trace("<< "+packet.name(), "addr", from, "err", err)
|
if err == nil {
|
||||||
|
err = packet.preverify(t, from, fromID, fromKey)
|
||||||
|
}
|
||||||
|
log.Trace("<< "+packet.name(), "id", fromID, "addr", from, "err", err)
|
||||||
|
if err == nil {
|
||||||
|
packet.handle(t, from, fromID, hash)
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -615,54 +635,67 @@ func decodePacket(buf []byte) (packet, encPubkey, []byte, error) {
|
|||||||
return req, fromKey, hash, err
|
return req, fromKey, hash, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (req *ping) handle(t *udp, from *net.UDPAddr, fromKey encPubkey, mac []byte) error {
|
// Packet Handlers
|
||||||
|
|
||||||
|
func (req *ping) preverify(t *udp, from *net.UDPAddr, fromID enode.ID, fromKey encPubkey) error {
|
||||||
if expired(req.Expiration) {
|
if expired(req.Expiration) {
|
||||||
return errExpired
|
return errExpired
|
||||||
}
|
}
|
||||||
key, err := decodePubkey(fromKey)
|
key, err := decodePubkey(fromKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("invalid public key: %v", err)
|
return errors.New("invalid public key")
|
||||||
}
|
}
|
||||||
t.send(from, pongPacket, &pong{
|
req.senderKey = key
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (req *ping) handle(t *udp, from *net.UDPAddr, fromID enode.ID, mac []byte) {
|
||||||
|
// Reply.
|
||||||
|
t.send(from, fromID, pongPacket, &pong{
|
||||||
To: makeEndpoint(from, req.From.TCP),
|
To: makeEndpoint(from, req.From.TCP),
|
||||||
ReplyTok: mac,
|
ReplyTok: mac,
|
||||||
Expiration: uint64(time.Now().Add(expiration).Unix()),
|
Expiration: uint64(time.Now().Add(expiration).Unix()),
|
||||||
})
|
})
|
||||||
n := wrapNode(enode.NewV4(key, from.IP, int(req.From.TCP), from.Port))
|
|
||||||
t.handleReply(n.ID(), pingPacket, req)
|
// Ping back if our last pong on file is too far in the past.
|
||||||
if time.Since(t.db.LastPongReceived(n.ID())) > bondExpiration {
|
n := wrapNode(enode.NewV4(req.senderKey, from.IP, int(req.From.TCP), from.Port))
|
||||||
t.sendPing(n.ID(), from, func() { t.tab.addThroughPing(n) })
|
if time.Since(t.db.LastPongReceived(n.ID(), from.IP)) > bondExpiration {
|
||||||
|
t.sendPing(fromID, from, func() {
|
||||||
|
t.tab.addThroughPing(n)
|
||||||
|
})
|
||||||
} else {
|
} else {
|
||||||
t.tab.addThroughPing(n)
|
t.tab.addThroughPing(n)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Update node database and endpoint predictor.
|
||||||
|
t.db.UpdateLastPingReceived(n.ID(), from.IP, time.Now())
|
||||||
t.localNode.UDPEndpointStatement(from, &net.UDPAddr{IP: req.To.IP, Port: int(req.To.UDP)})
|
t.localNode.UDPEndpointStatement(from, &net.UDPAddr{IP: req.To.IP, Port: int(req.To.UDP)})
|
||||||
t.db.UpdateLastPingReceived(n.ID(), time.Now())
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (req *ping) name() string { return "PING/v4" }
|
func (req *ping) name() string { return "PING/v4" }
|
||||||
|
|
||||||
func (req *pong) handle(t *udp, from *net.UDPAddr, fromKey encPubkey, mac []byte) error {
|
func (req *pong) preverify(t *udp, from *net.UDPAddr, fromID enode.ID, fromKey encPubkey) error {
|
||||||
if expired(req.Expiration) {
|
if expired(req.Expiration) {
|
||||||
return errExpired
|
return errExpired
|
||||||
}
|
}
|
||||||
fromID := fromKey.id()
|
if !t.handleReply(fromID, from.IP, pongPacket, req) {
|
||||||
if !t.handleReply(fromID, pongPacket, req) {
|
|
||||||
return errUnsolicitedReply
|
return errUnsolicitedReply
|
||||||
}
|
}
|
||||||
t.localNode.UDPEndpointStatement(from, &net.UDPAddr{IP: req.To.IP, Port: int(req.To.UDP)})
|
|
||||||
t.db.UpdateLastPongReceived(fromID, time.Now())
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (req *pong) handle(t *udp, from *net.UDPAddr, fromID enode.ID, mac []byte) {
|
||||||
|
t.localNode.UDPEndpointStatement(from, &net.UDPAddr{IP: req.To.IP, Port: int(req.To.UDP)})
|
||||||
|
t.db.UpdateLastPongReceived(fromID, from.IP, time.Now())
|
||||||
|
}
|
||||||
|
|
||||||
func (req *pong) name() string { return "PONG/v4" }
|
func (req *pong) name() string { return "PONG/v4" }
|
||||||
|
|
||||||
func (req *findnode) handle(t *udp, from *net.UDPAddr, fromKey encPubkey, mac []byte) error {
|
func (req *findnode) preverify(t *udp, from *net.UDPAddr, fromID enode.ID, fromKey encPubkey) error {
|
||||||
if expired(req.Expiration) {
|
if expired(req.Expiration) {
|
||||||
return errExpired
|
return errExpired
|
||||||
}
|
}
|
||||||
fromID := fromKey.id()
|
if time.Since(t.db.LastPongReceived(fromID, from.IP)) > bondExpiration {
|
||||||
if time.Since(t.db.LastPongReceived(fromID)) > bondExpiration {
|
|
||||||
// No endpoint proof pong exists, we don't process the packet. This prevents an
|
// No endpoint proof pong exists, we don't process the packet. This prevents an
|
||||||
// attack vector where the discovery protocol could be used to amplify traffic in a
|
// attack vector where the discovery protocol could be used to amplify traffic in a
|
||||||
// DDOS attack. A malicious actor would send a findnode request with the IP address
|
// DDOS attack. A malicious actor would send a findnode request with the IP address
|
||||||
@ -671,43 +704,50 @@ func (req *findnode) handle(t *udp, from *net.UDPAddr, fromKey encPubkey, mac []
|
|||||||
// findnode) to the victim.
|
// findnode) to the victim.
|
||||||
return errUnknownNode
|
return errUnknownNode
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (req *findnode) handle(t *udp, from *net.UDPAddr, fromID enode.ID, mac []byte) {
|
||||||
|
// Determine closest nodes.
|
||||||
target := enode.ID(crypto.Keccak256Hash(req.Target[:]))
|
target := enode.ID(crypto.Keccak256Hash(req.Target[:]))
|
||||||
t.tab.mutex.Lock()
|
t.tab.mutex.Lock()
|
||||||
closest := t.tab.closest(target, bucketSize).entries
|
closest := t.tab.closest(target, bucketSize).entries
|
||||||
t.tab.mutex.Unlock()
|
t.tab.mutex.Unlock()
|
||||||
|
|
||||||
p := neighbors{Expiration: uint64(time.Now().Add(expiration).Unix())}
|
|
||||||
var sent bool
|
|
||||||
// Send neighbors in chunks with at most maxNeighbors per packet
|
// Send neighbors in chunks with at most maxNeighbors per packet
|
||||||
// to stay below the 1280 byte limit.
|
// to stay below the 1280 byte limit.
|
||||||
|
p := neighbors{Expiration: uint64(time.Now().Add(expiration).Unix())}
|
||||||
|
var sent bool
|
||||||
for _, n := range closest {
|
for _, n := range closest {
|
||||||
if netutil.CheckRelayIP(from.IP, n.IP()) == nil {
|
if netutil.CheckRelayIP(from.IP, n.IP()) == nil {
|
||||||
p.Nodes = append(p.Nodes, nodeToRPC(n))
|
p.Nodes = append(p.Nodes, nodeToRPC(n))
|
||||||
}
|
}
|
||||||
if len(p.Nodes) == maxNeighbors {
|
if len(p.Nodes) == maxNeighbors {
|
||||||
t.send(from, neighborsPacket, &p)
|
t.send(from, fromID, neighborsPacket, &p)
|
||||||
p.Nodes = p.Nodes[:0]
|
p.Nodes = p.Nodes[:0]
|
||||||
sent = true
|
sent = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(p.Nodes) > 0 || !sent {
|
if len(p.Nodes) > 0 || !sent {
|
||||||
t.send(from, neighborsPacket, &p)
|
t.send(from, fromID, neighborsPacket, &p)
|
||||||
}
|
}
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (req *findnode) name() string { return "FINDNODE/v4" }
|
func (req *findnode) name() string { return "FINDNODE/v4" }
|
||||||
|
|
||||||
func (req *neighbors) handle(t *udp, from *net.UDPAddr, fromKey encPubkey, mac []byte) error {
|
func (req *neighbors) preverify(t *udp, from *net.UDPAddr, fromID enode.ID, fromKey encPubkey) error {
|
||||||
if expired(req.Expiration) {
|
if expired(req.Expiration) {
|
||||||
return errExpired
|
return errExpired
|
||||||
}
|
}
|
||||||
if !t.handleReply(fromKey.id(), neighborsPacket, req) {
|
if !t.handleReply(fromID, from.IP, neighborsPacket, req) {
|
||||||
return errUnsolicitedReply
|
return errUnsolicitedReply
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (req *neighbors) handle(t *udp, from *net.UDPAddr, fromID enode.ID, mac []byte) {
|
||||||
|
}
|
||||||
|
|
||||||
func (req *neighbors) name() string { return "NEIGHBORS/v4" }
|
func (req *neighbors) name() string { return "NEIGHBORS/v4" }
|
||||||
|
|
||||||
func expired(ts uint64) bool {
|
func expired(ts uint64) bool {
|
||||||
|
@ -19,6 +19,7 @@ package discover
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"crypto/ecdsa"
|
"crypto/ecdsa"
|
||||||
|
crand "crypto/rand"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"errors"
|
"errors"
|
||||||
@ -57,6 +58,7 @@ type udpTest struct {
|
|||||||
t *testing.T
|
t *testing.T
|
||||||
pipe *dgramPipe
|
pipe *dgramPipe
|
||||||
table *Table
|
table *Table
|
||||||
|
db *enode.DB
|
||||||
udp *udp
|
udp *udp
|
||||||
sent [][]byte
|
sent [][]byte
|
||||||
localkey, remotekey *ecdsa.PrivateKey
|
localkey, remotekey *ecdsa.PrivateKey
|
||||||
@ -71,22 +73,32 @@ func newUDPTest(t *testing.T) *udpTest {
|
|||||||
remotekey: newkey(),
|
remotekey: newkey(),
|
||||||
remoteaddr: &net.UDPAddr{IP: net.IP{10, 0, 1, 99}, Port: 30303},
|
remoteaddr: &net.UDPAddr{IP: net.IP{10, 0, 1, 99}, Port: 30303},
|
||||||
}
|
}
|
||||||
db, _ := enode.OpenDB("")
|
test.db, _ = enode.OpenDB("")
|
||||||
ln := enode.NewLocalNode(db, test.localkey)
|
ln := enode.NewLocalNode(test.db, test.localkey)
|
||||||
test.table, test.udp, _ = newUDP(test.pipe, ln, Config{PrivateKey: test.localkey})
|
test.table, test.udp, _ = newUDP(test.pipe, ln, Config{PrivateKey: test.localkey})
|
||||||
// Wait for initial refresh so the table doesn't send unexpected findnode.
|
// Wait for initial refresh so the table doesn't send unexpected findnode.
|
||||||
<-test.table.initDone
|
<-test.table.initDone
|
||||||
return test
|
return test
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (test *udpTest) close() {
|
||||||
|
test.table.Close()
|
||||||
|
test.db.Close()
|
||||||
|
}
|
||||||
|
|
||||||
// handles a packet as if it had been sent to the transport.
|
// handles a packet as if it had been sent to the transport.
|
||||||
func (test *udpTest) packetIn(wantError error, ptype byte, data packet) error {
|
func (test *udpTest) packetIn(wantError error, ptype byte, data packet) error {
|
||||||
enc, _, err := encodePacket(test.remotekey, ptype, data)
|
return test.packetInFrom(wantError, test.remotekey, test.remoteaddr, ptype, data)
|
||||||
|
}
|
||||||
|
|
||||||
|
// handles a packet as if it had been sent to the transport by the key/endpoint.
|
||||||
|
func (test *udpTest) packetInFrom(wantError error, key *ecdsa.PrivateKey, addr *net.UDPAddr, ptype byte, data packet) error {
|
||||||
|
enc, _, err := encodePacket(key, ptype, data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return test.errorf("packet (%d) encode error: %v", ptype, err)
|
return test.errorf("packet (%d) encode error: %v", ptype, err)
|
||||||
}
|
}
|
||||||
test.sent = append(test.sent, enc)
|
test.sent = append(test.sent, enc)
|
||||||
if err = test.udp.handlePacket(test.remoteaddr, enc); err != wantError {
|
if err = test.udp.handlePacket(addr, enc); err != wantError {
|
||||||
return test.errorf("error mismatch: got %q, want %q", err, wantError)
|
return test.errorf("error mismatch: got %q, want %q", err, wantError)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@ -94,19 +106,19 @@ func (test *udpTest) packetIn(wantError error, ptype byte, data packet) error {
|
|||||||
|
|
||||||
// waits for a packet to be sent by the transport.
|
// waits for a packet to be sent by the transport.
|
||||||
// validate should have type func(*udpTest, X) error, where X is a packet type.
|
// validate should have type func(*udpTest, X) error, where X is a packet type.
|
||||||
func (test *udpTest) waitPacketOut(validate interface{}) ([]byte, error) {
|
func (test *udpTest) waitPacketOut(validate interface{}) (*net.UDPAddr, []byte, error) {
|
||||||
dgram := test.pipe.waitPacketOut()
|
dgram := test.pipe.waitPacketOut()
|
||||||
p, _, hash, err := decodePacket(dgram)
|
p, _, hash, err := decodePacket(dgram.data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return hash, test.errorf("sent packet decode error: %v", err)
|
return &dgram.to, hash, test.errorf("sent packet decode error: %v", err)
|
||||||
}
|
}
|
||||||
fn := reflect.ValueOf(validate)
|
fn := reflect.ValueOf(validate)
|
||||||
exptype := fn.Type().In(0)
|
exptype := fn.Type().In(0)
|
||||||
if reflect.TypeOf(p) != exptype {
|
if reflect.TypeOf(p) != exptype {
|
||||||
return hash, test.errorf("sent packet type mismatch, got: %v, want: %v", reflect.TypeOf(p), exptype)
|
return &dgram.to, hash, test.errorf("sent packet type mismatch, got: %v, want: %v", reflect.TypeOf(p), exptype)
|
||||||
}
|
}
|
||||||
fn.Call([]reflect.Value{reflect.ValueOf(p)})
|
fn.Call([]reflect.Value{reflect.ValueOf(p)})
|
||||||
return hash, nil
|
return &dgram.to, hash, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (test *udpTest) errorf(format string, args ...interface{}) error {
|
func (test *udpTest) errorf(format string, args ...interface{}) error {
|
||||||
@ -125,7 +137,7 @@ func (test *udpTest) errorf(format string, args ...interface{}) error {
|
|||||||
|
|
||||||
func TestUDP_packetErrors(t *testing.T) {
|
func TestUDP_packetErrors(t *testing.T) {
|
||||||
test := newUDPTest(t)
|
test := newUDPTest(t)
|
||||||
defer test.table.Close()
|
defer test.close()
|
||||||
|
|
||||||
test.packetIn(errExpired, pingPacket, &ping{From: testRemote, To: testLocalAnnounced, Version: 4})
|
test.packetIn(errExpired, pingPacket, &ping{From: testRemote, To: testLocalAnnounced, Version: 4})
|
||||||
test.packetIn(errUnsolicitedReply, pongPacket, &pong{ReplyTok: []byte{}, Expiration: futureExp})
|
test.packetIn(errUnsolicitedReply, pongPacket, &pong{ReplyTok: []byte{}, Expiration: futureExp})
|
||||||
@ -136,7 +148,7 @@ func TestUDP_packetErrors(t *testing.T) {
|
|||||||
func TestUDP_pingTimeout(t *testing.T) {
|
func TestUDP_pingTimeout(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
test := newUDPTest(t)
|
test := newUDPTest(t)
|
||||||
defer test.table.Close()
|
defer test.close()
|
||||||
|
|
||||||
toaddr := &net.UDPAddr{IP: net.ParseIP("1.2.3.4"), Port: 2222}
|
toaddr := &net.UDPAddr{IP: net.ParseIP("1.2.3.4"), Port: 2222}
|
||||||
toid := enode.ID{1, 2, 3, 4}
|
toid := enode.ID{1, 2, 3, 4}
|
||||||
@ -148,7 +160,7 @@ func TestUDP_pingTimeout(t *testing.T) {
|
|||||||
func TestUDP_responseTimeouts(t *testing.T) {
|
func TestUDP_responseTimeouts(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
test := newUDPTest(t)
|
test := newUDPTest(t)
|
||||||
defer test.table.Close()
|
defer test.close()
|
||||||
|
|
||||||
rand.Seed(time.Now().UnixNano())
|
rand.Seed(time.Now().UnixNano())
|
||||||
randomDuration := func(max time.Duration) time.Duration {
|
randomDuration := func(max time.Duration) time.Duration {
|
||||||
@ -166,20 +178,20 @@ func TestUDP_responseTimeouts(t *testing.T) {
|
|||||||
// with ptype <= 128 will not get a reply and should time out.
|
// with ptype <= 128 will not get a reply and should time out.
|
||||||
// For all other requests, a reply is scheduled to arrive
|
// For all other requests, a reply is scheduled to arrive
|
||||||
// within the timeout window.
|
// within the timeout window.
|
||||||
p := &pending{
|
p := &replyMatcher{
|
||||||
ptype: byte(rand.Intn(255)),
|
ptype: byte(rand.Intn(255)),
|
||||||
callback: func(interface{}) bool { return true },
|
callback: func(interface{}) (bool, bool) { return true, true },
|
||||||
}
|
}
|
||||||
binary.BigEndian.PutUint64(p.from[:], uint64(i))
|
binary.BigEndian.PutUint64(p.from[:], uint64(i))
|
||||||
if p.ptype <= 128 {
|
if p.ptype <= 128 {
|
||||||
p.errc = timeoutErr
|
p.errc = timeoutErr
|
||||||
test.udp.addpending <- p
|
test.udp.addReplyMatcher <- p
|
||||||
nTimeouts++
|
nTimeouts++
|
||||||
} else {
|
} else {
|
||||||
p.errc = nilErr
|
p.errc = nilErr
|
||||||
test.udp.addpending <- p
|
test.udp.addReplyMatcher <- p
|
||||||
time.AfterFunc(randomDuration(60*time.Millisecond), func() {
|
time.AfterFunc(randomDuration(60*time.Millisecond), func() {
|
||||||
if !test.udp.handleReply(p.from, p.ptype, nil) {
|
if !test.udp.handleReply(p.from, p.ip, p.ptype, nil) {
|
||||||
t.Logf("not matched: %v", p)
|
t.Logf("not matched: %v", p)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
@ -220,7 +232,7 @@ func TestUDP_responseTimeouts(t *testing.T) {
|
|||||||
func TestUDP_findnodeTimeout(t *testing.T) {
|
func TestUDP_findnodeTimeout(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
test := newUDPTest(t)
|
test := newUDPTest(t)
|
||||||
defer test.table.Close()
|
defer test.close()
|
||||||
|
|
||||||
toaddr := &net.UDPAddr{IP: net.ParseIP("1.2.3.4"), Port: 2222}
|
toaddr := &net.UDPAddr{IP: net.ParseIP("1.2.3.4"), Port: 2222}
|
||||||
toid := enode.ID{1, 2, 3, 4}
|
toid := enode.ID{1, 2, 3, 4}
|
||||||
@ -236,50 +248,65 @@ func TestUDP_findnodeTimeout(t *testing.T) {
|
|||||||
|
|
||||||
func TestUDP_findnode(t *testing.T) {
|
func TestUDP_findnode(t *testing.T) {
|
||||||
test := newUDPTest(t)
|
test := newUDPTest(t)
|
||||||
defer test.table.Close()
|
defer test.close()
|
||||||
|
|
||||||
// put a few nodes into the table. their exact
|
// put a few nodes into the table. their exact
|
||||||
// distribution shouldn't matter much, although we need to
|
// distribution shouldn't matter much, although we need to
|
||||||
// take care not to overflow any bucket.
|
// take care not to overflow any bucket.
|
||||||
nodes := &nodesByDistance{target: testTarget.id()}
|
nodes := &nodesByDistance{target: testTarget.id()}
|
||||||
for i := 0; i < bucketSize; i++ {
|
live := make(map[enode.ID]bool)
|
||||||
|
numCandidates := 2 * bucketSize
|
||||||
|
for i := 0; i < numCandidates; i++ {
|
||||||
key := newkey()
|
key := newkey()
|
||||||
n := wrapNode(enode.NewV4(&key.PublicKey, net.IP{10, 13, 0, 1}, 0, i))
|
ip := net.IP{10, 13, 0, byte(i)}
|
||||||
nodes.push(n, bucketSize)
|
n := wrapNode(enode.NewV4(&key.PublicKey, ip, 0, 2000))
|
||||||
|
// Ensure half of table content isn't verified live yet.
|
||||||
|
if i > numCandidates/2 {
|
||||||
|
n.livenessChecks = 1
|
||||||
|
live[n.ID()] = true
|
||||||
}
|
}
|
||||||
test.table.stuff(nodes.entries)
|
nodes.push(n, numCandidates)
|
||||||
|
}
|
||||||
|
fillTable(test.table, nodes.entries)
|
||||||
|
|
||||||
// ensure there's a bond with the test node,
|
// ensure there's a bond with the test node,
|
||||||
// findnode won't be accepted otherwise.
|
// findnode won't be accepted otherwise.
|
||||||
remoteID := encodePubkey(&test.remotekey.PublicKey).id()
|
remoteID := encodePubkey(&test.remotekey.PublicKey).id()
|
||||||
test.table.db.UpdateLastPongReceived(remoteID, time.Now())
|
test.table.db.UpdateLastPongReceived(remoteID, test.remoteaddr.IP, time.Now())
|
||||||
|
|
||||||
// check that closest neighbors are returned.
|
// check that closest neighbors are returned.
|
||||||
test.packetIn(nil, findnodePacket, &findnode{Target: testTarget, Expiration: futureExp})
|
|
||||||
expected := test.table.closest(testTarget.id(), bucketSize)
|
expected := test.table.closest(testTarget.id(), bucketSize)
|
||||||
|
test.packetIn(nil, findnodePacket, &findnode{Target: testTarget, Expiration: futureExp})
|
||||||
waitNeighbors := func(want []*node) {
|
waitNeighbors := func(want []*node) {
|
||||||
test.waitPacketOut(func(p *neighbors) {
|
test.waitPacketOut(func(p *neighbors) {
|
||||||
if len(p.Nodes) != len(want) {
|
if len(p.Nodes) != len(want) {
|
||||||
t.Errorf("wrong number of results: got %d, want %d", len(p.Nodes), bucketSize)
|
t.Errorf("wrong number of results: got %d, want %d", len(p.Nodes), bucketSize)
|
||||||
}
|
}
|
||||||
for i := range p.Nodes {
|
for i, n := range p.Nodes {
|
||||||
if p.Nodes[i].ID.id() != want[i].ID() {
|
if n.ID.id() != want[i].ID() {
|
||||||
t.Errorf("result mismatch at %d:\n got: %v\n want: %v", i, p.Nodes[i], expected.entries[i])
|
t.Errorf("result mismatch at %d:\n got: %v\n want: %v", i, n, expected.entries[i])
|
||||||
|
}
|
||||||
|
if !live[n.ID.id()] {
|
||||||
|
t.Errorf("result includes dead node %v", n.ID.id())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
waitNeighbors(expected.entries[:maxNeighbors])
|
// Receive replies.
|
||||||
waitNeighbors(expected.entries[maxNeighbors:])
|
want := expected.entries
|
||||||
|
if len(want) > maxNeighbors {
|
||||||
|
waitNeighbors(want[:maxNeighbors])
|
||||||
|
want = want[maxNeighbors:]
|
||||||
|
}
|
||||||
|
waitNeighbors(want)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestUDP_findnodeMultiReply(t *testing.T) {
|
func TestUDP_findnodeMultiReply(t *testing.T) {
|
||||||
test := newUDPTest(t)
|
test := newUDPTest(t)
|
||||||
defer test.table.Close()
|
defer test.close()
|
||||||
|
|
||||||
rid := enode.PubkeyToIDV4(&test.remotekey.PublicKey)
|
rid := enode.PubkeyToIDV4(&test.remotekey.PublicKey)
|
||||||
test.table.db.UpdateLastPingReceived(rid, time.Now())
|
test.table.db.UpdateLastPingReceived(rid, test.remoteaddr.IP, time.Now())
|
||||||
|
|
||||||
// queue a pending findnode request
|
// queue a pending findnode request
|
||||||
resultc, errc := make(chan []*node), make(chan error)
|
resultc, errc := make(chan []*node), make(chan error)
|
||||||
@ -329,11 +356,40 @@ func TestUDP_findnodeMultiReply(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestUDP_pingMatch(t *testing.T) {
|
||||||
|
test := newUDPTest(t)
|
||||||
|
defer test.close()
|
||||||
|
|
||||||
|
randToken := make([]byte, 32)
|
||||||
|
crand.Read(randToken)
|
||||||
|
|
||||||
|
test.packetIn(nil, pingPacket, &ping{From: testRemote, To: testLocalAnnounced, Version: 4, Expiration: futureExp})
|
||||||
|
test.waitPacketOut(func(*pong) error { return nil })
|
||||||
|
test.waitPacketOut(func(*ping) error { return nil })
|
||||||
|
test.packetIn(errUnsolicitedReply, pongPacket, &pong{ReplyTok: randToken, To: testLocalAnnounced, Expiration: futureExp})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUDP_pingMatchIP(t *testing.T) {
|
||||||
|
test := newUDPTest(t)
|
||||||
|
defer test.close()
|
||||||
|
|
||||||
|
test.packetIn(nil, pingPacket, &ping{From: testRemote, To: testLocalAnnounced, Version: 4, Expiration: futureExp})
|
||||||
|
test.waitPacketOut(func(*pong) error { return nil })
|
||||||
|
|
||||||
|
_, hash, _ := test.waitPacketOut(func(*ping) error { return nil })
|
||||||
|
wrongAddr := &net.UDPAddr{IP: net.IP{33, 44, 1, 2}, Port: 30000}
|
||||||
|
test.packetInFrom(errUnsolicitedReply, test.remotekey, wrongAddr, pongPacket, &pong{
|
||||||
|
ReplyTok: hash,
|
||||||
|
To: testLocalAnnounced,
|
||||||
|
Expiration: futureExp,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func TestUDP_successfulPing(t *testing.T) {
|
func TestUDP_successfulPing(t *testing.T) {
|
||||||
test := newUDPTest(t)
|
test := newUDPTest(t)
|
||||||
added := make(chan *node, 1)
|
added := make(chan *node, 1)
|
||||||
test.table.nodeAddedHook = func(n *node) { added <- n }
|
test.table.nodeAddedHook = func(n *node) { added <- n }
|
||||||
defer test.table.Close()
|
defer test.close()
|
||||||
|
|
||||||
// The remote side sends a ping packet to initiate the exchange.
|
// The remote side sends a ping packet to initiate the exchange.
|
||||||
go test.packetIn(nil, pingPacket, &ping{From: testRemote, To: testLocalAnnounced, Version: 4, Expiration: futureExp})
|
go test.packetIn(nil, pingPacket, &ping{From: testRemote, To: testLocalAnnounced, Version: 4, Expiration: futureExp})
|
||||||
@ -356,7 +412,7 @@ func TestUDP_successfulPing(t *testing.T) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
// remote is unknown, the table pings back.
|
// remote is unknown, the table pings back.
|
||||||
hash, _ := test.waitPacketOut(func(p *ping) error {
|
_, hash, _ := test.waitPacketOut(func(p *ping) error {
|
||||||
if !reflect.DeepEqual(p.From, test.udp.ourEndpoint()) {
|
if !reflect.DeepEqual(p.From, test.udp.ourEndpoint()) {
|
||||||
t.Errorf("got ping.From %#v, want %#v", p.From, test.udp.ourEndpoint())
|
t.Errorf("got ping.From %#v, want %#v", p.From, test.udp.ourEndpoint())
|
||||||
}
|
}
|
||||||
@ -510,7 +566,12 @@ type dgramPipe struct {
|
|||||||
cond *sync.Cond
|
cond *sync.Cond
|
||||||
closing chan struct{}
|
closing chan struct{}
|
||||||
closed bool
|
closed bool
|
||||||
queue [][]byte
|
queue []dgram
|
||||||
|
}
|
||||||
|
|
||||||
|
type dgram struct {
|
||||||
|
to net.UDPAddr
|
||||||
|
data []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
func newpipe() *dgramPipe {
|
func newpipe() *dgramPipe {
|
||||||
@ -531,7 +592,7 @@ func (c *dgramPipe) WriteToUDP(b []byte, to *net.UDPAddr) (n int, err error) {
|
|||||||
if c.closed {
|
if c.closed {
|
||||||
return 0, errors.New("closed")
|
return 0, errors.New("closed")
|
||||||
}
|
}
|
||||||
c.queue = append(c.queue, msg)
|
c.queue = append(c.queue, dgram{*to, b})
|
||||||
c.cond.Signal()
|
c.cond.Signal()
|
||||||
return len(b), nil
|
return len(b), nil
|
||||||
}
|
}
|
||||||
@ -556,7 +617,7 @@ func (c *dgramPipe) LocalAddr() net.Addr {
|
|||||||
return &net.UDPAddr{IP: testLocal.IP, Port: int(testLocal.UDP)}
|
return &net.UDPAddr{IP: testLocal.IP, Port: int(testLocal.UDP)}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *dgramPipe) waitPacketOut() []byte {
|
func (c *dgramPipe) waitPacketOut() dgram {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
for len(c.queue) == 0 {
|
for len(c.queue) == 0 {
|
||||||
|
@ -21,11 +21,11 @@ import (
|
|||||||
"crypto/rand"
|
"crypto/rand"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net"
|
||||||
"os"
|
"os"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/log"
|
|
||||||
"github.com/ethereum/go-ethereum/rlp"
|
"github.com/ethereum/go-ethereum/rlp"
|
||||||
"github.com/syndtr/goleveldb/leveldb"
|
"github.com/syndtr/goleveldb/leveldb"
|
||||||
"github.com/syndtr/goleveldb/leveldb/errors"
|
"github.com/syndtr/goleveldb/leveldb/errors"
|
||||||
@ -38,23 +38,30 @@ import (
|
|||||||
// Keys in the node database.
|
// Keys in the node database.
|
||||||
const (
|
const (
|
||||||
dbVersionKey = "version" // Version of the database to flush if changes
|
dbVersionKey = "version" // Version of the database to flush if changes
|
||||||
dbItemPrefix = "n:" // Identifier to prefix node entries with
|
dbNodePrefix = "n:" // Identifier to prefix node entries with
|
||||||
|
dbLocalPrefix = "local:"
|
||||||
|
dbDiscoverRoot = "v4"
|
||||||
|
|
||||||
dbDiscoverRoot = ":discover"
|
// These fields are stored per ID and IP, the full key is "n:<ID>:v4:<IP>:findfail".
|
||||||
dbDiscoverSeq = dbDiscoverRoot + ":seq"
|
// Use nodeItemKey to create those keys.
|
||||||
dbDiscoverPing = dbDiscoverRoot + ":lastping"
|
dbNodeFindFails = "findfail"
|
||||||
dbDiscoverPong = dbDiscoverRoot + ":lastpong"
|
dbNodePing = "lastping"
|
||||||
dbDiscoverFindFails = dbDiscoverRoot + ":findfail"
|
dbNodePong = "lastpong"
|
||||||
dbLocalRoot = ":local"
|
dbNodeSeq = "seq"
|
||||||
dbLocalSeq = dbLocalRoot + ":seq"
|
|
||||||
|
// Local information is keyed by ID only, the full key is "local:<ID>:seq".
|
||||||
|
// Use localItemKey to create those keys.
|
||||||
|
dbLocalSeq = "seq"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
const (
|
||||||
dbNodeExpiration = 24 * time.Hour // Time after which an unseen node should be dropped.
|
dbNodeExpiration = 24 * time.Hour // Time after which an unseen node should be dropped.
|
||||||
dbCleanupCycle = time.Hour // Time period for running the expiration task.
|
dbCleanupCycle = time.Hour // Time period for running the expiration task.
|
||||||
dbVersion = 7
|
dbVersion = 8
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var zeroIP = make(net.IP, 16)
|
||||||
|
|
||||||
// DB is the node database, storing previously seen nodes and any collected metadata about
|
// DB is the node database, storing previously seen nodes and any collected metadata about
|
||||||
// them for QoS purposes.
|
// them for QoS purposes.
|
||||||
type DB struct {
|
type DB struct {
|
||||||
@ -119,27 +126,58 @@ func newPersistentDB(path string) (*DB, error) {
|
|||||||
return &DB{lvl: db, quit: make(chan struct{})}, nil
|
return &DB{lvl: db, quit: make(chan struct{})}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// makeKey generates the leveldb key-blob from a node id and its particular
|
// nodeKey returns the database key for a node record.
|
||||||
// field of interest.
|
func nodeKey(id ID) []byte {
|
||||||
func makeKey(id ID, field string) []byte {
|
key := append([]byte(dbNodePrefix), id[:]...)
|
||||||
if (id == ID{}) {
|
key = append(key, ':')
|
||||||
return []byte(field)
|
key = append(key, dbDiscoverRoot...)
|
||||||
}
|
return key
|
||||||
return append([]byte(dbItemPrefix), append(id[:], field...)...)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// splitKey tries to split a database key into a node id and a field part.
|
// splitNodeKey returns the node ID of a key created by nodeKey.
|
||||||
func splitKey(key []byte) (id ID, field string) {
|
func splitNodeKey(key []byte) (id ID, rest []byte) {
|
||||||
// If the key is not of a node, return it plainly
|
if !bytes.HasPrefix(key, []byte(dbNodePrefix)) {
|
||||||
if !bytes.HasPrefix(key, []byte(dbItemPrefix)) {
|
return ID{}, nil
|
||||||
return ID{}, string(key)
|
|
||||||
}
|
}
|
||||||
// Otherwise split the id and field
|
item := key[len(dbNodePrefix):]
|
||||||
item := key[len(dbItemPrefix):]
|
|
||||||
copy(id[:], item[:len(id)])
|
copy(id[:], item[:len(id)])
|
||||||
field = string(item[len(id):])
|
return id, item[len(id)+1:]
|
||||||
|
}
|
||||||
|
|
||||||
return id, field
|
// nodeItemKey returns the database key for a node metadata field.
|
||||||
|
func nodeItemKey(id ID, ip net.IP, field string) []byte {
|
||||||
|
ip16 := ip.To16()
|
||||||
|
if ip16 == nil {
|
||||||
|
panic(fmt.Errorf("invalid IP (length %d)", len(ip)))
|
||||||
|
}
|
||||||
|
return bytes.Join([][]byte{nodeKey(id), ip16, []byte(field)}, []byte{':'})
|
||||||
|
}
|
||||||
|
|
||||||
|
// splitNodeItemKey returns the components of a key created by nodeItemKey.
|
||||||
|
func splitNodeItemKey(key []byte) (id ID, ip net.IP, field string) {
|
||||||
|
id, key = splitNodeKey(key)
|
||||||
|
// Skip discover root.
|
||||||
|
if string(key) == dbDiscoverRoot {
|
||||||
|
return id, nil, ""
|
||||||
|
}
|
||||||
|
key = key[len(dbDiscoverRoot)+1:]
|
||||||
|
// Split out the IP.
|
||||||
|
ip = net.IP(key[:16])
|
||||||
|
if ip4 := ip.To4(); ip4 != nil {
|
||||||
|
ip = ip4
|
||||||
|
}
|
||||||
|
key = key[16+1:]
|
||||||
|
// Field is the remainder of key.
|
||||||
|
field = string(key)
|
||||||
|
return id, ip, field
|
||||||
|
}
|
||||||
|
|
||||||
|
// localItemKey returns the key of a local node item.
|
||||||
|
func localItemKey(id ID, field string) []byte {
|
||||||
|
key := append([]byte(dbLocalPrefix), id[:]...)
|
||||||
|
key = append(key, ':')
|
||||||
|
key = append(key, field...)
|
||||||
|
return key
|
||||||
}
|
}
|
||||||
|
|
||||||
// fetchInt64 retrieves an integer associated with a particular key.
|
// fetchInt64 retrieves an integer associated with a particular key.
|
||||||
@ -181,7 +219,7 @@ func (db *DB) storeUint64(key []byte, n uint64) error {
|
|||||||
|
|
||||||
// Node retrieves a node with a given id from the database.
|
// Node retrieves a node with a given id from the database.
|
||||||
func (db *DB) Node(id ID) *Node {
|
func (db *DB) Node(id ID) *Node {
|
||||||
blob, err := db.lvl.Get(makeKey(id, dbDiscoverRoot), nil)
|
blob, err := db.lvl.Get(nodeKey(id), nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -207,15 +245,15 @@ func (db *DB) UpdateNode(node *Node) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := db.lvl.Put(makeKey(node.ID(), dbDiscoverRoot), blob, nil); err != nil {
|
if err := db.lvl.Put(nodeKey(node.ID()), blob, nil); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return db.storeUint64(makeKey(node.ID(), dbDiscoverSeq), node.Seq())
|
return db.storeUint64(nodeItemKey(node.ID(), zeroIP, dbNodeSeq), node.Seq())
|
||||||
}
|
}
|
||||||
|
|
||||||
// NodeSeq returns the stored record sequence number of the given node.
|
// NodeSeq returns the stored record sequence number of the given node.
|
||||||
func (db *DB) NodeSeq(id ID) uint64 {
|
func (db *DB) NodeSeq(id ID) uint64 {
|
||||||
return db.fetchUint64(makeKey(id, dbDiscoverSeq))
|
return db.fetchUint64(nodeItemKey(id, zeroIP, dbNodeSeq))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Resolve returns the stored record of the node if it has a larger sequence
|
// Resolve returns the stored record of the node if it has a larger sequence
|
||||||
@ -227,15 +265,17 @@ func (db *DB) Resolve(n *Node) *Node {
|
|||||||
return db.Node(n.ID())
|
return db.Node(n.ID())
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteNode deletes all information/keys associated with a node.
|
// DeleteNode deletes all information associated with a node.
|
||||||
func (db *DB) DeleteNode(id ID) error {
|
func (db *DB) DeleteNode(id ID) {
|
||||||
deleter := db.lvl.NewIterator(util.BytesPrefix(makeKey(id, "")), nil)
|
deleteRange(db.lvl, nodeKey(id))
|
||||||
for deleter.Next() {
|
|
||||||
if err := db.lvl.Delete(deleter.Key(), nil); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func deleteRange(db *leveldb.DB, prefix []byte) {
|
||||||
|
it := db.NewIterator(util.BytesPrefix(prefix), nil)
|
||||||
|
defer it.Release()
|
||||||
|
for it.Next() {
|
||||||
|
db.Delete(it.Key(), nil)
|
||||||
}
|
}
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ensureExpirer is a small helper method ensuring that the data expiration
|
// ensureExpirer is a small helper method ensuring that the data expiration
|
||||||
@ -259,9 +299,7 @@ func (db *DB) expirer() {
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-tick.C:
|
case <-tick.C:
|
||||||
if err := db.expireNodes(); err != nil {
|
db.expireNodes()
|
||||||
log.Error("Failed to expire nodedb items", "err", err)
|
|
||||||
}
|
|
||||||
case <-db.quit:
|
case <-db.quit:
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -269,71 +307,85 @@ func (db *DB) expirer() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// expireNodes iterates over the database and deletes all nodes that have not
|
// expireNodes iterates over the database and deletes all nodes that have not
|
||||||
// been seen (i.e. received a pong from) for some allotted time.
|
// been seen (i.e. received a pong from) for some time.
|
||||||
func (db *DB) expireNodes() error {
|
func (db *DB) expireNodes() {
|
||||||
threshold := time.Now().Add(-dbNodeExpiration)
|
it := db.lvl.NewIterator(util.BytesPrefix([]byte(dbNodePrefix)), nil)
|
||||||
|
|
||||||
// Find discovered nodes that are older than the allowance
|
|
||||||
it := db.lvl.NewIterator(nil, nil)
|
|
||||||
defer it.Release()
|
defer it.Release()
|
||||||
|
if !it.Next() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
for it.Next() {
|
var (
|
||||||
// Skip the item if not a discovery node
|
threshold = time.Now().Add(-dbNodeExpiration).Unix()
|
||||||
id, field := splitKey(it.Key())
|
youngestPong int64
|
||||||
if field != dbDiscoverRoot {
|
atEnd = false
|
||||||
continue
|
)
|
||||||
|
for !atEnd {
|
||||||
|
id, ip, field := splitNodeItemKey(it.Key())
|
||||||
|
if field == dbNodePong {
|
||||||
|
time, _ := binary.Varint(it.Value())
|
||||||
|
if time > youngestPong {
|
||||||
|
youngestPong = time
|
||||||
}
|
}
|
||||||
// Skip the node if not expired yet (and not self)
|
if time < threshold {
|
||||||
if seen := db.LastPongReceived(id); seen.After(threshold) {
|
// Last pong from this IP older than threshold, remove fields belonging to it.
|
||||||
continue
|
deleteRange(db.lvl, nodeItemKey(id, ip, ""))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
atEnd = !it.Next()
|
||||||
|
nextID, _ := splitNodeKey(it.Key())
|
||||||
|
if atEnd || nextID != id {
|
||||||
|
// We've moved beyond the last entry of the current ID.
|
||||||
|
// Remove everything if there was no recent enough pong.
|
||||||
|
if youngestPong > 0 && youngestPong < threshold {
|
||||||
|
deleteRange(db.lvl, nodeKey(id))
|
||||||
|
}
|
||||||
|
youngestPong = 0
|
||||||
}
|
}
|
||||||
// Otherwise delete all associated information
|
|
||||||
db.DeleteNode(id)
|
|
||||||
}
|
}
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// LastPingReceived retrieves the time of the last ping packet received from
|
// LastPingReceived retrieves the time of the last ping packet received from
|
||||||
// a remote node.
|
// a remote node.
|
||||||
func (db *DB) LastPingReceived(id ID) time.Time {
|
func (db *DB) LastPingReceived(id ID, ip net.IP) time.Time {
|
||||||
return time.Unix(db.fetchInt64(makeKey(id, dbDiscoverPing)), 0)
|
return time.Unix(db.fetchInt64(nodeItemKey(id, ip, dbNodePing)), 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdateLastPingReceived updates the last time we tried contacting a remote node.
|
// UpdateLastPingReceived updates the last time we tried contacting a remote node.
|
||||||
func (db *DB) UpdateLastPingReceived(id ID, instance time.Time) error {
|
func (db *DB) UpdateLastPingReceived(id ID, ip net.IP, instance time.Time) error {
|
||||||
return db.storeInt64(makeKey(id, dbDiscoverPing), instance.Unix())
|
return db.storeInt64(nodeItemKey(id, ip, dbNodePing), instance.Unix())
|
||||||
}
|
}
|
||||||
|
|
||||||
// LastPongReceived retrieves the time of the last successful pong from remote node.
|
// LastPongReceived retrieves the time of the last successful pong from remote node.
|
||||||
func (db *DB) LastPongReceived(id ID) time.Time {
|
func (db *DB) LastPongReceived(id ID, ip net.IP) time.Time {
|
||||||
// Launch expirer
|
// Launch expirer
|
||||||
db.ensureExpirer()
|
db.ensureExpirer()
|
||||||
return time.Unix(db.fetchInt64(makeKey(id, dbDiscoverPong)), 0)
|
return time.Unix(db.fetchInt64(nodeItemKey(id, ip, dbNodePong)), 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdateLastPongReceived updates the last pong time of a node.
|
// UpdateLastPongReceived updates the last pong time of a node.
|
||||||
func (db *DB) UpdateLastPongReceived(id ID, instance time.Time) error {
|
func (db *DB) UpdateLastPongReceived(id ID, ip net.IP, instance time.Time) error {
|
||||||
return db.storeInt64(makeKey(id, dbDiscoverPong), instance.Unix())
|
return db.storeInt64(nodeItemKey(id, ip, dbNodePong), instance.Unix())
|
||||||
}
|
}
|
||||||
|
|
||||||
// FindFails retrieves the number of findnode failures since bonding.
|
// FindFails retrieves the number of findnode failures since bonding.
|
||||||
func (db *DB) FindFails(id ID) int {
|
func (db *DB) FindFails(id ID, ip net.IP) int {
|
||||||
return int(db.fetchInt64(makeKey(id, dbDiscoverFindFails)))
|
return int(db.fetchInt64(nodeItemKey(id, ip, dbNodeFindFails)))
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdateFindFails updates the number of findnode failures since bonding.
|
// UpdateFindFails updates the number of findnode failures since bonding.
|
||||||
func (db *DB) UpdateFindFails(id ID, fails int) error {
|
func (db *DB) UpdateFindFails(id ID, ip net.IP, fails int) error {
|
||||||
return db.storeInt64(makeKey(id, dbDiscoverFindFails), int64(fails))
|
return db.storeInt64(nodeItemKey(id, ip, dbNodeFindFails), int64(fails))
|
||||||
}
|
}
|
||||||
|
|
||||||
// LocalSeq retrieves the local record sequence counter.
|
// LocalSeq retrieves the local record sequence counter.
|
||||||
func (db *DB) localSeq(id ID) uint64 {
|
func (db *DB) localSeq(id ID) uint64 {
|
||||||
return db.fetchUint64(makeKey(id, dbLocalSeq))
|
return db.fetchUint64(nodeItemKey(id, zeroIP, dbLocalSeq))
|
||||||
}
|
}
|
||||||
|
|
||||||
// storeLocalSeq stores the local record sequence counter.
|
// storeLocalSeq stores the local record sequence counter.
|
||||||
func (db *DB) storeLocalSeq(id ID, n uint64) {
|
func (db *DB) storeLocalSeq(id ID, n uint64) {
|
||||||
db.storeUint64(makeKey(id, dbLocalSeq), n)
|
db.storeUint64(nodeItemKey(id, zeroIP, dbLocalSeq), n)
|
||||||
}
|
}
|
||||||
|
|
||||||
// QuerySeeds retrieves random nodes to be used as potential seed nodes
|
// QuerySeeds retrieves random nodes to be used as potential seed nodes
|
||||||
@ -355,14 +407,14 @@ seek:
|
|||||||
ctr := id[0]
|
ctr := id[0]
|
||||||
rand.Read(id[:])
|
rand.Read(id[:])
|
||||||
id[0] = ctr + id[0]%16
|
id[0] = ctr + id[0]%16
|
||||||
it.Seek(makeKey(id, dbDiscoverRoot))
|
it.Seek(nodeKey(id))
|
||||||
|
|
||||||
n := nextNode(it)
|
n := nextNode(it)
|
||||||
if n == nil {
|
if n == nil {
|
||||||
id[0] = 0
|
id[0] = 0
|
||||||
continue seek // iterator exhausted
|
continue seek // iterator exhausted
|
||||||
}
|
}
|
||||||
if now.Sub(db.LastPongReceived(n.ID())) > maxAge {
|
if now.Sub(db.LastPongReceived(n.ID(), n.IP())) > maxAge {
|
||||||
continue seek
|
continue seek
|
||||||
}
|
}
|
||||||
for i := range nodes {
|
for i := range nodes {
|
||||||
@ -379,8 +431,8 @@ seek:
|
|||||||
// database entries.
|
// database entries.
|
||||||
func nextNode(it iterator.Iterator) *Node {
|
func nextNode(it iterator.Iterator) *Node {
|
||||||
for end := false; !end; end = !it.Next() {
|
for end := false; !end; end = !it.Next() {
|
||||||
id, field := splitKey(it.Key())
|
id, rest := splitNodeKey(it.Key())
|
||||||
if field != dbDiscoverRoot {
|
if string(rest) != dbDiscoverRoot {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
return mustDecodeNode(id[:], it.Value())
|
return mustDecodeNode(id[:], it.Value())
|
||||||
|
@ -28,42 +28,54 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
var nodeDBKeyTests = []struct {
|
var keytestID = HexID("51232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439")
|
||||||
id ID
|
|
||||||
field string
|
func TestDBNodeKey(t *testing.T) {
|
||||||
key []byte
|
enc := nodeKey(keytestID)
|
||||||
}{
|
want := []byte{
|
||||||
{
|
'n', ':',
|
||||||
id: ID{},
|
|
||||||
field: "version",
|
|
||||||
key: []byte{0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e}, // field
|
|
||||||
},
|
|
||||||
{
|
|
||||||
id: HexID("51232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"),
|
|
||||||
field: ":discover",
|
|
||||||
key: []byte{
|
|
||||||
0x6e, 0x3a, // prefix
|
|
||||||
0x51, 0x23, 0x2b, 0x8d, 0x78, 0x21, 0x61, 0x7d, // node id
|
0x51, 0x23, 0x2b, 0x8d, 0x78, 0x21, 0x61, 0x7d, // node id
|
||||||
0x2b, 0x29, 0xb5, 0x4b, 0x81, 0xcd, 0xef, 0xb9, //
|
0x2b, 0x29, 0xb5, 0x4b, 0x81, 0xcd, 0xef, 0xb9, //
|
||||||
0xb3, 0xe9, 0xc3, 0x7d, 0x7f, 0xd5, 0xf6, 0x32, //
|
0xb3, 0xe9, 0xc3, 0x7d, 0x7f, 0xd5, 0xf6, 0x32, //
|
||||||
0x70, 0xbc, 0xc9, 0xe1, 0xa6, 0xf6, 0xa4, 0x39, //
|
0x70, 0xbc, 0xc9, 0xe1, 0xa6, 0xf6, 0xa4, 0x39, //
|
||||||
0x3a, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, // field
|
':', 'v', '4',
|
||||||
},
|
}
|
||||||
},
|
if !bytes.Equal(enc, want) {
|
||||||
|
t.Errorf("wrong encoded key:\ngot %q\nwant %q", enc, want)
|
||||||
|
}
|
||||||
|
id, _ := splitNodeKey(enc)
|
||||||
|
if id != keytestID {
|
||||||
|
t.Errorf("wrong ID from splitNodeKey")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDBKeys(t *testing.T) {
|
func TestDBNodeItemKey(t *testing.T) {
|
||||||
for i, tt := range nodeDBKeyTests {
|
wantIP := net.IP{127, 0, 0, 3}
|
||||||
if key := makeKey(tt.id, tt.field); !bytes.Equal(key, tt.key) {
|
wantField := "foobar"
|
||||||
t.Errorf("make test %d: key mismatch: have 0x%x, want 0x%x", i, key, tt.key)
|
enc := nodeItemKey(keytestID, wantIP, wantField)
|
||||||
|
want := []byte{
|
||||||
|
'n', ':',
|
||||||
|
0x51, 0x23, 0x2b, 0x8d, 0x78, 0x21, 0x61, 0x7d, // node id
|
||||||
|
0x2b, 0x29, 0xb5, 0x4b, 0x81, 0xcd, 0xef, 0xb9, //
|
||||||
|
0xb3, 0xe9, 0xc3, 0x7d, 0x7f, 0xd5, 0xf6, 0x32, //
|
||||||
|
0x70, 0xbc, 0xc9, 0xe1, 0xa6, 0xf6, 0xa4, 0x39, //
|
||||||
|
':', 'v', '4', ':',
|
||||||
|
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // IP
|
||||||
|
0x00, 0x00, 0xff, 0xff, 0x7f, 0x00, 0x00, 0x03, //
|
||||||
|
':', 'f', 'o', 'o', 'b', 'a', 'r',
|
||||||
}
|
}
|
||||||
id, field := splitKey(tt.key)
|
if !bytes.Equal(enc, want) {
|
||||||
if !bytes.Equal(id[:], tt.id[:]) {
|
t.Errorf("wrong encoded key:\ngot %q\nwant %q", enc, want)
|
||||||
t.Errorf("split test %d: id mismatch: have 0x%x, want 0x%x", i, id, tt.id)
|
|
||||||
}
|
}
|
||||||
if field != tt.field {
|
id, ip, field := splitNodeItemKey(enc)
|
||||||
t.Errorf("split test %d: field mismatch: have 0x%x, want 0x%x", i, field, tt.field)
|
if id != keytestID {
|
||||||
|
t.Errorf("splitNodeItemKey returned wrong ID: %v", id)
|
||||||
}
|
}
|
||||||
|
if !bytes.Equal(ip, wantIP) {
|
||||||
|
t.Errorf("splitNodeItemKey returned wrong IP: %v", ip)
|
||||||
|
}
|
||||||
|
if field != wantField {
|
||||||
|
t.Errorf("splitNodeItemKey returned wrong field: %q", field)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -113,33 +125,33 @@ func TestDBFetchStore(t *testing.T) {
|
|||||||
defer db.Close()
|
defer db.Close()
|
||||||
|
|
||||||
// Check fetch/store operations on a node ping object
|
// Check fetch/store operations on a node ping object
|
||||||
if stored := db.LastPingReceived(node.ID()); stored.Unix() != 0 {
|
if stored := db.LastPingReceived(node.ID(), node.IP()); stored.Unix() != 0 {
|
||||||
t.Errorf("ping: non-existing object: %v", stored)
|
t.Errorf("ping: non-existing object: %v", stored)
|
||||||
}
|
}
|
||||||
if err := db.UpdateLastPingReceived(node.ID(), inst); err != nil {
|
if err := db.UpdateLastPingReceived(node.ID(), node.IP(), inst); err != nil {
|
||||||
t.Errorf("ping: failed to update: %v", err)
|
t.Errorf("ping: failed to update: %v", err)
|
||||||
}
|
}
|
||||||
if stored := db.LastPingReceived(node.ID()); stored.Unix() != inst.Unix() {
|
if stored := db.LastPingReceived(node.ID(), node.IP()); stored.Unix() != inst.Unix() {
|
||||||
t.Errorf("ping: value mismatch: have %v, want %v", stored, inst)
|
t.Errorf("ping: value mismatch: have %v, want %v", stored, inst)
|
||||||
}
|
}
|
||||||
// Check fetch/store operations on a node pong object
|
// Check fetch/store operations on a node pong object
|
||||||
if stored := db.LastPongReceived(node.ID()); stored.Unix() != 0 {
|
if stored := db.LastPongReceived(node.ID(), node.IP()); stored.Unix() != 0 {
|
||||||
t.Errorf("pong: non-existing object: %v", stored)
|
t.Errorf("pong: non-existing object: %v", stored)
|
||||||
}
|
}
|
||||||
if err := db.UpdateLastPongReceived(node.ID(), inst); err != nil {
|
if err := db.UpdateLastPongReceived(node.ID(), node.IP(), inst); err != nil {
|
||||||
t.Errorf("pong: failed to update: %v", err)
|
t.Errorf("pong: failed to update: %v", err)
|
||||||
}
|
}
|
||||||
if stored := db.LastPongReceived(node.ID()); stored.Unix() != inst.Unix() {
|
if stored := db.LastPongReceived(node.ID(), node.IP()); stored.Unix() != inst.Unix() {
|
||||||
t.Errorf("pong: value mismatch: have %v, want %v", stored, inst)
|
t.Errorf("pong: value mismatch: have %v, want %v", stored, inst)
|
||||||
}
|
}
|
||||||
// Check fetch/store operations on a node findnode-failure object
|
// Check fetch/store operations on a node findnode-failure object
|
||||||
if stored := db.FindFails(node.ID()); stored != 0 {
|
if stored := db.FindFails(node.ID(), node.IP()); stored != 0 {
|
||||||
t.Errorf("find-node fails: non-existing object: %v", stored)
|
t.Errorf("find-node fails: non-existing object: %v", stored)
|
||||||
}
|
}
|
||||||
if err := db.UpdateFindFails(node.ID(), num); err != nil {
|
if err := db.UpdateFindFails(node.ID(), node.IP(), num); err != nil {
|
||||||
t.Errorf("find-node fails: failed to update: %v", err)
|
t.Errorf("find-node fails: failed to update: %v", err)
|
||||||
}
|
}
|
||||||
if stored := db.FindFails(node.ID()); stored != num {
|
if stored := db.FindFails(node.ID(), node.IP()); stored != num {
|
||||||
t.Errorf("find-node fails: value mismatch: have %v, want %v", stored, num)
|
t.Errorf("find-node fails: value mismatch: have %v, want %v", stored, num)
|
||||||
}
|
}
|
||||||
// Check fetch/store operations on an actual node object
|
// Check fetch/store operations on an actual node object
|
||||||
@ -256,7 +268,7 @@ func testSeedQuery() error {
|
|||||||
if err := db.UpdateNode(seed.node); err != nil {
|
if err := db.UpdateNode(seed.node); err != nil {
|
||||||
return fmt.Errorf("node %d: failed to insert: %v", i, err)
|
return fmt.Errorf("node %d: failed to insert: %v", i, err)
|
||||||
}
|
}
|
||||||
if err := db.UpdateLastPongReceived(seed.node.ID(), seed.pong); err != nil {
|
if err := db.UpdateLastPongReceived(seed.node.ID(), seed.node.IP(), seed.pong); err != nil {
|
||||||
return fmt.Errorf("node %d: failed to insert bondTime: %v", i, err)
|
return fmt.Errorf("node %d: failed to insert bondTime: %v", i, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -323,8 +335,10 @@ func TestDBPersistency(t *testing.T) {
|
|||||||
var nodeDBExpirationNodes = []struct {
|
var nodeDBExpirationNodes = []struct {
|
||||||
node *Node
|
node *Node
|
||||||
pong time.Time
|
pong time.Time
|
||||||
|
storeNode bool
|
||||||
exp bool
|
exp bool
|
||||||
}{
|
}{
|
||||||
|
// Node has new enough pong time and isn't expired:
|
||||||
{
|
{
|
||||||
node: NewV4(
|
node: NewV4(
|
||||||
hexPubkey("8d110e2ed4b446d9b5fb50f117e5f37fb7597af455e1dab0e6f045a6eeaa786a6781141659020d38bdc5e698ed3d4d2bafa8b5061810dfa63e8ac038db2e9b67"),
|
hexPubkey("8d110e2ed4b446d9b5fb50f117e5f37fb7597af455e1dab0e6f045a6eeaa786a6781141659020d38bdc5e698ed3d4d2bafa8b5061810dfa63e8ac038db2e9b67"),
|
||||||
@ -332,15 +346,77 @@ var nodeDBExpirationNodes = []struct {
|
|||||||
30303,
|
30303,
|
||||||
30303,
|
30303,
|
||||||
),
|
),
|
||||||
|
storeNode: true,
|
||||||
pong: time.Now().Add(-dbNodeExpiration + time.Minute),
|
pong: time.Now().Add(-dbNodeExpiration + time.Minute),
|
||||||
exp: false,
|
exp: false,
|
||||||
}, {
|
},
|
||||||
|
// Node with pong time before expiration is removed:
|
||||||
|
{
|
||||||
node: NewV4(
|
node: NewV4(
|
||||||
hexPubkey("913a205579c32425b220dfba999d215066e5bdbf900226b11da1907eae5e93eb40616d47412cf819664e9eacbdfcca6b0c6e07e09847a38472d4be46ab0c3672"),
|
hexPubkey("913a205579c32425b220dfba999d215066e5bdbf900226b11da1907eae5e93eb40616d47412cf819664e9eacbdfcca6b0c6e07e09847a38472d4be46ab0c3672"),
|
||||||
net.IP{127, 0, 0, 2},
|
net.IP{127, 0, 0, 2},
|
||||||
30303,
|
30303,
|
||||||
30303,
|
30303,
|
||||||
),
|
),
|
||||||
|
storeNode: true,
|
||||||
|
pong: time.Now().Add(-dbNodeExpiration - time.Minute),
|
||||||
|
exp: true,
|
||||||
|
},
|
||||||
|
// Just pong time, no node stored:
|
||||||
|
{
|
||||||
|
node: NewV4(
|
||||||
|
hexPubkey("b56670e0b6bad2c5dab9f9fe6f061a16cf78d68b6ae2cfda3144262d08d97ce5f46fd8799b6d1f709b1abe718f2863e224488bd7518e5e3b43809ac9bd1138ca"),
|
||||||
|
net.IP{127, 0, 0, 3},
|
||||||
|
30303,
|
||||||
|
30303,
|
||||||
|
),
|
||||||
|
storeNode: false,
|
||||||
|
pong: time.Now().Add(-dbNodeExpiration - time.Minute),
|
||||||
|
exp: true,
|
||||||
|
},
|
||||||
|
// Node with multiple pong times, all older than expiration.
|
||||||
|
{
|
||||||
|
node: NewV4(
|
||||||
|
hexPubkey("29f619cebfd32c9eab34aec797ed5e3fe15b9b45be95b4df3f5fe6a9ae892f433eb08d7698b2ef3621568b0fb70d57b515ab30d4e72583b798298e0f0a66b9d1"),
|
||||||
|
net.IP{127, 0, 0, 4},
|
||||||
|
30303,
|
||||||
|
30303,
|
||||||
|
),
|
||||||
|
storeNode: true,
|
||||||
|
pong: time.Now().Add(-dbNodeExpiration - time.Minute),
|
||||||
|
exp: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
node: NewV4(
|
||||||
|
hexPubkey("29f619cebfd32c9eab34aec797ed5e3fe15b9b45be95b4df3f5fe6a9ae892f433eb08d7698b2ef3621568b0fb70d57b515ab30d4e72583b798298e0f0a66b9d1"),
|
||||||
|
net.IP{127, 0, 0, 5},
|
||||||
|
30303,
|
||||||
|
30303,
|
||||||
|
),
|
||||||
|
storeNode: false,
|
||||||
|
pong: time.Now().Add(-dbNodeExpiration - 2*time.Minute),
|
||||||
|
exp: true,
|
||||||
|
},
|
||||||
|
// Node with multiple pong times, one newer, one older than expiration.
|
||||||
|
{
|
||||||
|
node: NewV4(
|
||||||
|
hexPubkey("3b73a9e5f4af6c4701c57c73cc8cfa0f4802840b24c11eba92aac3aef65644a3728b4b2aec8199f6d72bd66be2c65861c773129039bd47daa091ca90a6d4c857"),
|
||||||
|
net.IP{127, 0, 0, 6},
|
||||||
|
30303,
|
||||||
|
30303,
|
||||||
|
),
|
||||||
|
storeNode: true,
|
||||||
|
pong: time.Now().Add(-dbNodeExpiration + time.Minute),
|
||||||
|
exp: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
node: NewV4(
|
||||||
|
hexPubkey("3b73a9e5f4af6c4701c57c73cc8cfa0f4802840b24c11eba92aac3aef65644a3728b4b2aec8199f6d72bd66be2c65861c773129039bd47daa091ca90a6d4c857"),
|
||||||
|
net.IP{127, 0, 0, 7},
|
||||||
|
30303,
|
||||||
|
30303,
|
||||||
|
),
|
||||||
|
storeNode: false,
|
||||||
pong: time.Now().Add(-dbNodeExpiration - time.Minute),
|
pong: time.Now().Add(-dbNodeExpiration - time.Minute),
|
||||||
exp: true,
|
exp: true,
|
||||||
},
|
},
|
||||||
@ -350,23 +426,39 @@ func TestDBExpiration(t *testing.T) {
|
|||||||
db, _ := OpenDB("")
|
db, _ := OpenDB("")
|
||||||
defer db.Close()
|
defer db.Close()
|
||||||
|
|
||||||
// Add all the test nodes and set their last pong time
|
// Add all the test nodes and set their last pong time.
|
||||||
for i, seed := range nodeDBExpirationNodes {
|
for i, seed := range nodeDBExpirationNodes {
|
||||||
|
if seed.storeNode {
|
||||||
if err := db.UpdateNode(seed.node); err != nil {
|
if err := db.UpdateNode(seed.node); err != nil {
|
||||||
t.Fatalf("node %d: failed to insert: %v", i, err)
|
t.Fatalf("node %d: failed to insert: %v", i, err)
|
||||||
}
|
}
|
||||||
if err := db.UpdateLastPongReceived(seed.node.ID(), seed.pong); err != nil {
|
}
|
||||||
|
if err := db.UpdateLastPongReceived(seed.node.ID(), seed.node.IP(), seed.pong); err != nil {
|
||||||
t.Fatalf("node %d: failed to update bondTime: %v", i, err)
|
t.Fatalf("node %d: failed to update bondTime: %v", i, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Expire some of them, and check the rest
|
|
||||||
if err := db.expireNodes(); err != nil {
|
db.expireNodes()
|
||||||
t.Fatalf("failed to expire nodes: %v", err)
|
|
||||||
}
|
// Check that expired entries have been removed.
|
||||||
|
unixZeroTime := time.Unix(0, 0)
|
||||||
for i, seed := range nodeDBExpirationNodes {
|
for i, seed := range nodeDBExpirationNodes {
|
||||||
node := db.Node(seed.node.ID())
|
node := db.Node(seed.node.ID())
|
||||||
if (node == nil && !seed.exp) || (node != nil && seed.exp) {
|
pong := db.LastPongReceived(seed.node.ID(), seed.node.IP())
|
||||||
t.Errorf("node %d: expiration mismatch: have %v, want %v", i, node, seed.exp)
|
if seed.exp {
|
||||||
|
if seed.storeNode && node != nil {
|
||||||
|
t.Errorf("node %d (%s) shouldn't be present after expiration", i, seed.node.ID().TerminalString())
|
||||||
|
}
|
||||||
|
if !pong.Equal(unixZeroTime) {
|
||||||
|
t.Errorf("pong time %d (%s %v) shouldn't be present after expiration", i, seed.node.ID().TerminalString(), seed.node.IP())
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if seed.storeNode && node == nil {
|
||||||
|
t.Errorf("node %d (%s) should be present after expiration", i, seed.node.ID().TerminalString())
|
||||||
|
}
|
||||||
|
if !pong.Equal(seed.pong.Truncate(1 * time.Second)) {
|
||||||
|
t.Errorf("pong time %d (%s) should be %v after expiration, but is %v", i, seed.node.ID().TerminalString(), seed.pong, pong)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user