Patch for concurrent iterator & others (onto v1.11.6) #386
@ -61,6 +61,11 @@ const (
|
||||
// For non-archive nodes, this limit _will_ be overblown, as disk-backed tries
|
||||
// will only be found every ~15K blocks or so.
|
||||
defaultTracechainMemLimit = common.StorageSize(500 * 1024 * 1024)
|
||||
|
||||
// maximumPendingTraceStates is the maximum number of states allowed waiting
|
||||
// for tracing. The creation of trace state will be paused if the unused
|
||||
// trace states exceed this limit.
|
||||
maximumPendingTraceStates = 128
|
||||
)
|
||||
|
||||
// StateReleaseFunc is used to deallocate resources held by constructing a
|
||||
@ -251,30 +256,6 @@ func (api *API) TraceChain(ctx context.Context, start, end rpc.BlockNumber, conf
|
||||
return sub, nil
|
||||
}
|
||||
|
||||
// releaser is a helper tool responsible for caching the release
|
||||
// callbacks of tracing state.
|
||||
type releaser struct {
|
||||
releases []StateReleaseFunc
|
||||
lock sync.Mutex
|
||||
}
|
||||
|
||||
func (r *releaser) add(release StateReleaseFunc) {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
|
||||
r.releases = append(r.releases, release)
|
||||
}
|
||||
|
||||
func (r *releaser) call() {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
|
||||
for _, release := range r.releases {
|
||||
release()
|
||||
}
|
||||
r.releases = r.releases[:0]
|
||||
}
|
||||
|
||||
// traceChain configures a new tracer according to the provided configuration, and
|
||||
// executes all the transactions contained within. The tracing chain range includes
|
||||
// the end block but excludes the start one. The return value will be one item per
|
||||
@ -291,11 +272,11 @@ func (api *API) traceChain(start, end *types.Block, config *TraceConfig, closed
|
||||
threads = blocks
|
||||
}
|
||||
var (
|
||||
pend = new(sync.WaitGroup)
|
||||
ctx = context.Background()
|
||||
taskCh = make(chan *blockTraceTask, threads)
|
||||
resCh = make(chan *blockTraceTask, threads)
|
||||
reler = new(releaser)
|
||||
pend = new(sync.WaitGroup)
|
||||
ctx = context.Background()
|
||||
taskCh = make(chan *blockTraceTask, threads)
|
||||
resCh = make(chan *blockTraceTask, threads)
|
||||
tracker = newStateTracker(maximumPendingTraceStates, start.NumberU64())
|
||||
)
|
||||
for th := 0; th < threads; th++ {
|
||||
pend.Add(1)
|
||||
@ -326,8 +307,10 @@ func (api *API) traceChain(start, end *types.Block, config *TraceConfig, closed
|
||||
task.statedb.Finalise(api.backend.ChainConfig().IsEIP158(task.block.Number()))
|
||||
task.results[i] = &txTraceResult{Result: res}
|
||||
}
|
||||
// Tracing state is used up, queue it for de-referencing
|
||||
reler.add(task.release)
|
||||
// Tracing state is used up, queue it for de-referencing. Note the
|
||||
// state is the parent state of trace block, use block.number-1 as
|
||||
// the state number.
|
||||
tracker.releaseState(task.block.NumberU64()-1, task.release)
|
||||
|
||||
// Stream the result back to the result catcher or abort on teardown
|
||||
select {
|
||||
@ -354,8 +337,8 @@ func (api *API) traceChain(start, end *types.Block, config *TraceConfig, closed
|
||||
close(taskCh)
|
||||
pend.Wait()
|
||||
|
||||
// Clean out any pending derefs.
|
||||
reler.call()
|
||||
// Clean out any pending release functions of trace states.
|
||||
tracker.callReleases()
|
||||
|
||||
// Log the chain result
|
||||
switch {
|
||||
@ -392,6 +375,13 @@ func (api *API) traceChain(start, end *types.Block, config *TraceConfig, closed
|
||||
failed = err
|
||||
break
|
||||
}
|
||||
// Make sure the state creator doesn't go too far. Too many unprocessed
|
||||
// trace state may cause the oldest state to become stale(e.g. in
|
||||
// path-based scheme).
|
||||
if err = tracker.wait(number); err != nil {
|
||||
failed = err
|
||||
break
|
||||
}
|
||||
// Prepare the statedb for tracing. Don't use the live database for
|
||||
// tracing to avoid persisting state junks into the database. Switch
|
||||
// over to `preferDisk` mode only if the memory usage exceeds the
|
||||
@ -407,18 +397,18 @@ func (api *API) traceChain(start, end *types.Block, config *TraceConfig, closed
|
||||
failed = err
|
||||
break
|
||||
}
|
||||
// Clean out any pending derefs. Note this step must be done after
|
||||
// constructing tracing state, because the tracing state of block
|
||||
// next depends on the parent state and construction may fail if
|
||||
// we release too early.
|
||||
reler.call()
|
||||
// Clean out any pending release functions of trace state. Note this
|
||||
// step must be done after constructing tracing state, because the
|
||||
// tracing state of block next depends on the parent state and construction
|
||||
// may fail if we release too early.
|
||||
tracker.callReleases()
|
||||
|
||||
// Send the block over to the concurrent tracers (if not in the fast-forward phase)
|
||||
txs := next.Transactions()
|
||||
select {
|
||||
case taskCh <- &blockTraceTask{statedb: statedb.Copy(), block: next, release: release, results: make([]*txTraceResult, len(txs))}:
|
||||
case <-closed:
|
||||
reler.add(release)
|
||||
tracker.releaseState(number, release)
|
||||
return
|
||||
}
|
||||
traced += uint64(len(txs))
|
||||
|
109
eth/tracers/tracker.go
Normal file
109
eth/tracers/tracker.go
Normal file
@ -0,0 +1,109 @@
|
||||
// Copyright 2022 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package tracers
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// stateTracker is an auxiliary tool used to cache the release functions of all
|
||||
// used trace states, and to determine whether the creation of trace state needs
|
||||
// to be paused in case there are too many states waiting for tracing.
|
||||
type stateTracker struct {
|
||||
limit int // Maximum number of states allowed waiting for tracing
|
||||
oldest uint64 // The number of the oldest state which is still using for trace
|
||||
used []bool // List of flags indicating whether the trace state has been used up
|
||||
releases []StateReleaseFunc // List of trace state release functions waiting to be called
|
||||
cond *sync.Cond
|
||||
lock *sync.RWMutex
|
||||
}
|
||||
|
||||
// newStateTracker initializes the tracker with provided state limits and
|
||||
// the number of the first state that will be used.
|
||||
func newStateTracker(limit int, oldest uint64) *stateTracker {
|
||||
lock := new(sync.RWMutex)
|
||||
return &stateTracker{
|
||||
limit: limit,
|
||||
oldest: oldest,
|
||||
used: make([]bool, limit),
|
||||
cond: sync.NewCond(lock),
|
||||
lock: lock,
|
||||
}
|
||||
}
|
||||
|
||||
// releaseState marks the state specified by the number as released and caches
|
||||
// the corresponding release functions internally.
|
||||
func (t *stateTracker) releaseState(number uint64, release StateReleaseFunc) {
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
|
||||
// Set the state as used, the corresponding flag is indexed by
|
||||
// the distance between the specified state and the oldest state
|
||||
// which is still using for trace.
|
||||
t.used[int(number-t.oldest)] = true
|
||||
|
||||
// If the oldest state is used up, update the oldest marker by moving
|
||||
// it to the next state which is not used up.
|
||||
if number == t.oldest {
|
||||
var count int
|
||||
for _, used := range t.used {
|
||||
if !used {
|
||||
break
|
||||
}
|
||||
count += 1
|
||||
}
|
||||
t.oldest += uint64(count)
|
||||
copy(t.used, t.used[count:])
|
||||
|
||||
// Clean up the array tail since they are useless now.
|
||||
for i := t.limit - count; i < t.limit; i++ {
|
||||
t.used[i] = false
|
||||
}
|
||||
// Fire the signal to all waiters that oldest marker is updated.
|
||||
t.cond.Broadcast()
|
||||
}
|
||||
t.releases = append(t.releases, release)
|
||||
}
|
||||
|
||||
// callReleases invokes all cached release functions.
|
||||
func (t *stateTracker) callReleases() {
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
|
||||
for _, release := range t.releases {
|
||||
release()
|
||||
}
|
||||
t.releases = t.releases[:0]
|
||||
}
|
||||
|
||||
// wait blocks until the accumulated trace states are less than the limit.
|
||||
func (t *stateTracker) wait(number uint64) error {
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
|
||||
for {
|
||||
if number < t.oldest {
|
||||
return fmt.Errorf("invalid state number %d head %d", number, t.oldest)
|
||||
}
|
||||
if number < t.oldest+uint64(t.limit) {
|
||||
// number is now within limit, wait over
|
||||
return nil
|
||||
}
|
||||
t.cond.Wait()
|
||||
}
|
||||
}
|
171
eth/tracers/tracker_test.go
Normal file
171
eth/tracers/tracker_test.go
Normal file
@ -0,0 +1,171 @@
|
||||
// Copyright 2022 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>
|
||||
|
||||
package tracers
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestTracker(t *testing.T) {
|
||||
var cases = []struct {
|
||||
limit int
|
||||
calls []uint64
|
||||
expHead uint64
|
||||
}{
|
||||
// Release in order
|
||||
{
|
||||
limit: 3,
|
||||
calls: []uint64{0, 1, 2},
|
||||
expHead: 3,
|
||||
},
|
||||
{
|
||||
limit: 3,
|
||||
calls: []uint64{0, 1, 2, 3, 4, 5},
|
||||
expHead: 6,
|
||||
},
|
||||
|
||||
// Release out of order
|
||||
{
|
||||
limit: 3,
|
||||
calls: []uint64{1, 2, 0},
|
||||
expHead: 3,
|
||||
},
|
||||
{
|
||||
limit: 3,
|
||||
calls: []uint64{1, 2, 0, 5, 4, 3},
|
||||
expHead: 6,
|
||||
},
|
||||
}
|
||||
for _, c := range cases {
|
||||
tracker := newStateTracker(c.limit, 0)
|
||||
for _, call := range c.calls {
|
||||
tracker.releaseState(call, func() {})
|
||||
}
|
||||
tracker.lock.RLock()
|
||||
head := tracker.oldest
|
||||
tracker.lock.RUnlock()
|
||||
|
||||
if head != c.expHead {
|
||||
t.Fatalf("Unexpected head want %d got %d", c.expHead, head)
|
||||
}
|
||||
}
|
||||
|
||||
var calls = []struct {
|
||||
number uint64
|
||||
expUsed []bool
|
||||
expHead uint64
|
||||
}{
|
||||
// Release the first one, update the oldest flag
|
||||
{
|
||||
number: 0,
|
||||
expUsed: []bool{false, false, false, false, false},
|
||||
expHead: 1,
|
||||
},
|
||||
// Release the second one, oldest shouldn't be updated
|
||||
{
|
||||
number: 2,
|
||||
expUsed: []bool{false, true, false, false, false},
|
||||
expHead: 1,
|
||||
},
|
||||
// Release the forth one, oldest shouldn't be updated
|
||||
{
|
||||
number: 4,
|
||||
expUsed: []bool{false, true, false, true, false},
|
||||
expHead: 1,
|
||||
},
|
||||
// Release the first one, the first two should all be cleaned,
|
||||
// and the remaining flags should all be left-shifted.
|
||||
{
|
||||
number: 1,
|
||||
expUsed: []bool{false, true, false, false, false},
|
||||
expHead: 3,
|
||||
},
|
||||
// Release the first one, the first two should all be cleaned
|
||||
{
|
||||
number: 3,
|
||||
expUsed: []bool{false, false, false, false, false},
|
||||
expHead: 5,
|
||||
},
|
||||
}
|
||||
tracker := newStateTracker(5, 0) // limit = 5, oldest = 0
|
||||
for _, call := range calls {
|
||||
tracker.releaseState(call.number, nil)
|
||||
tracker.lock.RLock()
|
||||
if !reflect.DeepEqual(tracker.used, call.expUsed) {
|
||||
t.Fatalf("Unexpected used array")
|
||||
}
|
||||
if tracker.oldest != call.expHead {
|
||||
t.Fatalf("Unexpected head")
|
||||
}
|
||||
tracker.lock.RUnlock()
|
||||
}
|
||||
}
|
||||
|
||||
func TestTrackerWait(t *testing.T) {
|
||||
var (
|
||||
tracker = newStateTracker(5, 0) // limit = 5, oldest = 0
|
||||
result = make(chan error, 1)
|
||||
doCall = func(number uint64) {
|
||||
go func() {
|
||||
result <- tracker.wait(number)
|
||||
}()
|
||||
}
|
||||
checkNoWait = func() {
|
||||
select {
|
||||
case <-result:
|
||||
return
|
||||
case <-time.NewTimer(time.Second).C:
|
||||
t.Fatal("No signal fired")
|
||||
}
|
||||
}
|
||||
checkWait = func() {
|
||||
select {
|
||||
case <-result:
|
||||
t.Fatal("Unexpected signal")
|
||||
case <-time.NewTimer(time.Millisecond * 100).C:
|
||||
}
|
||||
}
|
||||
)
|
||||
// States [0, 5) should all be available
|
||||
doCall(0)
|
||||
checkNoWait()
|
||||
|
||||
doCall(4)
|
||||
checkNoWait()
|
||||
|
||||
// State 5 is not available
|
||||
doCall(5)
|
||||
checkWait()
|
||||
|
||||
// States [1, 6) are available
|
||||
tracker.releaseState(0, nil)
|
||||
checkNoWait()
|
||||
|
||||
// States [1, 6) are available
|
||||
doCall(7)
|
||||
checkWait()
|
||||
|
||||
// States [2, 7) are available
|
||||
tracker.releaseState(1, nil)
|
||||
checkWait()
|
||||
|
||||
// States [3, 8) are available
|
||||
tracker.releaseState(2, nil)
|
||||
checkNoWait()
|
||||
}
|
Loading…
Reference in New Issue
Block a user