p2p/discover: add node expirer and related tests
This commit is contained in:
parent
a136e2bb22
commit
437cf4b3ac
@ -7,6 +7,7 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"os"
|
"os"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/logger"
|
"github.com/ethereum/go-ethereum/logger"
|
||||||
@ -17,13 +18,19 @@ import (
|
|||||||
"github.com/syndtr/goleveldb/leveldb/storage"
|
"github.com/syndtr/goleveldb/leveldb/storage"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Special node ID to use as a nil element.
|
var (
|
||||||
var nodeDBNilNodeID = NodeID{}
|
nodeDBNilNodeID = NodeID{} // Special node ID to use as a nil element.
|
||||||
|
nodeDBNodeExpiration = 24 * time.Hour // Time after which an unseen node should be dropped.
|
||||||
|
nodeDBCleanupCycle = time.Hour // Time period for running the expiration task.
|
||||||
|
)
|
||||||
|
|
||||||
// nodeDB stores all nodes we know about.
|
// nodeDB stores all nodes we know about.
|
||||||
type nodeDB struct {
|
type nodeDB struct {
|
||||||
lvl *leveldb.DB // Interface to the database itself
|
lvl *leveldb.DB // Interface to the database itself
|
||||||
seeder iterator.Iterator // Iterator for fetching possible seed nodes
|
seeder iterator.Iterator // Iterator for fetching possible seed nodes
|
||||||
|
|
||||||
|
runner sync.Once // Ensures we can start at most one expirer
|
||||||
|
quit chan struct{} // Channel to signal the expiring thread to stop
|
||||||
}
|
}
|
||||||
|
|
||||||
// Schema layout for the node database
|
// Schema layout for the node database
|
||||||
@ -53,7 +60,10 @@ func newMemoryNodeDB() (*nodeDB, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &nodeDB{lvl: db}, nil
|
return &nodeDB{
|
||||||
|
lvl: db,
|
||||||
|
quit: make(chan struct{}),
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// newPersistentNodeDB creates/opens a leveldb backed persistent node database,
|
// newPersistentNodeDB creates/opens a leveldb backed persistent node database,
|
||||||
@ -91,7 +101,10 @@ func newPersistentNodeDB(path string, version int) (*nodeDB, error) {
|
|||||||
return newPersistentNodeDB(path, version)
|
return newPersistentNodeDB(path, version)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return &nodeDB{lvl: db}, nil
|
return &nodeDB{
|
||||||
|
lvl: db,
|
||||||
|
quit: make(chan struct{}),
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// makeKey generates the leveldb key-blob from a node id and its particular
|
// makeKey generates the leveldb key-blob from a node id and its particular
|
||||||
@ -164,6 +177,55 @@ func (db *nodeDB) updateNode(node *Node) error {
|
|||||||
return db.lvl.Put(makeKey(node.ID, nodeDBDiscoverRoot), blob, nil)
|
return db.lvl.Put(makeKey(node.ID, nodeDBDiscoverRoot), blob, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// expirer should be started in a go routine, and is responsible for looping ad
|
||||||
|
// infinitum and dropping stale data from the database.
|
||||||
|
func (db *nodeDB) expirer() {
|
||||||
|
db.runner.Do(func() {
|
||||||
|
tick := time.Tick(nodeDBCleanupCycle)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-tick:
|
||||||
|
if err := db.expireNodes(); err != nil {
|
||||||
|
glog.V(logger.Error).Infof("Failed to expire nodedb items: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
case <-db.quit:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// expireNodes iterates over the database and deletes all nodes that have not
|
||||||
|
// been seen (i.e. received a pong from) for some alloted time.
|
||||||
|
func (db *nodeDB) expireNodes() error {
|
||||||
|
threshold := time.Now().Add(-nodeDBNodeExpiration)
|
||||||
|
|
||||||
|
// Find discovered nodes that are older than the allowance
|
||||||
|
it := db.lvl.NewIterator(nil, nil)
|
||||||
|
defer it.Release()
|
||||||
|
|
||||||
|
for it.Next() {
|
||||||
|
// Skip the item if not a discovery node
|
||||||
|
id, field := splitKey(it.Key())
|
||||||
|
if field != nodeDBDiscoverRoot {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// Skip the node if not expired yet
|
||||||
|
if seen := db.lastPong(id); seen.After(threshold) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// Otherwise delete all associated information
|
||||||
|
prefix := makeKey(id, "")
|
||||||
|
for ok := it.Seek(prefix); ok && bytes.HasPrefix(it.Key(), prefix); ok = it.Next() {
|
||||||
|
if err := db.lvl.Delete(it.Key(), nil); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// lastPing retrieves the time of the last ping packet send to a remote node,
|
// lastPing retrieves the time of the last ping packet send to a remote node,
|
||||||
// requesting binding.
|
// requesting binding.
|
||||||
func (db *nodeDB) lastPing(id NodeID) time.Time {
|
func (db *nodeDB) lastPing(id NodeID) time.Time {
|
||||||
@ -226,5 +288,6 @@ func (db *nodeDB) close() {
|
|||||||
if db.seeder != nil {
|
if db.seeder != nil {
|
||||||
db.seeder.Release()
|
db.seeder.Release()
|
||||||
}
|
}
|
||||||
|
close(db.quit)
|
||||||
db.lvl.Close()
|
db.lvl.Close()
|
||||||
}
|
}
|
||||||
|
@ -264,3 +264,50 @@ func TestNodeDBPersistency(t *testing.T) {
|
|||||||
}
|
}
|
||||||
db.close()
|
db.close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var nodeDBExpirationNodes = []struct {
|
||||||
|
node Node
|
||||||
|
pong time.Time
|
||||||
|
exp bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
node: Node{
|
||||||
|
ID: MustHexID("0x01d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"),
|
||||||
|
IP: []byte{127, 0, 0, 1},
|
||||||
|
},
|
||||||
|
pong: time.Now().Add(-nodeDBNodeExpiration + time.Minute),
|
||||||
|
exp: false,
|
||||||
|
}, {
|
||||||
|
node: Node{
|
||||||
|
ID: MustHexID("0x02d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"),
|
||||||
|
IP: []byte{127, 0, 0, 2},
|
||||||
|
},
|
||||||
|
pong: time.Now().Add(-nodeDBNodeExpiration - time.Minute),
|
||||||
|
exp: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNodeDBExpiration(t *testing.T) {
|
||||||
|
db, _ := newNodeDB("", Version)
|
||||||
|
defer db.close()
|
||||||
|
|
||||||
|
// Add all the test nodes and set their last pong time
|
||||||
|
for i, seed := range nodeDBExpirationNodes {
|
||||||
|
if err := db.updateNode(&seed.node); err != nil {
|
||||||
|
t.Fatalf("node %d: failed to insert: %v", i, err)
|
||||||
|
}
|
||||||
|
if err := db.updateLastPong(seed.node.ID, seed.pong); err != nil {
|
||||||
|
t.Fatalf("node %d: failed to update pong: %v", i, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Expire some of them, and check the rest
|
||||||
|
if err := db.expireNodes(); err != nil {
|
||||||
|
t.Fatalf("failed to expire nodes: %v", err)
|
||||||
|
}
|
||||||
|
for i, seed := range nodeDBExpirationNodes {
|
||||||
|
node := db.node(seed.node.ID)
|
||||||
|
if (node == nil && !seed.exp) || (node != nil && seed.exp) {
|
||||||
|
t.Errorf("node %d: expiration mismatch: have %v, want %v", i, node, seed.exp)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -335,6 +335,8 @@ func (tab *Table) ping(id NodeID, addr *net.UDPAddr) error {
|
|||||||
}
|
}
|
||||||
// Pong received, update the database and return
|
// Pong received, update the database and return
|
||||||
tab.db.updateLastPong(id, time.Now())
|
tab.db.updateLastPong(id, time.Now())
|
||||||
|
go tab.db.expirer()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user