p2p/discover: unlock the table during ping replacement
Table.mutex was being held while waiting for a reply packet, which effectively made many parts of the whole stack block on that packet, including the net_peerCount RPC call.
This commit is contained in:
parent
698e98d981
commit
01ed3fa1a9
@ -78,9 +78,8 @@ type transport interface {
|
||||
close()
|
||||
}
|
||||
|
||||
// bucket contains nodes, ordered by their last activity.
|
||||
// the entry that was most recently active is the last element
|
||||
// in entries.
|
||||
// bucket contains nodes, ordered by their last activity. the entry
|
||||
// that was most recently active is the first element in entries.
|
||||
type bucket struct {
|
||||
lastLookup time.Time
|
||||
entries []*Node
|
||||
@ -235,7 +234,7 @@ func (tab *Table) Lookup(targetID NodeID) []*Node {
|
||||
|
||||
if fails >= maxFindnodeFailures {
|
||||
glog.V(logger.Detail).Infof("Evacuating node %x: %d findnode failures", n.ID[:8], fails)
|
||||
tab.del(n)
|
||||
tab.delete(n)
|
||||
}
|
||||
}
|
||||
reply <- tab.bondall(r)
|
||||
@ -401,15 +400,11 @@ func (tab *Table) bond(pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16
|
||||
node = w.n
|
||||
}
|
||||
}
|
||||
// Even if bonding temporarily failed, give the node a chance
|
||||
if node != nil {
|
||||
tab.mutex.Lock()
|
||||
defer tab.mutex.Unlock()
|
||||
|
||||
b := tab.buckets[logdist(tab.self.sha, node.sha)]
|
||||
if !b.bump(node) {
|
||||
tab.pingreplace(node, b)
|
||||
}
|
||||
// Add the node to the table even if the bonding ping/pong
|
||||
// fails. It will be relaced quickly if it continues to be
|
||||
// unresponsive.
|
||||
tab.add(node)
|
||||
tab.db.updateFindFails(id, 0)
|
||||
}
|
||||
return node, result
|
||||
@ -420,7 +415,7 @@ func (tab *Table) pingpong(w *bondproc, pinged bool, id NodeID, addr *net.UDPAdd
|
||||
<-tab.bondslots
|
||||
defer func() { tab.bondslots <- struct{}{} }()
|
||||
|
||||
// Ping the remote side and wait for a pong
|
||||
// Ping the remote side and wait for a pong.
|
||||
if w.err = tab.ping(id, addr); w.err != nil {
|
||||
close(w.done)
|
||||
return
|
||||
@ -431,33 +426,14 @@ func (tab *Table) pingpong(w *bondproc, pinged bool, id NodeID, addr *net.UDPAdd
|
||||
// waitping will simply time out.
|
||||
tab.net.waitping(id)
|
||||
}
|
||||
// Bonding succeeded, update the node database
|
||||
// Bonding succeeded, update the node database.
|
||||
w.n = newNode(id, addr.IP, uint16(addr.Port), tcpPort)
|
||||
tab.db.updateNode(w.n)
|
||||
close(w.done)
|
||||
}
|
||||
|
||||
func (tab *Table) pingreplace(new *Node, b *bucket) {
|
||||
if len(b.entries) == bucketSize {
|
||||
oldest := b.entries[bucketSize-1]
|
||||
if err := tab.ping(oldest.ID, oldest.addr()); err == nil {
|
||||
// The node responded, we don't need to replace it.
|
||||
return
|
||||
}
|
||||
} else {
|
||||
// Add a slot at the end so the last entry doesn't
|
||||
// fall off when adding the new node.
|
||||
b.entries = append(b.entries, nil)
|
||||
}
|
||||
copy(b.entries[1:], b.entries)
|
||||
b.entries[0] = new
|
||||
if tab.nodeAddedHook != nil {
|
||||
tab.nodeAddedHook(new)
|
||||
}
|
||||
}
|
||||
|
||||
// ping a remote endpoint and wait for a reply, also updating the node database
|
||||
// accordingly.
|
||||
// ping a remote endpoint and wait for a reply, also updating the node
|
||||
// database accordingly.
|
||||
func (tab *Table) ping(id NodeID, addr *net.UDPAddr) error {
|
||||
// Update the last ping and send the message
|
||||
tab.db.updateLastPing(id, time.Now())
|
||||
@ -467,24 +443,53 @@ func (tab *Table) ping(id NodeID, addr *net.UDPAddr) error {
|
||||
// Pong received, update the database and return
|
||||
tab.db.updateLastPong(id, time.Now())
|
||||
tab.db.ensureExpirer()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// add puts the entries into the table if their corresponding
|
||||
// bucket is not full. The caller must hold tab.mutex.
|
||||
func (tab *Table) add(entries []*Node) {
|
||||
// add attempts to add the given node its corresponding bucket. If the
|
||||
// bucket has space available, adding the node succeeds immediately.
|
||||
// Otherwise, the node is added if the least recently active node in
|
||||
// the bucket does not respond to a ping packet.
|
||||
//
|
||||
// The caller must not hold tab.mutex.
|
||||
func (tab *Table) add(new *Node) {
|
||||
b := tab.buckets[logdist(tab.self.sha, new.sha)]
|
||||
tab.mutex.Lock()
|
||||
if b.bump(new) {
|
||||
tab.mutex.Unlock()
|
||||
return
|
||||
}
|
||||
var oldest *Node
|
||||
if len(b.entries) == bucketSize {
|
||||
oldest = b.entries[bucketSize-1]
|
||||
// Let go of the mutex so other goroutines can access
|
||||
// the table while we ping the least recently active node.
|
||||
tab.mutex.Unlock()
|
||||
if err := tab.ping(oldest.ID, oldest.addr()); err == nil {
|
||||
// The node responded, don't replace it.
|
||||
return
|
||||
}
|
||||
tab.mutex.Lock()
|
||||
}
|
||||
added := b.replace(new, oldest)
|
||||
tab.mutex.Unlock()
|
||||
if added && tab.nodeAddedHook != nil {
|
||||
tab.nodeAddedHook(new)
|
||||
}
|
||||
}
|
||||
|
||||
// stuff adds nodes the table to the end of their corresponding bucket
|
||||
// if the bucket is not full. The caller must hold tab.mutex.
|
||||
func (tab *Table) stuff(nodes []*Node) {
|
||||
outer:
|
||||
for _, n := range entries {
|
||||
for _, n := range nodes {
|
||||
if n.ID == tab.self.ID {
|
||||
// don't add self.
|
||||
continue
|
||||
continue // don't add self
|
||||
}
|
||||
bucket := tab.buckets[logdist(tab.self.sha, n.sha)]
|
||||
for i := range bucket.entries {
|
||||
if bucket.entries[i].ID == n.ID {
|
||||
// already in bucket
|
||||
continue outer
|
||||
continue outer // already in bucket
|
||||
}
|
||||
}
|
||||
if len(bucket.entries) < bucketSize {
|
||||
@ -496,12 +501,11 @@ outer:
|
||||
}
|
||||
}
|
||||
|
||||
// del removes an entry from the node table (used to evacuate failed/non-bonded
|
||||
// discovery peers).
|
||||
func (tab *Table) del(node *Node) {
|
||||
// delete removes an entry from the node table (used to evacuate
|
||||
// failed/non-bonded discovery peers).
|
||||
func (tab *Table) delete(node *Node) {
|
||||
tab.mutex.Lock()
|
||||
defer tab.mutex.Unlock()
|
||||
|
||||
bucket := tab.buckets[logdist(tab.self.sha, node.sha)]
|
||||
for i := range bucket.entries {
|
||||
if bucket.entries[i].ID == node.ID {
|
||||
@ -511,6 +515,27 @@ func (tab *Table) del(node *Node) {
|
||||
}
|
||||
}
|
||||
|
||||
func (b *bucket) replace(n *Node, last *Node) bool {
|
||||
// Don't add if b already contains n.
|
||||
for i := range b.entries {
|
||||
if b.entries[i].ID == n.ID {
|
||||
return false
|
||||
}
|
||||
}
|
||||
// Replace last if it is still the last entry or just add n if b
|
||||
// isn't full. If is no longer the last entry, it has either been
|
||||
// replaced with someone else or became active.
|
||||
if len(b.entries) == bucketSize && (last == nil || b.entries[bucketSize-1].ID != last.ID) {
|
||||
return false
|
||||
}
|
||||
if len(b.entries) < bucketSize {
|
||||
b.entries = append(b.entries, nil)
|
||||
}
|
||||
copy(b.entries[1:], b.entries)
|
||||
b.entries[0] = n
|
||||
return true
|
||||
}
|
||||
|
||||
func (b *bucket) bump(n *Node) bool {
|
||||
for i := range b.entries {
|
||||
if b.entries[i].ID == n.ID {
|
||||
|
@ -178,8 +178,8 @@ func TestTable_closest(t *testing.T) {
|
||||
test := func(test *closeTest) bool {
|
||||
// for any node table, Target and N
|
||||
tab := newTable(nil, test.Self, &net.UDPAddr{}, "")
|
||||
tab.add(test.All)
|
||||
defer tab.Close()
|
||||
tab.stuff(test.All)
|
||||
|
||||
// check that doClosest(Target, N) returns nodes
|
||||
result := tab.closest(test.Target, test.N).entries
|
||||
@ -240,7 +240,7 @@ func TestTable_ReadRandomNodesGetAll(t *testing.T) {
|
||||
defer tab.Close()
|
||||
for i := 0; i < len(buf); i++ {
|
||||
ld := cfg.Rand.Intn(len(tab.buckets))
|
||||
tab.add([]*Node{nodeAtDistance(tab.self.sha, ld)})
|
||||
tab.stuff([]*Node{nodeAtDistance(tab.self.sha, ld)})
|
||||
}
|
||||
gotN := tab.ReadRandomNodes(buf)
|
||||
if gotN != tab.len() {
|
||||
@ -288,7 +288,7 @@ func TestTable_Lookup(t *testing.T) {
|
||||
}
|
||||
// seed table with initial node (otherwise lookup will terminate immediately)
|
||||
seed := newNode(lookupTestnet.dists[256][0], net.IP{}, 256, 0)
|
||||
tab.add([]*Node{seed})
|
||||
tab.stuff([]*Node{seed})
|
||||
|
||||
results := tab.Lookup(lookupTestnet.target)
|
||||
t.Logf("results:")
|
||||
|
@ -167,7 +167,7 @@ func TestUDP_findnode(t *testing.T) {
|
||||
for i := 0; i < bucketSize; i++ {
|
||||
nodes.push(nodeAtDistance(test.table.self.sha, i+2), bucketSize)
|
||||
}
|
||||
test.table.add(nodes.entries)
|
||||
test.table.stuff(nodes.entries)
|
||||
|
||||
// ensure there's a bond with the test node,
|
||||
// findnode won't be accepted otherwise.
|
||||
|
Loading…
Reference in New Issue
Block a user