cmd/devp2p: faster crawling + less verbose dns updates (#26697)

This improves the speed of DHT crawling by using concurrent requests.
It also removes logging of individual DNS updates.
This commit is contained in:
Martin Holst Swende 2023-02-27 05:36:26 -05:00 committed by GitHub
parent ee530c0d5a
commit c155c8e179
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 78 additions and 36 deletions

View File

@ -17,6 +17,8 @@
package main package main
import ( import (
"sync"
"sync/atomic"
"time" "time"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
@ -34,6 +36,7 @@ type crawler struct {
// settings // settings
revalidateInterval time.Duration revalidateInterval time.Duration
mu sync.RWMutex
} }
const ( const (
@ -67,7 +70,7 @@ func newCrawler(input nodeSet, disc resolver, iters ...enode.Iterator) *crawler
return c return c
} }
func (c *crawler) run(timeout time.Duration) nodeSet { func (c *crawler) run(timeout time.Duration, nthreads int) nodeSet {
var ( var (
timeoutTimer = time.NewTimer(timeout) timeoutTimer = time.NewTimer(timeout)
timeoutCh <-chan time.Time timeoutCh <-chan time.Time
@ -75,35 +78,51 @@ func (c *crawler) run(timeout time.Duration) nodeSet {
doneCh = make(chan enode.Iterator, len(c.iters)) doneCh = make(chan enode.Iterator, len(c.iters))
liveIters = len(c.iters) liveIters = len(c.iters)
) )
if nthreads < 1 {
nthreads = 1
}
defer timeoutTimer.Stop() defer timeoutTimer.Stop()
defer statusTicker.Stop() defer statusTicker.Stop()
for _, it := range c.iters { for _, it := range c.iters {
go c.runIterator(doneCh, it) go c.runIterator(doneCh, it)
} }
var ( var (
added int added uint64
updated int updated uint64
skipped int skipped uint64
recent int recent uint64
removed int removed uint64
wg sync.WaitGroup
) )
loop: wg.Add(nthreads)
for i := 0; i < nthreads; i++ {
go func() {
defer wg.Done()
for { for {
select { select {
case n := <-c.ch: case n := <-c.ch:
switch c.updateNode(n) { switch c.updateNode(n) {
case nodeSkipIncompat: case nodeSkipIncompat:
skipped++ atomic.AddUint64(&skipped, 1)
case nodeSkipRecent: case nodeSkipRecent:
recent++ atomic.AddUint64(&recent, 1)
case nodeRemoved: case nodeRemoved:
removed++ atomic.AddUint64(&removed, 1)
case nodeAdded: case nodeAdded:
added++ atomic.AddUint64(&added, 1)
default: default:
updated++ atomic.AddUint64(&updated, 1)
} }
case <-c.closed:
return
}
}
}()
}
loop:
for {
select {
case it := <-doneCh: case it := <-doneCh:
if it == c.inputIter { if it == c.inputIter {
// Enable timeout when we're done revalidating the input nodes. // Enable timeout when we're done revalidating the input nodes.
@ -119,8 +138,11 @@ loop:
break loop break loop
case <-statusTicker.C: case <-statusTicker.C:
log.Info("Crawling in progress", log.Info("Crawling in progress",
"added", added, "updated", updated, "removed", removed, "added", atomic.LoadUint64(&added),
"ignored(recent)", recent, "ignored(incompatible)", skipped) "updated", atomic.LoadUint64(&updated),
"removed", atomic.LoadUint64(&removed),
"ignored(recent)", atomic.LoadUint64(&removed),
"ignored(incompatible)", atomic.LoadUint64(&skipped))
} }
} }
@ -131,6 +153,7 @@ loop:
for ; liveIters > 0; liveIters-- { for ; liveIters > 0; liveIters-- {
<-doneCh <-doneCh
} }
wg.Wait()
return c.output return c.output
} }
@ -148,7 +171,9 @@ func (c *crawler) runIterator(done chan<- enode.Iterator, it enode.Iterator) {
// updateNode updates the info about the given node, and returns a status // updateNode updates the info about the given node, and returns a status
// about what changed // about what changed
func (c *crawler) updateNode(n *enode.Node) int { func (c *crawler) updateNode(n *enode.Node) int {
c.mu.RLock()
node, ok := c.output[n.ID()] node, ok := c.output[n.ID()]
c.mu.RUnlock()
// Skip validation of recently-seen nodes. // Skip validation of recently-seen nodes.
if ok && time.Since(node.LastCheck) < c.revalidateInterval { if ok && time.Since(node.LastCheck) < c.revalidateInterval {
@ -156,10 +181,9 @@ func (c *crawler) updateNode(n *enode.Node) int {
} }
// Request the node record. // Request the node record.
nn, err := c.disc.RequestENR(n)
node.LastCheck = truncNow()
status := nodeUpdated status := nodeUpdated
if err != nil { node.LastCheck = truncNow()
if nn, err := c.disc.RequestENR(n); err != nil {
if node.Score == 0 { if node.Score == 0 {
// Node doesn't implement EIP-868. // Node doesn't implement EIP-868.
log.Debug("Skipping node", "id", n.ID()) log.Debug("Skipping node", "id", n.ID())
@ -176,8 +200,9 @@ func (c *crawler) updateNode(n *enode.Node) int {
} }
node.LastResponse = node.LastCheck node.LastResponse = node.LastCheck
} }
// Store/update node in output set. // Store/update node in output set.
c.mu.Lock()
defer c.mu.Unlock()
if node.Score <= 0 { if node.Score <= 0 {
log.Debug("Removing node", "id", n.ID()) log.Debug("Removing node", "id", n.ID())
delete(c.output, n.ID()) delete(c.output, n.ID())

View File

@ -78,7 +78,7 @@ var (
Name: "crawl", Name: "crawl",
Usage: "Updates a nodes.json file with random nodes found in the DHT", Usage: "Updates a nodes.json file with random nodes found in the DHT",
Action: discv4Crawl, Action: discv4Crawl,
Flags: flags.Merge(discoveryNodeFlags, []cli.Flag{crawlTimeoutFlag}), Flags: flags.Merge(discoveryNodeFlags, []cli.Flag{crawlTimeoutFlag, crawlParallelismFlag}),
} }
discv4TestCommand = &cli.Command{ discv4TestCommand = &cli.Command{
Name: "test", Name: "test",
@ -120,6 +120,11 @@ var (
Usage: "Time limit for the crawl.", Usage: "Time limit for the crawl.",
Value: 30 * time.Minute, Value: 30 * time.Minute,
} }
crawlParallelismFlag = &cli.IntFlag{
Name: "parallel",
Usage: "How many parallel discoveries to attempt.",
Value: 16,
}
remoteEnodeFlag = &cli.StringFlag{ remoteEnodeFlag = &cli.StringFlag{
Name: "remote", Name: "remote",
Usage: "Enode of the remote node under test", Usage: "Enode of the remote node under test",
@ -195,7 +200,7 @@ func discv4ResolveJSON(ctx *cli.Context) error {
defer disc.Close() defer disc.Close()
c := newCrawler(inputSet, disc, enode.IterNodes(nodeargs)) c := newCrawler(inputSet, disc, enode.IterNodes(nodeargs))
c.revalidateInterval = 0 c.revalidateInterval = 0
output := c.run(0) output := c.run(0, 1)
writeNodesJSON(nodesFile, output) writeNodesJSON(nodesFile, output)
return nil return nil
} }
@ -214,7 +219,7 @@ func discv4Crawl(ctx *cli.Context) error {
defer disc.Close() defer disc.Close()
c := newCrawler(inputSet, disc, disc.RandomNodes()) c := newCrawler(inputSet, disc, disc.RandomNodes())
c.revalidateInterval = 10 * time.Minute c.revalidateInterval = 10 * time.Minute
output := c.run(ctx.Duration(crawlTimeoutFlag.Name)) output := c.run(ctx.Duration(crawlTimeoutFlag.Name), ctx.Int(crawlParallelismFlag.Name))
writeNodesJSON(nodesFile, output) writeNodesJSON(nodesFile, output)
return nil return nil
} }

View File

@ -110,7 +110,7 @@ func discv5Crawl(ctx *cli.Context) error {
defer disc.Close() defer disc.Close()
c := newCrawler(inputSet, disc, disc.RandomNodes()) c := newCrawler(inputSet, disc, disc.RandomNodes())
c.revalidateInterval = 10 * time.Minute c.revalidateInterval = 10 * time.Minute
output := c.run(ctx.Duration(crawlTimeoutFlag.Name)) output := c.run(ctx.Duration(crawlTimeoutFlag.Name), ctx.Int(crawlParallelismFlag.Name))
writeNodesJSON(nodesFile, output) writeNodesJSON(nodesFile, output)
return nil return nil
} }

View File

@ -126,11 +126,16 @@ func (c *cloudflareClient) uploadRecords(name string, records map[string]string)
} }
// Iterate over the new records and inject anything missing. // Iterate over the new records and inject anything missing.
log.Info("Updating DNS entries")
created := 0
updated := 0
skipped := 0
for path, val := range records { for path, val := range records {
old, exists := existing[path] old, exists := existing[path]
if !exists { if !exists {
// Entry is unknown, push a new one to Cloudflare. // Entry is unknown, push a new one to Cloudflare.
log.Info(fmt.Sprintf("Creating %s = %q", path, val)) log.Debug(fmt.Sprintf("Creating %s = %q", path, val))
created++
ttl := rootTTL ttl := rootTTL
if path != name { if path != name {
ttl = treeNodeTTLCloudflare // Max TTL permitted by Cloudflare ttl = treeNodeTTLCloudflare // Max TTL permitted by Cloudflare
@ -139,27 +144,33 @@ func (c *cloudflareClient) uploadRecords(name string, records map[string]string)
_, err = c.CreateDNSRecord(context.Background(), c.zoneID, record) _, err = c.CreateDNSRecord(context.Background(), c.zoneID, record)
} else if old.Content != val { } else if old.Content != val {
// Entry already exists, only change its content. // Entry already exists, only change its content.
log.Info(fmt.Sprintf("Updating %s from %q to %q", path, old.Content, val)) log.Debug(fmt.Sprintf("Updating %s from %q to %q", path, old.Content, val))
updated++
old.Content = val old.Content = val
err = c.UpdateDNSRecord(context.Background(), c.zoneID, old.ID, old) err = c.UpdateDNSRecord(context.Background(), c.zoneID, old.ID, old)
} else { } else {
skipped++
log.Debug(fmt.Sprintf("Skipping %s = %q", path, val)) log.Debug(fmt.Sprintf("Skipping %s = %q", path, val))
} }
if err != nil { if err != nil {
return fmt.Errorf("failed to publish %s: %v", path, err) return fmt.Errorf("failed to publish %s: %v", path, err)
} }
} }
log.Info("Updated DNS entries", "new", created, "updated", updated, "untouched", skipped)
// Iterate over the old records and delete anything stale. // Iterate over the old records and delete anything stale.
deleted := 0
log.Info("Deleting stale DNS entries")
for path, entry := range existing { for path, entry := range existing {
if _, ok := records[path]; ok { if _, ok := records[path]; ok {
continue continue
} }
// Stale entry, nuke it. // Stale entry, nuke it.
log.Info(fmt.Sprintf("Deleting %s = %q", path, entry.Content)) log.Debug(fmt.Sprintf("Deleting %s = %q", path, entry.Content))
deleted++
if err := c.DeleteDNSRecord(context.Background(), c.zoneID, entry.ID); err != nil { if err := c.DeleteDNSRecord(context.Background(), c.zoneID, entry.ID); err != nil {
return fmt.Errorf("failed to delete %s: %v", path, err) return fmt.Errorf("failed to delete %s: %v", path, err)
} }
} }
log.Info("Deleted stale DNS entries", "count", deleted)
return nil return nil
} }

View File

@ -329,8 +329,9 @@ func (c *route53Client) collectRecords(name string) (map[string]recordSet, error
var req route53.ListResourceRecordSetsInput var req route53.ListResourceRecordSetsInput
req.HostedZoneId = &c.zoneID req.HostedZoneId = &c.zoneID
existing := make(map[string]recordSet) existing := make(map[string]recordSet)
log.Info("Loading existing TXT records", "name", name, "zone", c.zoneID)
for page := 0; ; page++ { for page := 0; ; page++ {
log.Info("Loading existing TXT records", "name", name, "zone", c.zoneID, "page", page) log.Debug("Loading existing TXT records", "name", name, "zone", c.zoneID, "page", page)
resp, err := c.api.ListResourceRecordSets(context.TODO(), &req) resp, err := c.api.ListResourceRecordSets(context.TODO(), &req)
if err != nil { if err != nil {
return existing, err return existing, err
@ -360,7 +361,7 @@ func (c *route53Client) collectRecords(name string) (map[string]recordSet, error
req.StartRecordName = resp.NextRecordName req.StartRecordName = resp.NextRecordName
req.StartRecordType = resp.NextRecordType req.StartRecordType = resp.NextRecordType
} }
log.Info("Loaded existing TXT records", "name", name, "zone", c.zoneID, "records", len(existing))
return existing, nil return existing, nil
} }