Merge pull request #1859 from fjl/fix-discover-refresh-race
p2p/discover: fix race involving the seed node iterator
This commit is contained in:
commit
e3ac56d502
@ -21,6 +21,7 @@ package discover
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"crypto/rand"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"os"
|
"os"
|
||||||
"sync"
|
"sync"
|
||||||
@ -46,11 +47,8 @@ var (
|
|||||||
|
|
||||||
// nodeDB stores all nodes we know about.
|
// nodeDB stores all nodes we know about.
|
||||||
type nodeDB struct {
|
type nodeDB struct {
|
||||||
lvl *leveldb.DB // Interface to the database itself
|
lvl *leveldb.DB // Interface to the database itself
|
||||||
seeder iterator.Iterator // Iterator for fetching possible seed nodes
|
self NodeID // Own node id to prevent adding it into the database
|
||||||
|
|
||||||
self NodeID // Own node id to prevent adding it into the database
|
|
||||||
|
|
||||||
runner sync.Once // Ensures we can start at most one expirer
|
runner sync.Once // Ensures we can start at most one expirer
|
||||||
quit chan struct{} // Channel to signal the expiring thread to stop
|
quit chan struct{} // Channel to signal the expiring thread to stop
|
||||||
}
|
}
|
||||||
@ -302,52 +300,70 @@ func (db *nodeDB) updateFindFails(id NodeID, fails int) error {
|
|||||||
return db.storeInt64(makeKey(id, nodeDBDiscoverFindFails), int64(fails))
|
return db.storeInt64(makeKey(id, nodeDBDiscoverFindFails), int64(fails))
|
||||||
}
|
}
|
||||||
|
|
||||||
// querySeeds retrieves a batch of nodes to be used as potential seed servers
|
// querySeeds retrieves random nodes to be used as potential seed nodes
|
||||||
// during bootstrapping the node into the network.
|
// for bootstrapping.
|
||||||
//
|
func (db *nodeDB) querySeeds(n int, maxAge time.Duration) []*Node {
|
||||||
// Ideal seeds are the most recently seen nodes (highest probability to be still
|
var (
|
||||||
// alive), but yet untried. However, since leveldb only supports dumb iteration
|
now = time.Now()
|
||||||
// we will instead start pulling in potential seeds that haven't been yet pinged
|
nodes = make([]*Node, 0, n)
|
||||||
// since the start of the boot procedure.
|
it = db.lvl.NewIterator(nil, nil)
|
||||||
//
|
id NodeID
|
||||||
// If the database runs out of potential seeds, we restart the startup counter
|
)
|
||||||
// and start iterating over the peers again.
|
defer it.Release()
|
||||||
func (db *nodeDB) querySeeds(n int) []*Node {
|
|
||||||
// Create a new seed iterator if none exists
|
seek:
|
||||||
if db.seeder == nil {
|
for seeks := 0; len(nodes) < n && seeks < n*5; seeks++ {
|
||||||
db.seeder = db.lvl.NewIterator(nil, nil)
|
// Seek to a random entry. The first byte is incremented by a
|
||||||
}
|
// random amount each time in order to increase the likelihood
|
||||||
// Iterate over the nodes and find suitable seeds
|
// of hitting all existing nodes in very small databases.
|
||||||
nodes := make([]*Node, 0, n)
|
ctr := id[0]
|
||||||
for len(nodes) < n && db.seeder.Next() {
|
rand.Read(id[:])
|
||||||
// Iterate until a discovery node is found
|
id[0] = ctr + id[0]%16
|
||||||
id, field := splitKey(db.seeder.Key())
|
it.Seek(makeKey(id, nodeDBDiscoverRoot))
|
||||||
if field != nodeDBDiscoverRoot {
|
|
||||||
continue
|
n := nextNode(it)
|
||||||
|
if n == nil {
|
||||||
|
id[0] = 0
|
||||||
|
continue seek // iterator exhausted
|
||||||
}
|
}
|
||||||
// Dump it if its a self reference
|
if n.ID == db.self {
|
||||||
if bytes.Compare(id[:], db.self[:]) == 0 {
|
continue seek
|
||||||
db.deleteNode(id)
|
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
// Load it as a potential seed
|
if now.Sub(db.lastPong(n.ID)) > maxAge {
|
||||||
if node := db.node(id); node != nil {
|
continue seek
|
||||||
nodes = append(nodes, node)
|
|
||||||
}
|
}
|
||||||
}
|
for i := range nodes {
|
||||||
// Release the iterator if we reached the end
|
if nodes[i].ID == n.ID {
|
||||||
if len(nodes) == 0 {
|
continue seek // duplicate
|
||||||
db.seeder.Release()
|
}
|
||||||
db.seeder = nil
|
}
|
||||||
|
nodes = append(nodes, n)
|
||||||
}
|
}
|
||||||
return nodes
|
return nodes
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// reads the next node record from the iterator, skipping over other
|
||||||
|
// database entries.
|
||||||
|
func nextNode(it iterator.Iterator) *Node {
|
||||||
|
for end := false; !end; end = !it.Next() {
|
||||||
|
id, field := splitKey(it.Key())
|
||||||
|
if field != nodeDBDiscoverRoot {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
var n Node
|
||||||
|
if err := rlp.DecodeBytes(it.Value(), &n); err != nil {
|
||||||
|
if glog.V(logger.Warn) {
|
||||||
|
glog.Errorf("invalid node %x: %v", id, err)
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return &n
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// close flushes and closes the database files.
|
// close flushes and closes the database files.
|
||||||
func (db *nodeDB) close() {
|
func (db *nodeDB) close() {
|
||||||
if db.seeder != nil {
|
|
||||||
db.seeder.Release()
|
|
||||||
}
|
|
||||||
close(db.quit)
|
close(db.quit)
|
||||||
db.lvl.Close()
|
db.lvl.Close()
|
||||||
}
|
}
|
||||||
|
@ -162,9 +162,33 @@ var nodeDBSeedQueryNodes = []struct {
|
|||||||
node *Node
|
node *Node
|
||||||
pong time.Time
|
pong time.Time
|
||||||
}{
|
}{
|
||||||
|
// This one should not be in the result set because its last
|
||||||
|
// pong time is too far in the past.
|
||||||
{
|
{
|
||||||
node: newNode(
|
node: newNode(
|
||||||
MustHexID("0x01d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"),
|
MustHexID("0x84d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"),
|
||||||
|
net.IP{127, 0, 0, 3},
|
||||||
|
30303,
|
||||||
|
30303,
|
||||||
|
),
|
||||||
|
pong: time.Now().Add(-3 * time.Hour),
|
||||||
|
},
|
||||||
|
// This one shouldn't be in in the result set because its
|
||||||
|
// nodeID is the local node's ID.
|
||||||
|
{
|
||||||
|
node: newNode(
|
||||||
|
MustHexID("0x57d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"),
|
||||||
|
net.IP{127, 0, 0, 3},
|
||||||
|
30303,
|
||||||
|
30303,
|
||||||
|
),
|
||||||
|
pong: time.Now().Add(-4 * time.Second),
|
||||||
|
},
|
||||||
|
|
||||||
|
// These should be in the result set.
|
||||||
|
{
|
||||||
|
node: newNode(
|
||||||
|
MustHexID("0x22d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"),
|
||||||
net.IP{127, 0, 0, 1},
|
net.IP{127, 0, 0, 1},
|
||||||
30303,
|
30303,
|
||||||
30303,
|
30303,
|
||||||
@ -173,7 +197,7 @@ var nodeDBSeedQueryNodes = []struct {
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
node: newNode(
|
node: newNode(
|
||||||
MustHexID("0x02d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"),
|
MustHexID("0x44d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"),
|
||||||
net.IP{127, 0, 0, 2},
|
net.IP{127, 0, 0, 2},
|
||||||
30303,
|
30303,
|
||||||
30303,
|
30303,
|
||||||
@ -182,7 +206,7 @@ var nodeDBSeedQueryNodes = []struct {
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
node: newNode(
|
node: newNode(
|
||||||
MustHexID("0x03d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"),
|
MustHexID("0xe2d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"),
|
||||||
net.IP{127, 0, 0, 3},
|
net.IP{127, 0, 0, 3},
|
||||||
30303,
|
30303,
|
||||||
30303,
|
30303,
|
||||||
@ -192,7 +216,7 @@ var nodeDBSeedQueryNodes = []struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestNodeDBSeedQuery(t *testing.T) {
|
func TestNodeDBSeedQuery(t *testing.T) {
|
||||||
db, _ := newNodeDB("", Version, NodeID{})
|
db, _ := newNodeDB("", Version, nodeDBSeedQueryNodes[1].node.ID)
|
||||||
defer db.close()
|
defer db.close()
|
||||||
|
|
||||||
// Insert a batch of nodes for querying
|
// Insert a batch of nodes for querying
|
||||||
@ -200,20 +224,24 @@ func TestNodeDBSeedQuery(t *testing.T) {
|
|||||||
if err := db.updateNode(seed.node); err != nil {
|
if err := db.updateNode(seed.node); err != nil {
|
||||||
t.Fatalf("node %d: failed to insert: %v", i, err)
|
t.Fatalf("node %d: failed to insert: %v", i, err)
|
||||||
}
|
}
|
||||||
|
if err := db.updateLastPong(seed.node.ID, seed.pong); err != nil {
|
||||||
|
t.Fatalf("node %d: failed to insert lastPong: %v", i, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Retrieve the entire batch and check for duplicates
|
// Retrieve the entire batch and check for duplicates
|
||||||
seeds := db.querySeeds(2 * len(nodeDBSeedQueryNodes))
|
seeds := db.querySeeds(len(nodeDBSeedQueryNodes)*2, time.Hour)
|
||||||
if len(seeds) != len(nodeDBSeedQueryNodes) {
|
|
||||||
t.Errorf("seed count mismatch: have %v, want %v", len(seeds), len(nodeDBSeedQueryNodes))
|
|
||||||
}
|
|
||||||
have := make(map[NodeID]struct{})
|
have := make(map[NodeID]struct{})
|
||||||
for _, seed := range seeds {
|
for _, seed := range seeds {
|
||||||
have[seed.ID] = struct{}{}
|
have[seed.ID] = struct{}{}
|
||||||
}
|
}
|
||||||
want := make(map[NodeID]struct{})
|
want := make(map[NodeID]struct{})
|
||||||
for _, seed := range nodeDBSeedQueryNodes {
|
for _, seed := range nodeDBSeedQueryNodes[2:] {
|
||||||
want[seed.node.ID] = struct{}{}
|
want[seed.node.ID] = struct{}{}
|
||||||
}
|
}
|
||||||
|
if len(seeds) != len(want) {
|
||||||
|
t.Errorf("seed count mismatch: have %v, want %v", len(seeds), len(want))
|
||||||
|
}
|
||||||
for id, _ := range have {
|
for id, _ := range have {
|
||||||
if _, ok := want[id]; !ok {
|
if _, ok := want[id]; !ok {
|
||||||
t.Errorf("extra seed: %v", id)
|
t.Errorf("extra seed: %v", id)
|
||||||
@ -224,63 +252,6 @@ func TestNodeDBSeedQuery(t *testing.T) {
|
|||||||
t.Errorf("missing seed: %v", id)
|
t.Errorf("missing seed: %v", id)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Make sure the next batch is empty (seed EOF)
|
|
||||||
seeds = db.querySeeds(2 * len(nodeDBSeedQueryNodes))
|
|
||||||
if len(seeds) != 0 {
|
|
||||||
t.Errorf("seed count mismatch: have %v, want %v", len(seeds), 0)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestNodeDBSeedQueryContinuation(t *testing.T) {
|
|
||||||
db, _ := newNodeDB("", Version, NodeID{})
|
|
||||||
defer db.close()
|
|
||||||
|
|
||||||
// Insert a batch of nodes for querying
|
|
||||||
for i, seed := range nodeDBSeedQueryNodes {
|
|
||||||
if err := db.updateNode(seed.node); err != nil {
|
|
||||||
t.Fatalf("node %d: failed to insert: %v", i, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Iteratively retrieve the batch, checking for an empty batch on reset
|
|
||||||
for i := 0; i < len(nodeDBSeedQueryNodes); i++ {
|
|
||||||
if seeds := db.querySeeds(1); len(seeds) != 1 {
|
|
||||||
t.Errorf("1st iteration %d: seed count mismatch: have %v, want %v", i, len(seeds), 1)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if seeds := db.querySeeds(1); len(seeds) != 0 {
|
|
||||||
t.Errorf("reset: seed count mismatch: have %v, want %v", len(seeds), 0)
|
|
||||||
}
|
|
||||||
for i := 0; i < len(nodeDBSeedQueryNodes); i++ {
|
|
||||||
if seeds := db.querySeeds(1); len(seeds) != 1 {
|
|
||||||
t.Errorf("2nd iteration %d: seed count mismatch: have %v, want %v", i, len(seeds), 1)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestNodeDBSelfSeedQuery(t *testing.T) {
|
|
||||||
// Assign a node as self to verify evacuation
|
|
||||||
self := nodeDBSeedQueryNodes[0].node.ID
|
|
||||||
db, _ := newNodeDB("", Version, self)
|
|
||||||
defer db.close()
|
|
||||||
|
|
||||||
// Insert a batch of nodes for querying
|
|
||||||
for i, seed := range nodeDBSeedQueryNodes {
|
|
||||||
if err := db.updateNode(seed.node); err != nil {
|
|
||||||
t.Fatalf("node %d: failed to insert: %v", i, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Retrieve the entire batch and check that self was evacuated
|
|
||||||
seeds := db.querySeeds(2 * len(nodeDBSeedQueryNodes))
|
|
||||||
if len(seeds) != len(nodeDBSeedQueryNodes)-1 {
|
|
||||||
t.Errorf("seed count mismatch: have %v, want %v", len(seeds), len(nodeDBSeedQueryNodes)-1)
|
|
||||||
}
|
|
||||||
have := make(map[NodeID]struct{})
|
|
||||||
for _, seed := range seeds {
|
|
||||||
have[seed.ID] = struct{}{}
|
|
||||||
}
|
|
||||||
if _, ok := have[self]; ok {
|
|
||||||
t.Errorf("self not evacuated")
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestNodeDBPersistency(t *testing.T) {
|
func TestNodeDBPersistency(t *testing.T) {
|
||||||
|
@ -44,6 +44,10 @@ const (
|
|||||||
|
|
||||||
maxBondingPingPongs = 16
|
maxBondingPingPongs = 16
|
||||||
maxFindnodeFailures = 5
|
maxFindnodeFailures = 5
|
||||||
|
|
||||||
|
autoRefreshInterval = 1 * time.Hour
|
||||||
|
seedCount = 30
|
||||||
|
seedMaxAge = 5 * 24 * time.Hour
|
||||||
)
|
)
|
||||||
|
|
||||||
type Table struct {
|
type Table struct {
|
||||||
@ -52,6 +56,10 @@ type Table struct {
|
|||||||
nursery []*Node // bootstrap nodes
|
nursery []*Node // bootstrap nodes
|
||||||
db *nodeDB // database of known nodes
|
db *nodeDB // database of known nodes
|
||||||
|
|
||||||
|
refreshReq chan struct{}
|
||||||
|
closeReq chan struct{}
|
||||||
|
closed chan struct{}
|
||||||
|
|
||||||
bondmu sync.Mutex
|
bondmu sync.Mutex
|
||||||
bonding map[NodeID]*bondproc
|
bonding map[NodeID]*bondproc
|
||||||
bondslots chan struct{} // limits total number of active bonding processes
|
bondslots chan struct{} // limits total number of active bonding processes
|
||||||
@ -80,10 +88,7 @@ type transport interface {
|
|||||||
|
|
||||||
// bucket contains nodes, ordered by their last activity. the entry
|
// bucket contains nodes, ordered by their last activity. the entry
|
||||||
// that was most recently active is the first element in entries.
|
// that was most recently active is the first element in entries.
|
||||||
type bucket struct {
|
type bucket struct{ entries []*Node }
|
||||||
lastLookup time.Time
|
|
||||||
entries []*Node
|
|
||||||
}
|
|
||||||
|
|
||||||
func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string) *Table {
|
func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string) *Table {
|
||||||
// If no node database was given, use an in-memory one
|
// If no node database was given, use an in-memory one
|
||||||
@ -93,11 +98,14 @@ func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string
|
|||||||
db, _ = newNodeDB("", Version, ourID)
|
db, _ = newNodeDB("", Version, ourID)
|
||||||
}
|
}
|
||||||
tab := &Table{
|
tab := &Table{
|
||||||
net: t,
|
net: t,
|
||||||
db: db,
|
db: db,
|
||||||
self: newNode(ourID, ourAddr.IP, uint16(ourAddr.Port), uint16(ourAddr.Port)),
|
self: newNode(ourID, ourAddr.IP, uint16(ourAddr.Port), uint16(ourAddr.Port)),
|
||||||
bonding: make(map[NodeID]*bondproc),
|
bonding: make(map[NodeID]*bondproc),
|
||||||
bondslots: make(chan struct{}, maxBondingPingPongs),
|
bondslots: make(chan struct{}, maxBondingPingPongs),
|
||||||
|
refreshReq: make(chan struct{}),
|
||||||
|
closeReq: make(chan struct{}),
|
||||||
|
closed: make(chan struct{}),
|
||||||
}
|
}
|
||||||
for i := 0; i < cap(tab.bondslots); i++ {
|
for i := 0; i < cap(tab.bondslots); i++ {
|
||||||
tab.bondslots <- struct{}{}
|
tab.bondslots <- struct{}{}
|
||||||
@ -105,6 +113,7 @@ func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string
|
|||||||
for i := range tab.buckets {
|
for i := range tab.buckets {
|
||||||
tab.buckets[i] = new(bucket)
|
tab.buckets[i] = new(bucket)
|
||||||
}
|
}
|
||||||
|
go tab.refreshLoop()
|
||||||
return tab
|
return tab
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -163,10 +172,12 @@ func randUint(max uint32) uint32 {
|
|||||||
|
|
||||||
// Close terminates the network listener and flushes the node database.
|
// Close terminates the network listener and flushes the node database.
|
||||||
func (tab *Table) Close() {
|
func (tab *Table) Close() {
|
||||||
if tab.net != nil {
|
select {
|
||||||
tab.net.close()
|
case <-tab.closed:
|
||||||
|
// already closed.
|
||||||
|
case tab.closeReq <- struct{}{}:
|
||||||
|
<-tab.closed // wait for refreshLoop to end.
|
||||||
}
|
}
|
||||||
tab.db.close()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Bootstrap sets the bootstrap nodes. These nodes are used to connect
|
// Bootstrap sets the bootstrap nodes. These nodes are used to connect
|
||||||
@ -183,7 +194,7 @@ func (tab *Table) Bootstrap(nodes []*Node) {
|
|||||||
tab.nursery = append(tab.nursery, &cpy)
|
tab.nursery = append(tab.nursery, &cpy)
|
||||||
}
|
}
|
||||||
tab.mutex.Unlock()
|
tab.mutex.Unlock()
|
||||||
tab.refresh()
|
tab.requestRefresh()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Lookup performs a network search for nodes close
|
// Lookup performs a network search for nodes close
|
||||||
@ -204,15 +215,13 @@ func (tab *Table) Lookup(targetID NodeID) []*Node {
|
|||||||
asked[tab.self.ID] = true
|
asked[tab.self.ID] = true
|
||||||
|
|
||||||
tab.mutex.Lock()
|
tab.mutex.Lock()
|
||||||
// update last lookup stamp (for refresh logic)
|
|
||||||
tab.buckets[logdist(tab.self.sha, target)].lastLookup = time.Now()
|
|
||||||
// generate initial result set
|
// generate initial result set
|
||||||
result := tab.closest(target, bucketSize)
|
result := tab.closest(target, bucketSize)
|
||||||
tab.mutex.Unlock()
|
tab.mutex.Unlock()
|
||||||
|
|
||||||
// If the result set is empty, all nodes were dropped, refresh
|
// If the result set is empty, all nodes were dropped, refresh.
|
||||||
if len(result.entries) == 0 {
|
if len(result.entries) == 0 {
|
||||||
tab.refresh()
|
tab.requestRefresh()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -257,56 +266,86 @@ func (tab *Table) Lookup(targetID NodeID) []*Node {
|
|||||||
return result.entries
|
return result.entries
|
||||||
}
|
}
|
||||||
|
|
||||||
// refresh performs a lookup for a random target to keep buckets full, or seeds
|
func (tab *Table) requestRefresh() {
|
||||||
// the table if it is empty (initial bootstrap or discarded faulty peers).
|
select {
|
||||||
func (tab *Table) refresh() {
|
case tab.refreshReq <- struct{}{}:
|
||||||
seed := true
|
case <-tab.closed:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// If the discovery table is empty, seed with previously known nodes
|
func (tab *Table) refreshLoop() {
|
||||||
tab.mutex.Lock()
|
defer func() {
|
||||||
for _, bucket := range tab.buckets {
|
tab.db.close()
|
||||||
if len(bucket.entries) > 0 {
|
if tab.net != nil {
|
||||||
seed = false
|
tab.net.close()
|
||||||
break
|
}
|
||||||
|
close(tab.closed)
|
||||||
|
}()
|
||||||
|
|
||||||
|
timer := time.NewTicker(autoRefreshInterval)
|
||||||
|
var done chan struct{}
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-timer.C:
|
||||||
|
if done == nil {
|
||||||
|
done = make(chan struct{})
|
||||||
|
go tab.doRefresh(done)
|
||||||
|
}
|
||||||
|
case <-tab.refreshReq:
|
||||||
|
if done == nil {
|
||||||
|
done = make(chan struct{})
|
||||||
|
go tab.doRefresh(done)
|
||||||
|
}
|
||||||
|
case <-done:
|
||||||
|
done = nil
|
||||||
|
case <-tab.closeReq:
|
||||||
|
if done != nil {
|
||||||
|
<-done
|
||||||
|
}
|
||||||
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// doRefresh performs a lookup for a random target to keep buckets
|
||||||
|
// full. seed nodes are inserted if the table is empty (initial
|
||||||
|
// bootstrap or discarded faulty peers).
|
||||||
|
func (tab *Table) doRefresh(done chan struct{}) {
|
||||||
|
defer close(done)
|
||||||
|
|
||||||
|
// The Kademlia paper specifies that the bucket refresh should
|
||||||
|
// perform a lookup in the least recently used bucket. We cannot
|
||||||
|
// adhere to this because the findnode target is a 512bit value
|
||||||
|
// (not hash-sized) and it is not easily possible to generate a
|
||||||
|
// sha3 preimage that falls into a chosen bucket.
|
||||||
|
// We perform a lookup with a random target instead.
|
||||||
|
var target NodeID
|
||||||
|
rand.Read(target[:])
|
||||||
|
result := tab.Lookup(target)
|
||||||
|
if len(result) > 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// The table is empty. Load nodes from the database and insert
|
||||||
|
// them. This should yield a few previously seen nodes that are
|
||||||
|
// (hopefully) still alive.
|
||||||
|
seeds := tab.db.querySeeds(seedCount, seedMaxAge)
|
||||||
|
seeds = tab.bondall(append(seeds, tab.nursery...))
|
||||||
|
if glog.V(logger.Debug) {
|
||||||
|
if len(seeds) == 0 {
|
||||||
|
glog.Infof("no seed nodes found")
|
||||||
|
}
|
||||||
|
for _, n := range seeds {
|
||||||
|
age := time.Since(tab.db.lastPong(n.ID))
|
||||||
|
glog.Infof("seed node (age %v): %v", age, n)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tab.mutex.Lock()
|
||||||
|
tab.stuff(seeds)
|
||||||
tab.mutex.Unlock()
|
tab.mutex.Unlock()
|
||||||
|
|
||||||
// If the table is not empty, try to refresh using the live entries
|
// Finally, do a self lookup to fill up the buckets.
|
||||||
if !seed {
|
tab.Lookup(tab.self.ID)
|
||||||
// The Kademlia paper specifies that the bucket refresh should
|
|
||||||
// perform a refresh in the least recently used bucket. We cannot
|
|
||||||
// adhere to this because the findnode target is a 512bit value
|
|
||||||
// (not hash-sized) and it is not easily possible to generate a
|
|
||||||
// sha3 preimage that falls into a chosen bucket.
|
|
||||||
//
|
|
||||||
// We perform a lookup with a random target instead.
|
|
||||||
var target NodeID
|
|
||||||
rand.Read(target[:])
|
|
||||||
|
|
||||||
result := tab.Lookup(target)
|
|
||||||
if len(result) == 0 {
|
|
||||||
// Lookup failed, seed after all
|
|
||||||
seed = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if seed {
|
|
||||||
// Pick a batch of previously know seeds to lookup with
|
|
||||||
seeds := tab.db.querySeeds(10)
|
|
||||||
for _, seed := range seeds {
|
|
||||||
glog.V(logger.Debug).Infoln("Seeding network with", seed)
|
|
||||||
}
|
|
||||||
nodes := append(tab.nursery, seeds...)
|
|
||||||
|
|
||||||
// Bond with all the seed nodes (will pingpong only if failed recently)
|
|
||||||
bonded := tab.bondall(nodes)
|
|
||||||
if len(bonded) > 0 {
|
|
||||||
tab.Lookup(tab.self.ID)
|
|
||||||
}
|
|
||||||
// TODO: the Kademlia paper says that we're supposed to perform
|
|
||||||
// random lookups in all buckets further away than our closest neighbor.
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// closest returns the n nodes in the table that are closest to the
|
// closest returns the n nodes in the table that are closest to the
|
||||||
@ -373,8 +412,9 @@ func (tab *Table) bond(pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16
|
|||||||
}
|
}
|
||||||
// If the node is unknown (non-bonded) or failed (remotely unknown), bond from scratch
|
// If the node is unknown (non-bonded) or failed (remotely unknown), bond from scratch
|
||||||
var result error
|
var result error
|
||||||
if node == nil || fails > 0 {
|
age := time.Since(tab.db.lastPong(id))
|
||||||
glog.V(logger.Detail).Infof("Bonding %x: known=%v, fails=%v", id[:8], node != nil, fails)
|
if node == nil || fails > 0 || age > nodeDBNodeExpiration {
|
||||||
|
glog.V(logger.Detail).Infof("Bonding %x: known=%t, fails=%d age=%v", id[:8], node != nil, fails, age)
|
||||||
|
|
||||||
tab.bondmu.Lock()
|
tab.bondmu.Lock()
|
||||||
w := tab.bonding[id]
|
w := tab.bonding[id]
|
||||||
@ -435,13 +475,17 @@ func (tab *Table) pingpong(w *bondproc, pinged bool, id NodeID, addr *net.UDPAdd
|
|||||||
// ping a remote endpoint and wait for a reply, also updating the node
|
// ping a remote endpoint and wait for a reply, also updating the node
|
||||||
// database accordingly.
|
// database accordingly.
|
||||||
func (tab *Table) ping(id NodeID, addr *net.UDPAddr) error {
|
func (tab *Table) ping(id NodeID, addr *net.UDPAddr) error {
|
||||||
// Update the last ping and send the message
|
|
||||||
tab.db.updateLastPing(id, time.Now())
|
tab.db.updateLastPing(id, time.Now())
|
||||||
if err := tab.net.ping(id, addr); err != nil {
|
if err := tab.net.ping(id, addr); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// Pong received, update the database and return
|
|
||||||
tab.db.updateLastPong(id, time.Now())
|
tab.db.updateLastPong(id, time.Now())
|
||||||
|
|
||||||
|
// Start the background expiration goroutine after the first
|
||||||
|
// successful communication. Subsequent calls have no effect if it
|
||||||
|
// is already running. We do this here instead of somewhere else
|
||||||
|
// so that the search for seed nodes also considers older nodes
|
||||||
|
// that would otherwise be removed by the expiration.
|
||||||
tab.db.ensureExpirer()
|
tab.db.ensureExpirer()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -514,9 +514,6 @@ func (tn *preminedTestnet) findnode(toid NodeID, toaddr *net.UDPAddr, target Nod
|
|||||||
if toaddr.Port == 0 {
|
if toaddr.Port == 0 {
|
||||||
panic("query to node at distance 0")
|
panic("query to node at distance 0")
|
||||||
}
|
}
|
||||||
if target != tn.target {
|
|
||||||
panic("findnode with wrong target")
|
|
||||||
}
|
|
||||||
next := uint16(toaddr.Port) - 1
|
next := uint16(toaddr.Port) - 1
|
||||||
var result []*Node
|
var result []*Node
|
||||||
for i, id := range tn.dists[toaddr.Port] {
|
for i, id := range tn.dists[toaddr.Port] {
|
||||||
|
@ -39,7 +39,6 @@ var (
|
|||||||
errPacketTooSmall = errors.New("too small")
|
errPacketTooSmall = errors.New("too small")
|
||||||
errBadHash = errors.New("bad hash")
|
errBadHash = errors.New("bad hash")
|
||||||
errExpired = errors.New("expired")
|
errExpired = errors.New("expired")
|
||||||
errBadVersion = errors.New("version mismatch")
|
|
||||||
errUnsolicitedReply = errors.New("unsolicited reply")
|
errUnsolicitedReply = errors.New("unsolicited reply")
|
||||||
errUnknownNode = errors.New("unknown node")
|
errUnknownNode = errors.New("unknown node")
|
||||||
errTimeout = errors.New("RPC timeout")
|
errTimeout = errors.New("RPC timeout")
|
||||||
@ -52,8 +51,6 @@ const (
|
|||||||
respTimeout = 500 * time.Millisecond
|
respTimeout = 500 * time.Millisecond
|
||||||
sendTimeout = 500 * time.Millisecond
|
sendTimeout = 500 * time.Millisecond
|
||||||
expiration = 20 * time.Second
|
expiration = 20 * time.Second
|
||||||
|
|
||||||
refreshInterval = 1 * time.Hour
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// RPC packet types
|
// RPC packet types
|
||||||
@ -312,10 +309,8 @@ func (t *udp) loop() {
|
|||||||
plist = list.New()
|
plist = list.New()
|
||||||
timeout = time.NewTimer(0)
|
timeout = time.NewTimer(0)
|
||||||
nextTimeout *pending // head of plist when timeout was last reset
|
nextTimeout *pending // head of plist when timeout was last reset
|
||||||
refresh = time.NewTicker(refreshInterval)
|
|
||||||
)
|
)
|
||||||
<-timeout.C // ignore first timeout
|
<-timeout.C // ignore first timeout
|
||||||
defer refresh.Stop()
|
|
||||||
defer timeout.Stop()
|
defer timeout.Stop()
|
||||||
|
|
||||||
resetTimeout := func() {
|
resetTimeout := func() {
|
||||||
@ -344,9 +339,6 @@ func (t *udp) loop() {
|
|||||||
resetTimeout()
|
resetTimeout()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-refresh.C:
|
|
||||||
go t.refresh()
|
|
||||||
|
|
||||||
case <-t.closing:
|
case <-t.closing:
|
||||||
for el := plist.Front(); el != nil; el = el.Next() {
|
for el := plist.Front(); el != nil; el = el.Next() {
|
||||||
el.Value.(*pending).errc <- errClosed
|
el.Value.(*pending).errc <- errClosed
|
||||||
@ -529,9 +521,6 @@ func (req *ping) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) er
|
|||||||
if expired(req.Expiration) {
|
if expired(req.Expiration) {
|
||||||
return errExpired
|
return errExpired
|
||||||
}
|
}
|
||||||
if req.Version != Version {
|
|
||||||
return errBadVersion
|
|
||||||
}
|
|
||||||
t.send(from, pongPacket, pong{
|
t.send(from, pongPacket, pong{
|
||||||
To: makeEndpoint(from, req.From.TCP),
|
To: makeEndpoint(from, req.From.TCP),
|
||||||
ReplyTok: mac,
|
ReplyTok: mac,
|
||||||
|
@ -122,7 +122,6 @@ func TestUDP_packetErrors(t *testing.T) {
|
|||||||
defer test.table.Close()
|
defer test.table.Close()
|
||||||
|
|
||||||
test.packetIn(errExpired, pingPacket, &ping{From: testRemote, To: testLocalAnnounced, Version: Version})
|
test.packetIn(errExpired, pingPacket, &ping{From: testRemote, To: testLocalAnnounced, Version: Version})
|
||||||
test.packetIn(errBadVersion, pingPacket, &ping{From: testRemote, To: testLocalAnnounced, Version: 99, Expiration: futureExp})
|
|
||||||
test.packetIn(errUnsolicitedReply, pongPacket, &pong{ReplyTok: []byte{}, Expiration: futureExp})
|
test.packetIn(errUnsolicitedReply, pongPacket, &pong{ReplyTok: []byte{}, Expiration: futureExp})
|
||||||
test.packetIn(errUnknownNode, findnodePacket, &findnode{Expiration: futureExp})
|
test.packetIn(errUnknownNode, findnodePacket, &findnode{Expiration: futureExp})
|
||||||
test.packetIn(errUnsolicitedReply, neighborsPacket, &neighbors{Expiration: futureExp})
|
test.packetIn(errUnsolicitedReply, neighborsPacket, &neighbors{Expiration: futureExp})
|
||||||
|
Loading…
Reference in New Issue
Block a user