forked from cerc-io/plugeth
c155c8e179
This improves the speed of DHT crawling by using concurrent requests. It also removes logging of individual DNS updates.
219 lines · 5.0 KiB · Go
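
For context, a rough sketch of how this crawler is typically driven. The helpers loadNodesJSON, startV4, and writeNodesJSON and the flag-derived values (nodesFile, crawlTimeout, nthreads) are assumptions about the surrounding devp2p package, not part of this file:

	// Hypothetical driver code, not part of crawl.go.
	inputSet := loadNodesJSON(nodesFile) // assumed same-package helper
	disc, _ := startV4(ctx)              // assumed discv4 setup; exact signature varies
	defer disc.Close()
	c := newCrawler(inputSet, disc, disc.RandomNodes())
	c.revalidateInterval = 10 * time.Minute
	output := c.run(crawlTimeout, nthreads)
	writeNodesJSON(nodesFile, output) // assumed same-package helper
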
// Copyright 2019 The go-ethereum Authors
// This file is part of go-ethereum.
//
// go-ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// go-ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.

package main

import (
	"sync"
	"sync/atomic"
	"time"

	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/p2p/enode"
)

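// crawler tracks a set of nodes, revalidating the known ones and adding
// newly discovered ones via the configured iterators.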
type crawler struct {
	input     nodeSet
	output    nodeSet
	disc      resolver
	iters     []enode.Iterator
	inputIter enode.Iterator
	ch        chan *enode.Node
	closed    chan struct{}

	// settings
	revalidateInterval time.Duration
	mu                 sync.RWMutex
}

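// Status codes returned by crawler.updateNode.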
const (
	nodeRemoved = iota
	nodeSkipRecent
	nodeSkipIncompat
	nodeAdded
	nodeUpdated
)

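// resolver is the part of the discovery protocol the crawler depends on:
// requesting an up-to-date node record (ENR) from a node.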
type resolver interface {
	RequestENR(*enode.Node) (*enode.Node, error)
}

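// newCrawler constructs a crawler over the given input set and iterators.
// The input nodes themselves are revalidated through an extra iterator
// over the input set.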
func newCrawler(input nodeSet, disc resolver, iters ...enode.Iterator) *crawler {
	c := &crawler{
		input:     input,
		output:    make(nodeSet, len(input)),
		disc:      disc,
		iters:     iters,
		inputIter: enode.IterNodes(input.nodes()),
		ch:        make(chan *enode.Node),
		closed:    make(chan struct{}),
	}
	c.iters = append(c.iters, c.inputIter)
	// Copy input to output initially. Any nodes that fail validation
	// will be dropped from output during the run.
	for id, n := range input {
		c.output[id] = n
	}
	return c
}

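// run crawls until the timeout expires, processing discovered nodes on
// nthreads concurrent workers, and returns the resulting output set.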
func (c *crawler) run(timeout time.Duration, nthreads int) nodeSet {
	var (
		timeoutTimer = time.NewTimer(timeout)
		timeoutCh    <-chan time.Time
		statusTicker = time.NewTicker(time.Second * 8)
		doneCh       = make(chan enode.Iterator, len(c.iters))
		liveIters    = len(c.iters)
	)
	if nthreads < 1 {
		nthreads = 1
	}
	defer timeoutTimer.Stop()
	defer statusTicker.Stop()
	for _, it := range c.iters {
		go c.runIterator(doneCh, it)
	}
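
	// Statistics counters, updated atomically by the worker goroutines
	// and reported periodically by the status ticker in the loop below.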
	var (
		added   uint64
		updated uint64
		skipped uint64
		recent  uint64
		removed uint64
		wg      sync.WaitGroup
	)
	wg.Add(nthreads)
	for i := 0; i < nthreads; i++ {
		go func() {
			defer wg.Done()
			for {
				select {
				case n := <-c.ch:
					switch c.updateNode(n) {
					case nodeSkipIncompat:
						atomic.AddUint64(&skipped, 1)
					case nodeSkipRecent:
						atomic.AddUint64(&recent, 1)
					case nodeRemoved:
						atomic.AddUint64(&removed, 1)
					case nodeAdded:
						atomic.AddUint64(&added, 1)
					default:
						atomic.AddUint64(&updated, 1)
					}
				case <-c.closed:
					return
				}
			}
		}()
	}

loop:
	for {
		select {
		case it := <-doneCh:
			if it == c.inputIter {
				// Enable timeout when we're done revalidating the input nodes.
				log.Info("Revalidation of input set is done", "len", len(c.input))
				if timeout > 0 {
					timeoutCh = timeoutTimer.C
				}
			}
			if liveIters--; liveIters == 0 {
				break loop
			}
		case <-timeoutCh:
			break loop
		case <-statusTicker.C:
			log.Info("Crawling in progress",
				"added", atomic.LoadUint64(&added),
				"updated", atomic.LoadUint64(&updated),
				"removed", atomic.LoadUint64(&removed),
				"ignored(recent)", atomic.LoadUint64(&recent),
				"ignored(incompatible)", atomic.LoadUint64(&skipped))
		}
	}
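
	// Shut down: signal the workers and iterators to stop, then wait for
	// all of them to exit before returning the output set.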
	close(c.closed)
	for _, it := range c.iters {
		it.Close()
	}
	for ; liveIters > 0; liveIters-- {
		<-doneCh
	}
	wg.Wait()
	return c.output
}

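// runIterator feeds nodes from it into c.ch until the iterator is exhausted
// or the crawler is closed, then signals completion on done.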
func (c *crawler) runIterator(done chan<- enode.Iterator, it enode.Iterator) {
	defer func() { done <- it }()
	for it.Next() {
		select {
		case c.ch <- it.Node():
		case <-c.closed:
			return
		}
	}
}

// updateNode updates the info about the given node, and returns a status
// about what changed.
func (c *crawler) updateNode(n *enode.Node) int {
	c.mu.RLock()
	node, ok := c.output[n.ID()]
	c.mu.RUnlock()

	// Skip validation of recently-seen nodes.
	if ok && time.Since(node.LastCheck) < c.revalidateInterval {
		return nodeSkipRecent
	}

	// Request the node record.
	status := nodeUpdated
	node.LastCheck = truncNow()
	if nn, err := c.disc.RequestENR(n); err != nil {
		if node.Score == 0 {
			// Node doesn't implement EIP-868.
			log.Debug("Skipping node", "id", n.ID())
			return nodeSkipIncompat
		}
		// No response from a previously-responsive node: halve its score.
		// The node is removed below once the score reaches zero.
		node.Score /= 2
	} else {
		node.N = nn
		node.Seq = nn.Seq()
		node.Score++
		if node.FirstResponse.IsZero() {
			node.FirstResponse = node.LastCheck
			status = nodeAdded
		}
		node.LastResponse = node.LastCheck
	}
	// Store/update node in output set.
	c.mu.Lock()
	defer c.mu.Unlock()
	if node.Score <= 0 {
		log.Debug("Removing node", "id", n.ID())
		delete(c.output, n.ID())
		return nodeRemoved
	}
	log.Debug("Updating node", "id", n.ID(), "seq", n.Seq(), "score", node.Score)
	c.output[n.ID()] = node
	return status
}
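
// truncNow returns the current UTC time truncated to one-second precision.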
func truncNow() time.Time {
	return time.Now().UTC().Truncate(1 * time.Second)
}