diff --git a/p2p/discover/table.go b/p2p/discover/table.go
index 67f7ec46f..b077f010c 100644
--- a/p2p/discover/table.go
+++ b/p2p/discover/table.go
@@ -78,9 +78,8 @@ type transport interface {
 	close()
 }
 
-// bucket contains nodes, ordered by their last activity.
-// the entry that was most recently active is the last element
-// in entries.
+// bucket contains nodes, ordered by their last activity. the entry
+// that was most recently active is the first element in entries.
 type bucket struct {
 	lastLookup time.Time
 	entries    []*Node
@@ -235,7 +234,7 @@ func (tab *Table) Lookup(targetID NodeID) []*Node {
 
 				if fails >= maxFindnodeFailures {
 					glog.V(logger.Detail).Infof("Evacuating node %x: %d findnode failures", n.ID[:8], fails)
-					tab.del(n)
+					tab.delete(n)
 				}
 			}
 			reply <- tab.bondall(r)
@@ -401,15 +400,11 @@ func (tab *Table) bond(pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16
 			node = w.n
 		}
 	}
-	// Even if bonding temporarily failed, give the node a chance
 	if node != nil {
-		tab.mutex.Lock()
-		defer tab.mutex.Unlock()
-
-		b := tab.buckets[logdist(tab.self.sha, node.sha)]
-		if !b.bump(node) {
-			tab.pingreplace(node, b)
-		}
+		// Add the node to the table even if the bonding ping/pong
+		// fails. It will be replaced quickly if it continues to be
+		// unresponsive.
+		tab.add(node)
 		tab.db.updateFindFails(id, 0)
 	}
 	return node, result
@@ -420,7 +415,7 @@ func (tab *Table) pingpong(w *bondproc, pinged bool, id NodeID, addr *net.UDPAdd
 	<-tab.bondslots
 	defer func() { tab.bondslots <- struct{}{} }()
 
-	// Ping the remote side and wait for a pong
+	// Ping the remote side and wait for a pong.
 	if w.err = tab.ping(id, addr); w.err != nil {
 		close(w.done)
 		return
@@ -431,33 +426,14 @@ func (tab *Table) pingpong(w *bondproc, pinged bool, id NodeID, addr *net.UDPAdd
 		// waitping will simply time out.
 		tab.net.waitping(id)
 	}
-	// Bonding succeeded, update the node database
+	// Bonding succeeded, update the node database.
 	w.n = newNode(id, addr.IP, uint16(addr.Port), tcpPort)
 	tab.db.updateNode(w.n)
 	close(w.done)
 }
 
-func (tab *Table) pingreplace(new *Node, b *bucket) {
-	if len(b.entries) == bucketSize {
-		oldest := b.entries[bucketSize-1]
-		if err := tab.ping(oldest.ID, oldest.addr()); err == nil {
-			// The node responded, we don't need to replace it.
-			return
-		}
-	} else {
-		// Add a slot at the end so the last entry doesn't
-		// fall off when adding the new node.
-		b.entries = append(b.entries, nil)
-	}
-	copy(b.entries[1:], b.entries)
-	b.entries[0] = new
-	if tab.nodeAddedHook != nil {
-		tab.nodeAddedHook(new)
-	}
-}
-
-// ping a remote endpoint and wait for a reply, also updating the node database
-// accordingly.
+// ping a remote endpoint and wait for a reply, also updating the node
+// database accordingly.
 func (tab *Table) ping(id NodeID, addr *net.UDPAddr) error {
 	// Update the last ping and send the message
 	tab.db.updateLastPing(id, time.Now())
@@ -467,24 +443,53 @@ func (tab *Table) ping(id NodeID, addr *net.UDPAddr) error {
 	// Pong received, update the database and return
 	tab.db.updateLastPong(id, time.Now())
 	tab.db.ensureExpirer()
-
 	return nil
 }
 
-// add puts the entries into the table if their corresponding
-// bucket is not full. The caller must hold tab.mutex.
-func (tab *Table) add(entries []*Node) {
+// add attempts to add the given node to its corresponding bucket. If the
+// bucket has space available, adding the node succeeds immediately.
+// Otherwise, the node is added if the least recently active node in
+// the bucket does not respond to a ping packet.
+//
+// The caller must not hold tab.mutex.
+func (tab *Table) add(new *Node) {
+	b := tab.buckets[logdist(tab.self.sha, new.sha)]
+	tab.mutex.Lock()
+	if b.bump(new) {
+		tab.mutex.Unlock()
+		return
+	}
+	var oldest *Node
+	if len(b.entries) == bucketSize {
+		oldest = b.entries[bucketSize-1]
+		// Let go of the mutex so other goroutines can access
+		// the table while we ping the least recently active node.
+		tab.mutex.Unlock()
+		if err := tab.ping(oldest.ID, oldest.addr()); err == nil {
+			// The node responded, don't replace it.
+			return
+		}
+		tab.mutex.Lock()
+	}
+	added := b.replace(new, oldest)
+	tab.mutex.Unlock()
+	if added && tab.nodeAddedHook != nil {
+		tab.nodeAddedHook(new)
+	}
+}
+
+// stuff adds nodes to the table at the end of their corresponding
+// bucket if the bucket is not full. The caller must hold tab.mutex.
+func (tab *Table) stuff(nodes []*Node) {
 outer:
-	for _, n := range entries {
+	for _, n := range nodes {
 		if n.ID == tab.self.ID {
-			// don't add self.
-			continue
+			continue // don't add self
 		}
 		bucket := tab.buckets[logdist(tab.self.sha, n.sha)]
 		for i := range bucket.entries {
 			if bucket.entries[i].ID == n.ID {
-				// already in bucket
-				continue outer
+				continue outer // already in bucket
 			}
 		}
 		if len(bucket.entries) < bucketSize {
@@ -496,12 +501,11 @@ outer:
 	}
 }
 
-// del removes an entry from the node table (used to evacuate failed/non-bonded
-// discovery peers).
-func (tab *Table) del(node *Node) {
+// delete removes an entry from the node table (used to evacuate
+// failed/non-bonded discovery peers).
+func (tab *Table) delete(node *Node) {
 	tab.mutex.Lock()
 	defer tab.mutex.Unlock()
-
 	bucket := tab.buckets[logdist(tab.self.sha, node.sha)]
 	for i := range bucket.entries {
 		if bucket.entries[i].ID == node.ID {
@@ -511,6 +515,27 @@
 	}
 }
 
+func (b *bucket) replace(n *Node, last *Node) bool {
+	// Don't add if b already contains n.
+	for i := range b.entries {
+		if b.entries[i].ID == n.ID {
+			return false
+		}
+	}
+	// Replace last if it is still the last entry or just add n if b
+	// isn't full. If last is no longer the last entry, it has either been
+	// replaced with someone else or become active.
+	if len(b.entries) == bucketSize && (last == nil || b.entries[bucketSize-1].ID != last.ID) {
+		return false
+	}
+	if len(b.entries) < bucketSize {
+		b.entries = append(b.entries, nil)
+	}
+	copy(b.entries[1:], b.entries)
+	b.entries[0] = n
+	return true
+}
+
 func (b *bucket) bump(n *Node) bool {
 	for i := range b.entries {
 		if b.entries[i].ID == n.ID {
diff --git a/p2p/discover/table_test.go b/p2p/discover/table_test.go
index d259177bf..426f4e9cc 100644
--- a/p2p/discover/table_test.go
+++ b/p2p/discover/table_test.go
@@ -178,8 +178,8 @@ func TestTable_closest(t *testing.T) {
 	test := func(test *closeTest) bool {
 		// for any node table, Target and N
 		tab := newTable(nil, test.Self, &net.UDPAddr{}, "")
-		tab.add(test.All)
 		defer tab.Close()
+		tab.stuff(test.All)
 
 		// check that doClosest(Target, N) returns nodes
 		result := tab.closest(test.Target, test.N).entries
@@ -240,7 +240,7 @@ func TestTable_ReadRandomNodesGetAll(t *testing.T) {
 	defer tab.Close()
 	for i := 0; i < len(buf); i++ {
 		ld := cfg.Rand.Intn(len(tab.buckets))
-		tab.add([]*Node{nodeAtDistance(tab.self.sha, ld)})
+		tab.stuff([]*Node{nodeAtDistance(tab.self.sha, ld)})
 	}
 	gotN := tab.ReadRandomNodes(buf)
 	if gotN != tab.len() {
@@ -288,7 +288,7 @@ func TestTable_Lookup(t *testing.T) {
 	}
 	// seed table with initial node (otherwise lookup will terminate immediately)
 	seed := newNode(lookupTestnet.dists[256][0], net.IP{}, 256, 0)
-	tab.add([]*Node{seed})
+	tab.stuff([]*Node{seed})
 
 	results := tab.Lookup(lookupTestnet.target)
 	t.Logf("results:")
diff --git a/p2p/discover/udp.go b/p2p/discover/udp.go
index d7ca9000d..008e63937 100644
--- a/p2p/discover/udp.go
+++ b/p2p/discover/udp.go
@@ -18,6 +18,7 @@ package discover
 
 import (
 	"bytes"
+	"container/list"
 	"crypto/ecdsa"
 	"errors"
 	"fmt"
@@ -43,6 +44,7 @@ var (
 	errUnsolicitedReply = errors.New("unsolicited reply")
 	errUnknownNode      = errors.New("unknown node")
 	errTimeout          = errors.New("RPC timeout")
+	errClockWarp        = errors.New("reply deadline too far in the future")
 	errClosed           = errors.New("socket closed")
 )
 
@@ -296,7 +298,7 @@ func (t *udp) pending(id NodeID, ptype byte, callback func(interface{}) bool) <-
 }
 
 func (t *udp) handleReply(from NodeID, ptype byte, req packet) bool {
-	matched := make(chan bool)
+	matched := make(chan bool, 1)
 	select {
 	case t.gotreply <- reply{from, ptype, req, matched}:
 		// loop will handle it
@@ -310,68 +312,82 @@
 // the refresh timer and the pending reply queue.
 func (t *udp) loop() {
 	var (
-		pending      []*pending
-		nextDeadline time.Time
-		timeout      = time.NewTimer(0)
-		refresh      = time.NewTicker(refreshInterval)
+		plist       = list.New()
+		timeout     = time.NewTimer(0)
+		nextTimeout *pending // head of plist when timeout was last reset
+		refresh     = time.NewTicker(refreshInterval)
 	)
 	<-timeout.C // ignore first timeout
 	defer refresh.Stop()
 	defer timeout.Stop()
 
-	rearmTimeout := func() {
-		now := time.Now()
-		if len(pending) == 0 || now.Before(nextDeadline) {
+	resetTimeout := func() {
+		if plist.Front() == nil || nextTimeout == plist.Front().Value {
 			return
 		}
-		nextDeadline = pending[0].deadline
-		timeout.Reset(nextDeadline.Sub(now))
+		// Start the timer so it fires when the next pending reply has expired.
+		now := time.Now()
+		for el := plist.Front(); el != nil; el = el.Next() {
+			nextTimeout = el.Value.(*pending)
+			if dist := nextTimeout.deadline.Sub(now); dist < 2*respTimeout {
+				timeout.Reset(dist)
+				return
+			}
+			// Remove pending replies whose deadline is too far in the
+			// future. These can occur if the system clock jumped
+			// backwards after the deadline was assigned.
+			nextTimeout.errc <- errClockWarp
+			plist.Remove(el)
+		}
+		nextTimeout = nil
+		timeout.Stop()
 	}
 
 	for {
+		resetTimeout()
+
 		select {
 		case <-refresh.C:
			go t.refresh()
 
 		case <-t.closing:
-			for _, p := range pending {
-				p.errc <- errClosed
+			for el := plist.Front(); el != nil; el = el.Next() {
+				el.Value.(*pending).errc <- errClosed
 			}
-			pending = nil
 			return
 
 		case p := <-t.addpending:
 			p.deadline = time.Now().Add(respTimeout)
-			pending = append(pending, p)
-			rearmTimeout()
+			plist.PushBack(p)
 
 		case r := <-t.gotreply:
 			var matched bool
-			for i := 0; i < len(pending); i++ {
-				if p := pending[i]; p.from == r.from && p.ptype == r.ptype {
+			for el := plist.Front(); el != nil; el = el.Next() {
+				p := el.Value.(*pending)
+				if p.from == r.from && p.ptype == r.ptype {
 					matched = true
+					// Remove the matcher if its callback indicates
+					// that all replies have been received. This is
+					// required for packet types that expect multiple
+					// reply packets.
 					if p.callback(r.data) {
-						// callback indicates the request is done, remove it.
 						p.errc <- nil
-						copy(pending[i:], pending[i+1:])
-						pending = pending[:len(pending)-1]
-						i--
+						plist.Remove(el)
 					}
 				}
 			}
 			r.matched <- matched
 
 		case now := <-timeout.C:
-			// notify and remove callbacks whose deadline is in the past.
-			i := 0
-			for ; i < len(pending) && now.After(pending[i].deadline); i++ {
-				pending[i].errc <- errTimeout
+			nextTimeout = nil
+			// Notify and remove callbacks whose deadline is in the past.
+			for el := plist.Front(); el != nil; el = el.Next() {
+				p := el.Value.(*pending)
+				if now.After(p.deadline) || now.Equal(p.deadline) {
+					p.errc <- errTimeout
+					plist.Remove(el)
+				}
 			}
-			if i > 0 {
-				copy(pending, pending[i:])
-				pending = pending[:len(pending)-i]
-			}
-			rearmTimeout()
 		}
 	}
 }
@@ -385,7 +401,7 @@ const (
 var (
 	headSpace = make([]byte, headSize)
 
-	// Neighbors responses are sent across multiple packets to
+	// Neighbors replies are sent across multiple packets to
 	// stay below the 1280 byte limit. We compute the maximum number
 	// of entries by stuffing a packet until it grows too large.
 	maxNeighbors int
diff --git a/p2p/discover/udp_test.go b/p2p/discover/udp_test.go
index 8d6d3e855..a86d3737b 100644
--- a/p2p/discover/udp_test.go
+++ b/p2p/discover/udp_test.go
@@ -19,10 +19,12 @@ package discover
 import (
 	"bytes"
 	"crypto/ecdsa"
+	"encoding/binary"
 	"errors"
 	"fmt"
 	"io"
 	logpkg "log"
+	"math/rand"
 	"net"
 	"os"
 	"path/filepath"
@@ -138,6 +140,77 @@ func TestUDP_pingTimeout(t *testing.T) {
 	}
 }
 
+func TestUDP_responseTimeouts(t *testing.T) {
+	t.Parallel()
+	test := newUDPTest(t)
+	defer test.table.Close()
+
+	rand.Seed(time.Now().UnixNano())
+	randomDuration := func(max time.Duration) time.Duration {
+		return time.Duration(rand.Int63n(int64(max)))
+	}
+
+	var (
+		nReqs      = 200
+		nTimeouts  = 0                       // number of requests with ptype <= 128
+		nilErr     = make(chan error, nReqs) // for requests that get a reply
+		timeoutErr = make(chan error, nReqs) // for requests that time out
+	)
+	for i := 0; i < nReqs; i++ {
+		// Create a matcher for a random request in udp.loop. Requests
+		// with ptype <= 128 will not get a reply and should time out.
+		// For all other requests, a reply is scheduled to arrive
+		// within the timeout window.
+		p := &pending{
+			ptype:    byte(rand.Intn(255)),
+			callback: func(interface{}) bool { return true },
+		}
+		binary.BigEndian.PutUint64(p.from[:], uint64(i))
+		if p.ptype <= 128 {
+			p.errc = timeoutErr
+			nTimeouts++
+		} else {
+			p.errc = nilErr
+			time.AfterFunc(randomDuration(60*time.Millisecond), func() {
+				if !test.udp.handleReply(p.from, p.ptype, nil) {
+					t.Logf("not matched: %v", p)
+				}
+			})
+		}
+		test.udp.addpending <- p
+		time.Sleep(randomDuration(30 * time.Millisecond))
+	}
+
+	// Check that all timeouts were delivered and that the rest got nil errors.
+	// The replies must be delivered.
+	var (
+		recvDeadline        = time.After(20 * time.Second)
+		nTimeoutsRecv, nNil = 0, 0
+	)
+	for i := 0; i < nReqs; i++ {
+		select {
+		case err := <-timeoutErr:
+			if err != errTimeout {
+				t.Fatalf("got non-timeout error on timeoutErr %d: %v", i, err)
+			}
+			nTimeoutsRecv++
+		case err := <-nilErr:
+			if err != nil {
+				t.Fatalf("got non-nil error on nilErr %d: %v", i, err)
+			}
+			nNil++
+		case <-recvDeadline:
+			t.Fatalf("exceeded recv deadline")
+		}
+	}
+	if nTimeoutsRecv != nTimeouts {
+		t.Errorf("wrong number of timeout errors received: got %d, want %d", nTimeoutsRecv, nTimeouts)
+	}
+	if nNil != nReqs-nTimeouts {
+		t.Errorf("wrong number of successful replies: got %d, want %d", nNil, nReqs-nTimeouts)
+	}
+}
+
 func TestUDP_findnodeTimeout(t *testing.T) {
 	t.Parallel()
 	test := newUDPTest(t)
@@ -167,7 +240,7 @@ func TestUDP_findnode(t *testing.T) {
 	for i := 0; i < bucketSize; i++ {
 		nodes.push(nodeAtDistance(test.table.self.sha, i+2), bucketSize)
 	}
-	test.table.add(nodes.entries)
+	test.table.stuff(nodes.entries)
 
 	// ensure there's a bond with the test node,
 	// findnode won't be accepted otherwise.
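
Note on the table.go changes: the new Table.add implements the familiar Kademlia-style eviction check. When a bucket is full, the least recently active entry is pinged and only replaced if it fails to respond, and the table mutex is released for the duration of the ping. Because the table can change while the lock is down, bucket.replace re-validates its preconditions before touching anything. The standalone sketch below condenses that flow; all names in it (table, node, size, ping) are illustrative stand-ins, not the real p2p/discover types:

```go
// Sketch of the bump-or-ping-then-replace policy in Table.add.
package main

import (
	"errors"
	"fmt"
	"sync"
)

type node struct{ id string }

type table struct {
	mu      sync.Mutex
	entries []*node // most recently active first
	size    int
	ping    func(*node) error
}

func (t *table) add(n *node) {
	t.mu.Lock()
	if t.bump(n) {
		t.mu.Unlock()
		return
	}
	var oldest *node
	if len(t.entries) == t.size {
		oldest = t.entries[len(t.entries)-1]
		// Release the lock during the network round trip so other
		// goroutines can use the table meanwhile.
		t.mu.Unlock()
		if t.ping(oldest) == nil {
			return // the oldest entry is alive, keep it
		}
		t.mu.Lock()
	}
	t.replace(n, oldest)
	t.mu.Unlock()
}

// bump moves n to the front if it is already present.
func (t *table) bump(n *node) bool {
	for i, e := range t.entries {
		if e.id == n.id {
			copy(t.entries[1:], t.entries[:i]) // shift earlier entries right
			t.entries[0] = n
			return true
		}
	}
	return false
}

// replace re-checks everything because the lock was released around the
// ping: n may have been added concurrently, and oldest may no longer be
// the last entry (already replaced, or active again).
func (t *table) replace(n, oldest *node) {
	for _, e := range t.entries {
		if e.id == n.id {
			return
		}
	}
	if len(t.entries) == t.size {
		if oldest == nil || t.entries[len(t.entries)-1].id != oldest.id {
			return
		}
	} else {
		t.entries = append(t.entries, nil) // grow by one slot
	}
	copy(t.entries[1:], t.entries) // shift right, dropping the last slot
	t.entries[0] = n
}

func main() {
	t := &table{size: 2, ping: func(*node) error { return errors.New("timeout") }}
	t.add(&node{id: "a"})
	t.add(&node{id: "b"})
	t.add(&node{id: "c"}) // bucket full: "a" fails its ping and is evicted
	for _, e := range t.entries {
		fmt.Println(e.id) // "c", then "b"
	}
}
```

The key property is that replace assumes nothing it saw before the unlock: it only evicts oldest if oldest is verifiably still the eviction candidate.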
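Note on the udp.loop rewrite: instead of re-arming the timer only when a request is added, the new loop keeps pending matchers in a container/list ordered by deadline and re-arms a single timer at the top of every iteration. A deadline more than 2*respTimeout in the future can only exist if the system clock jumped backwards after it was assigned, so such entries are failed with errClockWarp. Here is a self-contained sketch of that pattern under hypothetical names (pendingReply, loop); reply matching and refresh are omitted, and the iteration captures Next before Remove since container/list clears a removed element's links:

```go
// Sketch of the single-timer pending-reply queue with a clock-warp guard.
package main

import (
	"container/list"
	"errors"
	"fmt"
	"time"
)

const respTimeout = 500 * time.Millisecond

var (
	errTimeout   = errors.New("RPC timeout")
	errClockWarp = errors.New("reply deadline too far in the future")
)

type pendingReply struct {
	deadline time.Time
	errc     chan error
}

func loop(addpending <-chan *pendingReply, quit <-chan struct{}) {
	var (
		plist       = list.New()
		timeout     = time.NewTimer(0)
		nextTimeout *pendingReply // head of plist when timeout was last reset
	)
	<-timeout.C // drain the initial tick
	defer timeout.Stop()

	resetTimeout := func() {
		if plist.Front() == nil || nextTimeout == plist.Front().Value {
			return // nothing pending, or timer already armed for the head
		}
		now := time.Now()
		for el := plist.Front(); el != nil; {
			next := el.Next() // capture first: Remove clears el's links
			p := el.Value.(*pendingReply)
			if d := p.deadline.Sub(now); d < 2*respTimeout {
				nextTimeout = p
				timeout.Reset(d)
				return
			}
			// A deadline this far out means the clock jumped backwards
			// after it was assigned.
			p.errc <- errClockWarp
			plist.Remove(el)
			el = next
		}
		nextTimeout = nil
		timeout.Stop()
	}

	for {
		resetTimeout()
		select {
		case p := <-addpending:
			p.deadline = time.Now().Add(respTimeout)
			plist.PushBack(p) // deadlines are assigned in order, so the list stays sorted
		case now := <-timeout.C:
			nextTimeout = nil
			for el := plist.Front(); el != nil; {
				next := el.Next()
				if p := el.Value.(*pendingReply); !now.Before(p.deadline) {
					p.errc <- errTimeout
					plist.Remove(el)
				}
				el = next
			}
		case <-quit:
			return
		}
	}
}

func main() {
	addpending := make(chan *pendingReply)
	quit := make(chan struct{})
	go loop(addpending, quit)

	p := &pendingReply{errc: make(chan error, 1)}
	addpending <- p
	fmt.Println(<-p.errc) // "RPC timeout" after ~500ms
	close(quit)
}
```

Because PushBack preserves deadline order, the head of the list is always the next entry to expire, which is what makes a single timer sufficient.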
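One easy-to-miss change is handleReply switching from make(chan bool) to make(chan bool, 1). With a one-element buffer, the loop goroutine can publish the match result and move on immediately, rather than blocking until the caller of handleReply is scheduled to receive it; presumably the point is that packet dispatch can never stall on a slow reader. A tiny illustration of that property, with hypothetical channel names:

```go
// With a one-slot buffer, the responder delivers its result without
// waiting for the requester to pick it up.
package main

import "fmt"

func main() {
	requests := make(chan chan bool)
	done := make(chan struct{})
	go func() { // plays the role of the loop goroutine
		for matched := range requests {
			matched <- true // never blocks: buffer of one, single send
		}
		close(done)
	}()
	m := make(chan bool, 1) // buffered, as in handleReply
	requests <- m
	fmt.Println(<-m)
	close(requests)
	<-done
}
```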