diff --git a/eth/tracers/api.go b/eth/tracers/api.go index 1e04bea41..c0d9dfb06 100644 --- a/eth/tracers/api.go +++ b/eth/tracers/api.go @@ -61,6 +61,11 @@ const ( // For non-archive nodes, this limit _will_ be overblown, as disk-backed tries // will only be found every ~15K blocks or so. defaultTracechainMemLimit = common.StorageSize(500 * 1024 * 1024) + + // maximumPendingTraceStates is the maximum number of states allowed waiting + // for tracing. The creation of trace state will be paused if the unused + // trace states exceed this limit. + maximumPendingTraceStates = 128 ) // StateReleaseFunc is used to deallocate resources held by constructing a @@ -251,30 +256,6 @@ func (api *API) TraceChain(ctx context.Context, start, end rpc.BlockNumber, conf return sub, nil } -// releaser is a helper tool responsible for caching the release -// callbacks of tracing state. -type releaser struct { - releases []StateReleaseFunc - lock sync.Mutex -} - -func (r *releaser) add(release StateReleaseFunc) { - r.lock.Lock() - defer r.lock.Unlock() - - r.releases = append(r.releases, release) -} - -func (r *releaser) call() { - r.lock.Lock() - defer r.lock.Unlock() - - for _, release := range r.releases { - release() - } - r.releases = r.releases[:0] -} - // traceChain configures a new tracer according to the provided configuration, and // executes all the transactions contained within. The tracing chain range includes // the end block but excludes the start one. The return value will be one item per @@ -291,11 +272,11 @@ func (api *API) traceChain(start, end *types.Block, config *TraceConfig, closed threads = blocks } var ( - pend = new(sync.WaitGroup) - ctx = context.Background() - taskCh = make(chan *blockTraceTask, threads) - resCh = make(chan *blockTraceTask, threads) - reler = new(releaser) + pend = new(sync.WaitGroup) + ctx = context.Background() + taskCh = make(chan *blockTraceTask, threads) + resCh = make(chan *blockTraceTask, threads) + tracker = newStateTracker(maximumPendingTraceStates, start.NumberU64()) ) for th := 0; th < threads; th++ { pend.Add(1) @@ -326,8 +307,10 @@ func (api *API) traceChain(start, end *types.Block, config *TraceConfig, closed task.statedb.Finalise(api.backend.ChainConfig().IsEIP158(task.block.Number())) task.results[i] = &txTraceResult{Result: res} } - // Tracing state is used up, queue it for de-referencing - reler.add(task.release) + // Tracing state is used up, queue it for de-referencing. Note the + // state is the parent state of trace block, use block.number-1 as + // the state number. + tracker.releaseState(task.block.NumberU64()-1, task.release) // Stream the result back to the result catcher or abort on teardown select { @@ -354,8 +337,8 @@ func (api *API) traceChain(start, end *types.Block, config *TraceConfig, closed close(taskCh) pend.Wait() - // Clean out any pending derefs. - reler.call() + // Clean out any pending release functions of trace states. + tracker.callReleases() // Log the chain result switch { @@ -392,6 +375,13 @@ func (api *API) traceChain(start, end *types.Block, config *TraceConfig, closed failed = err break } + // Make sure the state creator doesn't go too far. Too many unprocessed + // trace state may cause the oldest state to become stale(e.g. in + // path-based scheme). + if err = tracker.wait(number); err != nil { + failed = err + break + } // Prepare the statedb for tracing. Don't use the live database for // tracing to avoid persisting state junks into the database. Switch // over to `preferDisk` mode only if the memory usage exceeds the @@ -407,18 +397,18 @@ func (api *API) traceChain(start, end *types.Block, config *TraceConfig, closed failed = err break } - // Clean out any pending derefs. Note this step must be done after - // constructing tracing state, because the tracing state of block - // next depends on the parent state and construction may fail if - // we release too early. - reler.call() + // Clean out any pending release functions of trace state. Note this + // step must be done after constructing tracing state, because the + // tracing state of block next depends on the parent state and construction + // may fail if we release too early. + tracker.callReleases() // Send the block over to the concurrent tracers (if not in the fast-forward phase) txs := next.Transactions() select { case taskCh <- &blockTraceTask{statedb: statedb.Copy(), block: next, release: release, results: make([]*txTraceResult, len(txs))}: case <-closed: - reler.add(release) + tracker.releaseState(number, release) return } traced += uint64(len(txs)) diff --git a/eth/tracers/tracker.go b/eth/tracers/tracker.go new file mode 100644 index 000000000..136be37f5 --- /dev/null +++ b/eth/tracers/tracker.go @@ -0,0 +1,109 @@ +// Copyright 2022 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package tracers + +import ( + "fmt" + "sync" +) + +// stateTracker is an auxiliary tool used to cache the release functions of all +// used trace states, and to determine whether the creation of trace state needs +// to be paused in case there are too many states waiting for tracing. +type stateTracker struct { + limit int // Maximum number of states allowed waiting for tracing + oldest uint64 // The number of the oldest state which is still using for trace + used []bool // List of flags indicating whether the trace state has been used up + releases []StateReleaseFunc // List of trace state release functions waiting to be called + cond *sync.Cond + lock *sync.RWMutex +} + +// newStateTracker initializes the tracker with provided state limits and +// the number of the first state that will be used. +func newStateTracker(limit int, oldest uint64) *stateTracker { + lock := new(sync.RWMutex) + return &stateTracker{ + limit: limit, + oldest: oldest, + used: make([]bool, limit), + cond: sync.NewCond(lock), + lock: lock, + } +} + +// releaseState marks the state specified by the number as released and caches +// the corresponding release functions internally. +func (t *stateTracker) releaseState(number uint64, release StateReleaseFunc) { + t.lock.Lock() + defer t.lock.Unlock() + + // Set the state as used, the corresponding flag is indexed by + // the distance between the specified state and the oldest state + // which is still using for trace. + t.used[int(number-t.oldest)] = true + + // If the oldest state is used up, update the oldest marker by moving + // it to the next state which is not used up. + if number == t.oldest { + var count int + for _, used := range t.used { + if !used { + break + } + count += 1 + } + t.oldest += uint64(count) + copy(t.used, t.used[count:]) + + // Clean up the array tail since they are useless now. + for i := t.limit - count; i < t.limit; i++ { + t.used[i] = false + } + // Fire the signal to all waiters that oldest marker is updated. + t.cond.Broadcast() + } + t.releases = append(t.releases, release) +} + +// callReleases invokes all cached release functions. +func (t *stateTracker) callReleases() { + t.lock.Lock() + defer t.lock.Unlock() + + for _, release := range t.releases { + release() + } + t.releases = t.releases[:0] +} + +// wait blocks until the accumulated trace states are less than the limit. +func (t *stateTracker) wait(number uint64) error { + t.lock.Lock() + defer t.lock.Unlock() + + for { + if number < t.oldest { + return fmt.Errorf("invalid state number %d head %d", number, t.oldest) + } + if number < t.oldest+uint64(t.limit) { + // number is now within limit, wait over + return nil + } + t.cond.Wait() + } +} diff --git a/eth/tracers/tracker_test.go b/eth/tracers/tracker_test.go new file mode 100644 index 000000000..46f6ac8e5 --- /dev/null +++ b/eth/tracers/tracker_test.go @@ -0,0 +1,171 @@ +// Copyright 2022 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see + +package tracers + +import ( + "reflect" + "testing" + "time" +) + +func TestTracker(t *testing.T) { + var cases = []struct { + limit int + calls []uint64 + expHead uint64 + }{ + // Release in order + { + limit: 3, + calls: []uint64{0, 1, 2}, + expHead: 3, + }, + { + limit: 3, + calls: []uint64{0, 1, 2, 3, 4, 5}, + expHead: 6, + }, + + // Release out of order + { + limit: 3, + calls: []uint64{1, 2, 0}, + expHead: 3, + }, + { + limit: 3, + calls: []uint64{1, 2, 0, 5, 4, 3}, + expHead: 6, + }, + } + for _, c := range cases { + tracker := newStateTracker(c.limit, 0) + for _, call := range c.calls { + tracker.releaseState(call, func() {}) + } + tracker.lock.RLock() + head := tracker.oldest + tracker.lock.RUnlock() + + if head != c.expHead { + t.Fatalf("Unexpected head want %d got %d", c.expHead, head) + } + } + + var calls = []struct { + number uint64 + expUsed []bool + expHead uint64 + }{ + // Release the first one, update the oldest flag + { + number: 0, + expUsed: []bool{false, false, false, false, false}, + expHead: 1, + }, + // Release the second one, oldest shouldn't be updated + { + number: 2, + expUsed: []bool{false, true, false, false, false}, + expHead: 1, + }, + // Release the forth one, oldest shouldn't be updated + { + number: 4, + expUsed: []bool{false, true, false, true, false}, + expHead: 1, + }, + // Release the first one, the first two should all be cleaned, + // and the remaining flags should all be left-shifted. + { + number: 1, + expUsed: []bool{false, true, false, false, false}, + expHead: 3, + }, + // Release the first one, the first two should all be cleaned + { + number: 3, + expUsed: []bool{false, false, false, false, false}, + expHead: 5, + }, + } + tracker := newStateTracker(5, 0) // limit = 5, oldest = 0 + for _, call := range calls { + tracker.releaseState(call.number, nil) + tracker.lock.RLock() + if !reflect.DeepEqual(tracker.used, call.expUsed) { + t.Fatalf("Unexpected used array") + } + if tracker.oldest != call.expHead { + t.Fatalf("Unexpected head") + } + tracker.lock.RUnlock() + } +} + +func TestTrackerWait(t *testing.T) { + var ( + tracker = newStateTracker(5, 0) // limit = 5, oldest = 0 + result = make(chan error, 1) + doCall = func(number uint64) { + go func() { + result <- tracker.wait(number) + }() + } + checkNoWait = func() { + select { + case <-result: + return + case <-time.NewTimer(time.Second).C: + t.Fatal("No signal fired") + } + } + checkWait = func() { + select { + case <-result: + t.Fatal("Unexpected signal") + case <-time.NewTimer(time.Millisecond * 100).C: + } + } + ) + // States [0, 5) should all be available + doCall(0) + checkNoWait() + + doCall(4) + checkNoWait() + + // State 5 is not available + doCall(5) + checkWait() + + // States [1, 6) are available + tracker.releaseState(0, nil) + checkNoWait() + + // States [1, 6) are available + doCall(7) + checkWait() + + // States [2, 7) are available + tracker.releaseState(1, nil) + checkWait() + + // States [3, 8) are available + tracker.releaseState(2, nil) + checkNoWait() +}