Patch for concurrent iterator & others (onto v1.11.6) #386

Closed
roysc wants to merge 1565 commits from v1.11.6-statediff-v5 into master
Showing only changes of commit 1c5fa40a78 - Show all commits

View File

@ -36,6 +36,14 @@ type crawler struct {
revalidateInterval time.Duration
}
const (
nodeRemoved = iota
nodeSkipRecent
nodeSkipIncompat
nodeAdded
nodeUpdated
)
type resolver interface {
RequestENR(*enode.Node) (*enode.Node, error)
}
@ -63,19 +71,39 @@ func (c *crawler) run(timeout time.Duration) nodeSet {
var (
timeoutTimer = time.NewTimer(timeout)
timeoutCh <-chan time.Time
statusTicker = time.NewTicker(time.Second * 8)
doneCh = make(chan enode.Iterator, len(c.iters))
liveIters = len(c.iters)
)
defer timeoutTimer.Stop()
defer statusTicker.Stop()
for _, it := range c.iters {
go c.runIterator(doneCh, it)
}
var (
added int
updated int
skipped int
recent int
removed int
)
loop:
for {
select {
case n := <-c.ch:
c.updateNode(n)
switch c.updateNode(n) {
case nodeSkipIncompat:
skipped++
case nodeSkipRecent:
recent++
case nodeRemoved:
removed++
case nodeAdded:
added++
default:
updated++
}
case it := <-doneCh:
if it == c.inputIter {
// Enable timeout when we're done revalidating the input nodes.
@ -89,6 +117,10 @@ loop:
}
case <-timeoutCh:
break loop
case <-statusTicker.C:
log.Info("Crawling in progress",
"added", added, "updated", updated, "removed", removed,
"ignored(recent)", recent, "ignored(incompatible)", skipped)
}
}
@ -113,22 +145,25 @@ func (c *crawler) runIterator(done chan<- enode.Iterator, it enode.Iterator) {
}
}
func (c *crawler) updateNode(n *enode.Node) {
// updateNode updates the info about the given node, and returns a status
// about what changed
func (c *crawler) updateNode(n *enode.Node) int {
node, ok := c.output[n.ID()]
// Skip validation of recently-seen nodes.
if ok && time.Since(node.LastCheck) < c.revalidateInterval {
return
return nodeSkipRecent
}
// Request the node record.
nn, err := c.disc.RequestENR(n)
node.LastCheck = truncNow()
status := nodeUpdated
if err != nil {
if node.Score == 0 {
// Node doesn't implement EIP-868.
log.Debug("Skipping node", "id", n.ID())
return
return nodeSkipIncompat
}
node.Score /= 2
} else {
@ -137,18 +172,20 @@ func (c *crawler) updateNode(n *enode.Node) {
node.Score++
if node.FirstResponse.IsZero() {
node.FirstResponse = node.LastCheck
status = nodeAdded
}
node.LastResponse = node.LastCheck
}
// Store/update node in output set.
if node.Score <= 0 {
log.Info("Removing node", "id", n.ID())
log.Debug("Removing node", "id", n.ID())
delete(c.output, n.ID())
} else {
log.Info("Updating node", "id", n.ID(), "seq", n.Seq(), "score", node.Score)
c.output[n.ID()] = node
return nodeRemoved
}
log.Debug("Updating node", "id", n.ID(), "seq", n.Seq(), "score", node.Score)
c.output[n.ID()] = node
return status
}
func truncNow() time.Time {