Merge pull request #60 from openrelayxyz/feature/merge-v1.10.26

Feature/merge v1.10.26
This commit is contained in:
AusIV 2022-11-03 11:39:32 -05:00 committed by GitHub
commit 1805f477ae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 266 additions and 70 deletions

View File

@ -34,7 +34,7 @@ type Filter struct {
addresses []common.Address addresses []common.Address
topics [][]common.Hash topics [][]common.Hash
block common.Hash // Block hash if filtering a single block block *common.Hash // Block hash if filtering a single block
begin, end int64 // Range interval if filtering multiple blocks begin, end int64 // Range interval if filtering multiple blocks
matcher *bloombits.Matcher matcher *bloombits.Matcher
@ -78,7 +78,7 @@ func (sys *FilterSystem) NewRangeFilter(begin, end int64, addresses []common.Add
func (sys *FilterSystem) NewBlockFilter(block common.Hash, addresses []common.Address, topics [][]common.Hash) *Filter { func (sys *FilterSystem) NewBlockFilter(block common.Hash, addresses []common.Address, topics [][]common.Hash) *Filter {
// Create a generic filter and convert it into a block filter // Create a generic filter and convert it into a block filter
filter := newFilter(sys, addresses, topics) filter := newFilter(sys, addresses, topics)
filter.block = block filter.block = &block
return filter return filter
} }
@ -96,8 +96,8 @@ func newFilter(sys *FilterSystem, addresses []common.Address, topics [][]common.
// first block that contains matches, updating the start of the filter accordingly. // first block that contains matches, updating the start of the filter accordingly.
func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) { func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
// If we're doing singleton block filtering, execute and return // If we're doing singleton block filtering, execute and return
if f.block != (common.Hash{}) { if f.block != nil {
header, err := f.sys.backend.HeaderByHash(ctx, f.block) header, err := f.sys.backend.HeaderByHash(ctx, *f.block)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -391,11 +391,16 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
if h.checkpointHash != (common.Hash{}) { if h.checkpointHash != (common.Hash{}) {
// Request the peer's checkpoint header for chain height/weight validation // Request the peer's checkpoint header for chain height/weight validation
resCh := make(chan *eth.Response) resCh := make(chan *eth.Response)
if _, err := peer.RequestHeadersByNumber(h.checkpointNumber, 1, 0, false, resCh); err != nil {
req, err := peer.RequestHeadersByNumber(h.checkpointNumber, 1, 0, false, resCh)
if err != nil {
return err return err
} }
// Start a timer to disconnect if the peer doesn't reply in time // Start a timer to disconnect if the peer doesn't reply in time
go func() { go func() {
// Ensure the request gets cancelled in case of error/drop
defer req.Close()
timeout := time.NewTimer(syncChallengeTimeout) timeout := time.NewTimer(syncChallengeTimeout)
defer timeout.Stop() defer timeout.Stop()
@ -437,10 +442,15 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
// If we have any explicit peer required block hashes, request them // If we have any explicit peer required block hashes, request them
for number, hash := range h.requiredBlocks { for number, hash := range h.requiredBlocks {
resCh := make(chan *eth.Response) resCh := make(chan *eth.Response)
if _, err := peer.RequestHeadersByNumber(number, 1, 0, false, resCh); err != nil {
req, err := peer.RequestHeadersByNumber(number, 1, 0, false, resCh)
if err != nil {
return err return err
} }
go func(number uint64, hash common.Hash) { go func(number uint64, hash common.Hash, req *eth.Request) {
// Ensure the request gets cancelled in case of error/drop
defer req.Close()
timeout := time.NewTimer(syncChallengeTimeout) timeout := time.NewTimer(syncChallengeTimeout)
defer timeout.Stop() defer timeout.Stop()
@ -469,7 +479,7 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
peer.Log().Warn("Required block challenge timed out, dropping", "addr", peer.RemoteAddr(), "type", peer.Name()) peer.Log().Warn("Required block challenge timed out, dropping", "addr", peer.RemoteAddr(), "type", peer.Name())
h.removePeer(peer.ID()) h.removePeer(peer.ID())
} }
}(number, hash) }(number, hash, req)
} }
// Handle incoming messages until the connection is torn down // Handle incoming messages until the connection is torn down
return handler(peer) return handler(peer)

View File

@ -21,10 +21,12 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
gomath "math"
"math/big" "math/big"
"math/rand" "math/rand"
"sort" "sort"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -78,6 +80,29 @@ const (
// and waste round trip times. If it's too high, we're capping responses and // and waste round trip times. If it's too high, we're capping responses and
// waste bandwidth. // waste bandwidth.
maxTrieRequestCount = maxRequestSize / 512 maxTrieRequestCount = maxRequestSize / 512
// trienodeHealRateMeasurementImpact is the impact a single measurement has on
// the local node's trienode processing capacity. A value closer to 0 reacts
// slower to sudden changes, but it is also more stable against temporary hiccups.
trienodeHealRateMeasurementImpact = 0.005
// minTrienodeHealThrottle is the minimum divisor for throttling trie node
// heal requests to avoid overloading the local node and exessively expanding
// the state trie bedth wise.
minTrienodeHealThrottle = 1
// maxTrienodeHealThrottle is the maximum divisor for throttling trie node
// heal requests to avoid overloading the local node and exessively expanding
// the state trie bedth wise.
maxTrienodeHealThrottle = maxTrieRequestCount
// trienodeHealThrottleIncrease is the multiplier for the throttle when the
// rate of arriving data is higher than the rate of processing it.
trienodeHealThrottleIncrease = 1.33
// trienodeHealThrottleDecrease is the divisor for the throttle when the
// rate of arriving data is lower than the rate of processing it.
trienodeHealThrottleDecrease = 1.25
) )
var ( var (
@ -431,6 +456,11 @@ type Syncer struct {
trienodeHealReqs map[uint64]*trienodeHealRequest // Trie node requests currently running trienodeHealReqs map[uint64]*trienodeHealRequest // Trie node requests currently running
bytecodeHealReqs map[uint64]*bytecodeHealRequest // Bytecode requests currently running bytecodeHealReqs map[uint64]*bytecodeHealRequest // Bytecode requests currently running
trienodeHealRate float64 // Average heal rate for processing trie node data
trienodeHealPend uint64 // Number of trie nodes currently pending for processing
trienodeHealThrottle float64 // Divisor for throttling the amount of trienode heal data requested
trienodeHealThrottled time.Time // Timestamp the last time the throttle was updated
trienodeHealSynced uint64 // Number of state trie nodes downloaded trienodeHealSynced uint64 // Number of state trie nodes downloaded
trienodeHealBytes common.StorageSize // Number of state trie bytes persisted to disk trienodeHealBytes common.StorageSize // Number of state trie bytes persisted to disk
trienodeHealDups uint64 // Number of state trie nodes already processed trienodeHealDups uint64 // Number of state trie nodes already processed
@ -478,6 +508,7 @@ func NewSyncer(db ethdb.KeyValueStore) *Syncer {
trienodeHealReqs: make(map[uint64]*trienodeHealRequest), trienodeHealReqs: make(map[uint64]*trienodeHealRequest),
bytecodeHealReqs: make(map[uint64]*bytecodeHealRequest), bytecodeHealReqs: make(map[uint64]*bytecodeHealRequest),
trienodeHealThrottle: maxTrienodeHealThrottle, // Tune downward instead of insta-filling with junk
stateWriter: db.NewBatch(), stateWriter: db.NewBatch(),
extProgress: new(SyncProgress), extProgress: new(SyncProgress),
@ -1321,6 +1352,10 @@ func (s *Syncer) assignTrienodeHealTasks(success chan *trienodeHealResponse, fai
if cap > maxTrieRequestCount { if cap > maxTrieRequestCount {
cap = maxTrieRequestCount cap = maxTrieRequestCount
} }
cap = int(float64(cap) / s.trienodeHealThrottle)
if cap <= 0 {
cap = 1
}
var ( var (
hashes = make([]common.Hash, 0, cap) hashes = make([]common.Hash, 0, cap)
paths = make([]string, 0, cap) paths = make([]string, 0, cap)
@ -2090,6 +2125,10 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
// processTrienodeHealResponse integrates an already validated trienode response // processTrienodeHealResponse integrates an already validated trienode response
// into the healer tasks. // into the healer tasks.
func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) { func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) {
var (
start = time.Now()
fills int
)
for i, hash := range res.hashes { for i, hash := range res.hashes {
node := res.nodes[i] node := res.nodes[i]
@ -2098,6 +2137,8 @@ func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) {
res.task.trieTasks[res.paths[i]] = res.hashes[i] res.task.trieTasks[res.paths[i]] = res.hashes[i]
continue continue
} }
fills++
// Push the trie node into the state syncer // Push the trie node into the state syncer
s.trienodeHealSynced++ s.trienodeHealSynced++
s.trienodeHealBytes += common.StorageSize(len(node)) s.trienodeHealBytes += common.StorageSize(len(node))
@ -2121,6 +2162,50 @@ func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) {
log.Crit("Failed to persist healing data", "err", err) log.Crit("Failed to persist healing data", "err", err)
} }
log.Debug("Persisted set of healing data", "type", "trienodes", "bytes", common.StorageSize(batch.ValueSize())) log.Debug("Persisted set of healing data", "type", "trienodes", "bytes", common.StorageSize(batch.ValueSize()))
// Calculate the processing rate of one filled trie node
rate := float64(fills) / (float64(time.Since(start)) / float64(time.Second))
// Update the currently measured trienode queueing and processing throughput.
//
// The processing rate needs to be updated uniformly independent if we've
// processed 1x100 trie nodes or 100x1 to keep the rate consistent even in
// the face of varying network packets. As such, we cannot just measure the
// time it took to process N trie nodes and update once, we need one update
// per trie node.
//
// Naively, that would be:
//
// for i:=0; i<fills; i++ {
// healRate = (1-measurementImpact)*oldRate + measurementImpact*newRate
// }
//
// Essentially, a recursive expansion of HR = (1-MI)*HR + MI*NR.
//
// We can expand that formula for the Nth item as:
// HR(N) = (1-MI)^N*OR + (1-MI)^(N-1)*MI*NR + (1-MI)^(N-2)*MI*NR + ... + (1-MI)^0*MI*NR
//
// The above is a geometric sequence that can be summed to:
// HR(N) = (1-MI)^N*(OR-NR) + NR
s.trienodeHealRate = gomath.Pow(1-trienodeHealRateMeasurementImpact, float64(fills))*(s.trienodeHealRate-rate) + rate
pending := atomic.LoadUint64(&s.trienodeHealPend)
if time.Since(s.trienodeHealThrottled) > time.Second {
// Periodically adjust the trie node throttler
if float64(pending) > 2*s.trienodeHealRate {
s.trienodeHealThrottle *= trienodeHealThrottleIncrease
} else {
s.trienodeHealThrottle /= trienodeHealThrottleDecrease
}
if s.trienodeHealThrottle > maxTrienodeHealThrottle {
s.trienodeHealThrottle = maxTrienodeHealThrottle
} else if s.trienodeHealThrottle < minTrienodeHealThrottle {
s.trienodeHealThrottle = minTrienodeHealThrottle
}
s.trienodeHealThrottled = time.Now()
log.Debug("Updated trie node heal throttler", "rate", s.trienodeHealRate, "pending", pending, "throttle", s.trienodeHealThrottle)
}
} }
// processBytecodeHealResponse integrates an already validated bytecode response // processBytecodeHealResponse integrates an already validated bytecode response
@ -2248,7 +2333,9 @@ func (s *Syncer) OnAccounts(peer SyncPeer, id uint64, hashes []common.Hash, acco
// Whether or not the response is valid, we can mark the peer as idle and // Whether or not the response is valid, we can mark the peer as idle and
// notify the scheduler to assign a new task. If the response is invalid, // notify the scheduler to assign a new task. If the response is invalid,
// we'll drop the peer in a bit. // we'll drop the peer in a bit.
defer func() {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock()
if _, ok := s.peers[peer.ID()]; ok { if _, ok := s.peers[peer.ID()]; ok {
s.accountIdlers[peer.ID()] = struct{}{} s.accountIdlers[peer.ID()] = struct{}{}
} }
@ -2256,6 +2343,8 @@ func (s *Syncer) OnAccounts(peer SyncPeer, id uint64, hashes []common.Hash, acco
case s.update <- struct{}{}: case s.update <- struct{}{}:
default: default:
} }
}()
s.lock.Lock()
// Ensure the response is for a valid request // Ensure the response is for a valid request
req, ok := s.accountReqs[id] req, ok := s.accountReqs[id]
if !ok { if !ok {
@ -2360,7 +2449,9 @@ func (s *Syncer) onByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) error
// Whether or not the response is valid, we can mark the peer as idle and // Whether or not the response is valid, we can mark the peer as idle and
// notify the scheduler to assign a new task. If the response is invalid, // notify the scheduler to assign a new task. If the response is invalid,
// we'll drop the peer in a bit. // we'll drop the peer in a bit.
defer func() {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock()
if _, ok := s.peers[peer.ID()]; ok { if _, ok := s.peers[peer.ID()]; ok {
s.bytecodeIdlers[peer.ID()] = struct{}{} s.bytecodeIdlers[peer.ID()] = struct{}{}
} }
@ -2368,6 +2459,8 @@ func (s *Syncer) onByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) error
case s.update <- struct{}{}: case s.update <- struct{}{}:
default: default:
} }
}()
s.lock.Lock()
// Ensure the response is for a valid request // Ensure the response is for a valid request
req, ok := s.bytecodeReqs[id] req, ok := s.bytecodeReqs[id]
if !ok { if !ok {
@ -2469,7 +2562,9 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo
// Whether or not the response is valid, we can mark the peer as idle and // Whether or not the response is valid, we can mark the peer as idle and
// notify the scheduler to assign a new task. If the response is invalid, // notify the scheduler to assign a new task. If the response is invalid,
// we'll drop the peer in a bit. // we'll drop the peer in a bit.
defer func() {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock()
if _, ok := s.peers[peer.ID()]; ok { if _, ok := s.peers[peer.ID()]; ok {
s.storageIdlers[peer.ID()] = struct{}{} s.storageIdlers[peer.ID()] = struct{}{}
} }
@ -2477,6 +2572,8 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo
case s.update <- struct{}{}: case s.update <- struct{}{}:
default: default:
} }
}()
s.lock.Lock()
// Ensure the response is for a valid request // Ensure the response is for a valid request
req, ok := s.storageReqs[id] req, ok := s.storageReqs[id]
if !ok { if !ok {
@ -2596,7 +2693,9 @@ func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error
// Whether or not the response is valid, we can mark the peer as idle and // Whether or not the response is valid, we can mark the peer as idle and
// notify the scheduler to assign a new task. If the response is invalid, // notify the scheduler to assign a new task. If the response is invalid,
// we'll drop the peer in a bit. // we'll drop the peer in a bit.
defer func() {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock()
if _, ok := s.peers[peer.ID()]; ok { if _, ok := s.peers[peer.ID()]; ok {
s.trienodeHealIdlers[peer.ID()] = struct{}{} s.trienodeHealIdlers[peer.ID()] = struct{}{}
} }
@ -2604,6 +2703,8 @@ func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error
case s.update <- struct{}{}: case s.update <- struct{}{}:
default: default:
} }
}()
s.lock.Lock()
// Ensure the response is for a valid request // Ensure the response is for a valid request
req, ok := s.trienodeHealReqs[id] req, ok := s.trienodeHealReqs[id]
if !ok { if !ok {
@ -2639,10 +2740,12 @@ func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error
// Cross reference the requested trienodes with the response to find gaps // Cross reference the requested trienodes with the response to find gaps
// that the serving node is missing // that the serving node is missing
hasher := sha3.NewLegacyKeccak256().(crypto.KeccakState) var (
hash := make([]byte, 32) hasher = sha3.NewLegacyKeccak256().(crypto.KeccakState)
hash = make([]byte, 32)
nodes := make([][]byte, len(req.hashes)) nodes = make([][]byte, len(req.hashes))
fills uint64
)
for i, j := 0, 0; i < len(trienodes); i++ { for i, j := 0, 0; i < len(trienodes); i++ {
// Find the next hash that we've been served, leaving misses with nils // Find the next hash that we've been served, leaving misses with nils
hasher.Reset() hasher.Reset()
@ -2654,16 +2757,22 @@ func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error
} }
if j < len(req.hashes) { if j < len(req.hashes) {
nodes[j] = trienodes[i] nodes[j] = trienodes[i]
fills++
j++ j++
continue continue
} }
// We've either ran out of hashes, or got unrequested data // We've either ran out of hashes, or got unrequested data
logger.Warn("Unexpected healing trienodes", "count", len(trienodes)-i) logger.Warn("Unexpected healing trienodes", "count", len(trienodes)-i)
// Signal this request as failed, and ready for rescheduling // Signal this request as failed, and ready for rescheduling
s.scheduleRevertTrienodeHealRequest(req) s.scheduleRevertTrienodeHealRequest(req)
return errors.New("unexpected healing trienode") return errors.New("unexpected healing trienode")
} }
// Response validated, send it to the scheduler for filling // Response validated, send it to the scheduler for filling
atomic.AddUint64(&s.trienodeHealPend, fills)
defer func() {
atomic.AddUint64(&s.trienodeHealPend, ^(fills - 1))
}()
response := &trienodeHealResponse{ response := &trienodeHealResponse{
paths: req.paths, paths: req.paths,
task: req.task, task: req.task,
@ -2691,7 +2800,9 @@ func (s *Syncer) onHealByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) e
// Whether or not the response is valid, we can mark the peer as idle and // Whether or not the response is valid, we can mark the peer as idle and
// notify the scheduler to assign a new task. If the response is invalid, // notify the scheduler to assign a new task. If the response is invalid,
// we'll drop the peer in a bit. // we'll drop the peer in a bit.
defer func() {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock()
if _, ok := s.peers[peer.ID()]; ok { if _, ok := s.peers[peer.ID()]; ok {
s.bytecodeHealIdlers[peer.ID()] = struct{}{} s.bytecodeHealIdlers[peer.ID()] = struct{}{}
} }
@ -2699,6 +2810,8 @@ func (s *Syncer) onHealByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) e
case s.update <- struct{}{}: case s.update <- struct{}{}:
default: default:
} }
}()
s.lock.Lock()
// Ensure the response is for a valid request // Ensure the response is for a valid request
req, ok := s.bytecodeHealReqs[id] req, ok := s.bytecodeHealReqs[id]
if !ok { if !ok {

View File

@ -23,7 +23,7 @@ import (
const ( const (
VersionMajor = 1 // Major version component of the current release VersionMajor = 1 // Major version component of the current release
VersionMinor = 10 // Minor version component of the current release VersionMinor = 10 // Minor version component of the current release
VersionPatch = 25 // Patch version component of the current release VersionPatch = 26 // Patch version component of the current release
VersionMeta = "stable" // Version metadata to append to the version string VersionMeta = "stable" // Version metadata to append to the version string
) )

View File

@ -31,6 +31,7 @@ import (
) )
var ( var (
ErrBadResult = errors.New("bad result in JSON-RPC response")
ErrClientQuit = errors.New("client is closed") ErrClientQuit = errors.New("client is closed")
ErrNoResult = errors.New("no result in JSON-RPC response") ErrNoResult = errors.New("no result in JSON-RPC response")
ErrSubscriptionQueueOverflow = errors.New("subscription queue overflow") ErrSubscriptionQueueOverflow = errors.New("subscription queue overflow")

View File

@ -19,6 +19,7 @@ package rpc
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"math/rand" "math/rand"
"net" "net"
@ -144,6 +145,53 @@ func TestClientBatchRequest(t *testing.T) {
} }
} }
func TestClientBatchRequest_len(t *testing.T) {
b, err := json.Marshal([]jsonrpcMessage{
{Version: "2.0", ID: json.RawMessage("1"), Method: "foo", Result: json.RawMessage(`"0x1"`)},
{Version: "2.0", ID: json.RawMessage("2"), Method: "bar", Result: json.RawMessage(`"0x2"`)},
})
if err != nil {
t.Fatal("failed to encode jsonrpc message:", err)
}
s := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
_, err := rw.Write(b)
if err != nil {
t.Error("failed to write response:", err)
}
}))
t.Cleanup(s.Close)
client, err := Dial(s.URL)
if err != nil {
t.Fatal("failed to dial test server:", err)
}
defer client.Close()
t.Run("too-few", func(t *testing.T) {
batch := []BatchElem{
{Method: "foo"},
{Method: "bar"},
{Method: "baz"},
}
ctx, cancelFn := context.WithTimeout(context.Background(), time.Second)
defer cancelFn()
if err := client.BatchCallContext(ctx, batch); !errors.Is(err, ErrBadResult) {
t.Errorf("expected %q but got: %v", ErrBadResult, err)
}
})
t.Run("too-many", func(t *testing.T) {
batch := []BatchElem{
{Method: "foo"},
}
ctx, cancelFn := context.WithTimeout(context.Background(), time.Second)
defer cancelFn()
if err := client.BatchCallContext(ctx, batch); !errors.Is(err, ErrBadResult) {
t.Errorf("expected %q but got: %v", ErrBadResult, err)
}
})
}
func TestClientNotify(t *testing.T) { func TestClientNotify(t *testing.T) {
server := newTestServer() server := newTestServer()
defer server.Stop() defer server.Stop()

View File

@ -173,6 +173,9 @@ func (c *Client) sendBatchHTTP(ctx context.Context, op *requestOp, msgs []*jsonr
if err := json.NewDecoder(respBody).Decode(&respmsgs); err != nil { if err := json.NewDecoder(respBody).Decode(&respmsgs); err != nil {
return err return err
} }
if len(respmsgs) != len(msgs) {
return fmt.Errorf("batch has %d requests but response has %d: %w", len(msgs), len(respmsgs), ErrBadResult)
}
for i := 0; i < len(respmsgs); i++ { for i := 0; i < len(respmsgs); i++ {
op.resp <- &respmsgs[i] op.resp <- &respmsgs[i]
} }

View File

@ -19,6 +19,7 @@ package trie
import ( import (
"errors" "errors"
"fmt" "fmt"
"sync"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/prque" "github.com/ethereum/go-ethereum/common/prque"
@ -381,11 +382,11 @@ func (s *Sync) scheduleCodeRequest(req *codeRequest) {
// retrieval scheduling. // retrieval scheduling.
func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) { func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
// Gather all the children of the node, irrelevant whether known or not // Gather all the children of the node, irrelevant whether known or not
type child struct { type childNode struct {
path []byte path []byte
node node node node
} }
var children []child var children []childNode
switch node := (object).(type) { switch node := (object).(type) {
case *shortNode: case *shortNode:
@ -393,14 +394,14 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
if hasTerm(key) { if hasTerm(key) {
key = key[:len(key)-1] key = key[:len(key)-1]
} }
children = []child{{ children = []childNode{{
node: node.Val, node: node.Val,
path: append(append([]byte(nil), req.path...), key...), path: append(append([]byte(nil), req.path...), key...),
}} }}
case *fullNode: case *fullNode:
for i := 0; i < 17; i++ { for i := 0; i < 17; i++ {
if node.Children[i] != nil { if node.Children[i] != nil {
children = append(children, child{ children = append(children, childNode{
node: node.Children[i], node: node.Children[i],
path: append(append([]byte(nil), req.path...), byte(i)), path: append(append([]byte(nil), req.path...), byte(i)),
}) })
@ -410,7 +411,10 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
panic(fmt.Sprintf("unknown node: %+v", node)) panic(fmt.Sprintf("unknown node: %+v", node))
} }
// Iterate over the children, and request all unknown ones // Iterate over the children, and request all unknown ones
requests := make([]*nodeRequest, 0, len(children)) var (
missing = make(chan *nodeRequest, len(children))
pending sync.WaitGroup
)
for _, child := range children { for _, child := range children {
// Notify any external watcher of a new key/value node // Notify any external watcher of a new key/value node
if req.callback != nil { if req.callback != nil {
@ -433,19 +437,36 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
if s.membatch.hasNode(child.path) { if s.membatch.hasNode(child.path) {
continue continue
} }
// Check the presence of children concurrently
pending.Add(1)
go func(child childNode) {
defer pending.Done()
// If database says duplicate, then at least the trie node is present // If database says duplicate, then at least the trie node is present
// and we hold the assumption that it's NOT legacy contract code. // and we hold the assumption that it's NOT legacy contract code.
chash := common.BytesToHash(node) chash := common.BytesToHash(node)
if rawdb.HasTrieNode(s.database, chash) { if rawdb.HasTrieNode(s.database, chash) {
continue return
} }
// Locally unknown node, schedule for retrieval // Locally unknown node, schedule for retrieval
requests = append(requests, &nodeRequest{ missing <- &nodeRequest{
path: child.path, path: child.path,
hash: chash, hash: chash,
parent: req, parent: req,
callback: req.callback, callback: req.callback,
}) }
}(child)
}
}
pending.Wait()
requests := make([]*nodeRequest, 0, len(children))
for done := false; !done; {
select {
case miss := <-missing:
requests = append(requests, miss)
default:
done = true
} }
} }
return requests, nil return requests, nil