Patch for concurrent iterator & others (onto v1.11.6) #386
57
trie/sync.go
57
trie/sync.go
@ -19,6 +19,7 @@ package trie
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/common/prque"
|
||||
@ -381,11 +382,11 @@ func (s *Sync) scheduleCodeRequest(req *codeRequest) {
|
||||
// retrieval scheduling.
|
||||
func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
|
||||
// Gather all the children of the node, irrelevant whether known or not
|
||||
type child struct {
|
||||
type childNode struct {
|
||||
path []byte
|
||||
node node
|
||||
}
|
||||
var children []child
|
||||
var children []childNode
|
||||
|
||||
switch node := (object).(type) {
|
||||
case *shortNode:
|
||||
@ -393,14 +394,14 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
|
||||
if hasTerm(key) {
|
||||
key = key[:len(key)-1]
|
||||
}
|
||||
children = []child{{
|
||||
children = []childNode{{
|
||||
node: node.Val,
|
||||
path: append(append([]byte(nil), req.path...), key...),
|
||||
}}
|
||||
case *fullNode:
|
||||
for i := 0; i < 17; i++ {
|
||||
if node.Children[i] != nil {
|
||||
children = append(children, child{
|
||||
children = append(children, childNode{
|
||||
node: node.Children[i],
|
||||
path: append(append([]byte(nil), req.path...), byte(i)),
|
||||
})
|
||||
@ -410,7 +411,10 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
|
||||
panic(fmt.Sprintf("unknown node: %+v", node))
|
||||
}
|
||||
// Iterate over the children, and request all unknown ones
|
||||
requests := make([]*nodeRequest, 0, len(children))
|
||||
var (
|
||||
missing = make(chan *nodeRequest, len(children))
|
||||
pending sync.WaitGroup
|
||||
)
|
||||
for _, child := range children {
|
||||
// Notify any external watcher of a new key/value node
|
||||
if req.callback != nil {
|
||||
@ -433,19 +437,36 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
|
||||
if s.membatch.hasNode(child.path) {
|
||||
continue
|
||||
}
|
||||
// If database says duplicate, then at least the trie node is present
|
||||
// and we hold the assumption that it's NOT legacy contract code.
|
||||
chash := common.BytesToHash(node)
|
||||
if rawdb.HasTrieNode(s.database, chash) {
|
||||
continue
|
||||
}
|
||||
// Locally unknown node, schedule for retrieval
|
||||
requests = append(requests, &nodeRequest{
|
||||
path: child.path,
|
||||
hash: chash,
|
||||
parent: req,
|
||||
callback: req.callback,
|
||||
})
|
||||
// Check the presence of children concurrently
|
||||
pending.Add(1)
|
||||
go func(child childNode) {
|
||||
defer pending.Done()
|
||||
|
||||
// If database says duplicate, then at least the trie node is present
|
||||
// and we hold the assumption that it's NOT legacy contract code.
|
||||
chash := common.BytesToHash(node)
|
||||
if rawdb.HasTrieNode(s.database, chash) {
|
||||
return
|
||||
}
|
||||
// Locally unknown node, schedule for retrieval
|
||||
missing <- &nodeRequest{
|
||||
path: child.path,
|
||||
hash: chash,
|
||||
parent: req,
|
||||
callback: req.callback,
|
||||
}
|
||||
}(child)
|
||||
}
|
||||
}
|
||||
pending.Wait()
|
||||
|
||||
requests := make([]*nodeRequest, 0, len(children))
|
||||
for done := false; !done; {
|
||||
select {
|
||||
case miss := <-missing:
|
||||
requests = append(requests, miss)
|
||||
default:
|
||||
done = true
|
||||
}
|
||||
}
|
||||
return requests, nil
|
||||
|
Loading…
Reference in New Issue
Block a user