eth, les: rework chain tracer (#25143)

This PR simplifies the logic of chain tracer and also adds the unit tests.

The most important change has been made in this PR is the state management. Whenever a tracing state is acquired there is a corresponding release function be returned as well. It must be called once the state is used up, otherwise resource leaking can happen.

And also the logic of state management has been simplified a lot. Specifically, the state provider(eth backend, les backend) should ensure the state is available and referenced. State customers can use the state according to their own needs, or build other states based on the given state. But once the release function is called, there is no guarantee of the availability of the state.


Co-authored-by: Sina Mahmoodi <1591639+s1na@users.noreply.github.com>
Co-authored-by: Péter Szilágyi <peterke@gmail.com>
This commit is contained in:
rjl493456442 2022-09-08 02:25:58 +08:00 committed by GitHub
parent dea1fb3cfc
commit b1f6dccfba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 311 additions and 158 deletions

View File

@ -411,10 +411,12 @@ func (api *DebugAPI) StorageRangeAt(blockHash common.Hash, txIndex int, contract
if block == nil {
return StorageRangeResult{}, fmt.Errorf("block %#x not found", blockHash)
}
_, _, statedb, err := api.eth.stateAtTransaction(block, txIndex, 0)
_, _, statedb, release, err := api.eth.stateAtTransaction(block, txIndex, 0)
if err != nil {
return StorageRangeResult{}, err
}
defer release()
st := statedb.StorageTrie(contractAddress)
if st == nil {
return StorageRangeResult{}, fmt.Errorf("account %x doesn't exist", contractAddress)

View File

@ -33,6 +33,7 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/eth/gasprice"
"github.com/ethereum/go-ethereum/eth/tracers"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/miner"
@ -363,10 +364,10 @@ func (b *EthAPIBackend) StartMining(threads int) error {
return b.eth.StartMining(threads)
}
func (b *EthAPIBackend) StateAtBlock(ctx context.Context, block *types.Block, reexec uint64, base *state.StateDB, checkLive, preferDisk bool) (*state.StateDB, error) {
return b.eth.StateAtBlock(block, reexec, base, checkLive, preferDisk)
func (b *EthAPIBackend) StateAtBlock(ctx context.Context, block *types.Block, reexec uint64, base *state.StateDB, readOnly bool, preferDisk bool) (*state.StateDB, tracers.StateReleaseFunc, error) {
return b.eth.StateAtBlock(block, reexec, base, readOnly, preferDisk)
}
func (b *EthAPIBackend) StateAtTransaction(ctx context.Context, block *types.Block, txIndex int, reexec uint64) (core.Message, vm.BlockContext, *state.StateDB, error) {
func (b *EthAPIBackend) StateAtTransaction(ctx context.Context, block *types.Block, txIndex int, reexec uint64) (core.Message, vm.BlockContext, *state.StateDB, tracers.StateReleaseFunc, error) {
return b.eth.stateAtTransaction(block, txIndex, reexec)
}

View File

@ -26,39 +26,59 @@ import (
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/eth/tracers"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/trie"
)
// noopReleaser is returned in case there is no operation expected
// for releasing state.
var noopReleaser = tracers.StateReleaseFunc(func() {})
// StateAtBlock retrieves the state database associated with a certain block.
// If no state is locally available for the given block, a number of blocks
// are attempted to be reexecuted to generate the desired state. The optional
// base layer statedb can be passed then it's regarded as the statedb of the
// base layer statedb can be provided which is regarded as the statedb of the
// parent block.
//
// An additional release function will be returned if the requested state is
// available. Release is expected to be invoked when the returned state is no longer needed.
// Its purpose is to prevent resource leaking. Though it can be noop in some cases.
//
// Parameters:
// - block: The block for which we want the state (== state at the stateRoot of the parent)
// - reexec: The maximum number of blocks to reprocess trying to obtain the desired state
// - base: If the caller is tracing multiple blocks, the caller can provide the parent state
// continuously from the callsite.
// - checklive: if true, then the live 'blockchain' state database is used. If the caller want to
// perform Commit or other 'save-to-disk' changes, this should be set to false to avoid
// storing trash persistently
// - preferDisk: this arg can be used by the caller to signal that even though the 'base' is provided,
// it would be preferable to start from a fresh state, if we have it on disk.
func (eth *Ethereum) StateAtBlock(block *types.Block, reexec uint64, base *state.StateDB, checkLive bool, preferDisk bool) (statedb *state.StateDB, err error) {
// - block: The block for which we want the state(state = block.Root)
// - reexec: The maximum number of blocks to reprocess trying to obtain the desired state
// - base: If the caller is tracing multiple blocks, the caller can provide the parent
// state continuously from the callsite.
// - readOnly: If true, then the live 'blockchain' state database is used. No mutation should
// be made from caller, e.g. perform Commit or other 'save-to-disk' changes.
// Otherwise, the trash generated by caller may be persisted permanently.
// - preferDisk: this arg can be used by the caller to signal that even though the 'base' is
// provided, it would be preferable to start from a fresh state, if we have it
// on disk.
func (eth *Ethereum) StateAtBlock(block *types.Block, reexec uint64, base *state.StateDB, readOnly bool, preferDisk bool) (statedb *state.StateDB, release tracers.StateReleaseFunc, err error) {
var (
current *types.Block
database state.Database
report = true
origin = block.NumberU64()
)
// Check the live database first if we have the state fully available, use that.
if checkLive {
statedb, err = eth.blockchain.StateAt(block.Root())
if err == nil {
return statedb, nil
// The state is only for reading purposes, check the state presence in
// live database.
if readOnly {
// The state is available in live database, create a reference
// on top to prevent garbage collection and return a release
// function to deref it.
if statedb, err = eth.blockchain.StateAt(block.Root()); err == nil {
statedb.Database().TrieDB().Reference(block.Root(), common.Hash{})
return statedb, func() {
statedb.Database().TrieDB().Dereference(block.Root())
}, nil
}
}
// The state is both for reading and writing, or it's unavailable in disk,
// try to construct/recover the state over an ephemeral trie.Database for
// isolating the live one.
if base != nil {
if preferDisk {
// Create an ephemeral trie.Database for isolating the live one. Otherwise
@ -66,37 +86,37 @@ func (eth *Ethereum) StateAtBlock(block *types.Block, reexec uint64, base *state
database = state.NewDatabaseWithConfig(eth.chainDb, &trie.Config{Cache: 16})
if statedb, err = state.New(block.Root(), database, nil); err == nil {
log.Info("Found disk backend for state trie", "root", block.Root(), "number", block.Number())
return statedb, nil
return statedb, noopReleaser, nil
}
}
// The optional base statedb is given, mark the start point as parent block
statedb, database, report = base, base.Database(), false
current = eth.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1)
} else {
// Otherwise try to reexec blocks until we find a state or reach our limit
// Otherwise, try to reexec blocks until we find a state or reach our limit
current = block
// Create an ephemeral trie.Database for isolating the live one. Otherwise
// the internal junks created by tracing will be persisted into the disk.
database = state.NewDatabaseWithConfig(eth.chainDb, &trie.Config{Cache: 16})
// If we didn't check the dirty database, do check the clean one, otherwise
// we would rewind past a persisted block (specific corner case is chain
// tracing from the genesis).
if !checkLive {
// If we didn't check the live database, do check state over ephemeral database,
// otherwise we would rewind past a persisted block (specific corner case is
// chain tracing from the genesis).
if !readOnly {
statedb, err = state.New(current.Root(), database, nil)
if err == nil {
return statedb, nil
return statedb, noopReleaser, nil
}
}
// Database does not have the state for the given block, try to regenerate
for i := uint64(0); i < reexec; i++ {
if current.NumberU64() == 0 {
return nil, errors.New("genesis state is missing")
return nil, nil, errors.New("genesis state is missing")
}
parent := eth.blockchain.GetBlock(current.ParentHash(), current.NumberU64()-1)
if parent == nil {
return nil, fmt.Errorf("missing block %v %d", current.ParentHash(), current.NumberU64()-1)
return nil, nil, fmt.Errorf("missing block %v %d", current.ParentHash(), current.NumberU64()-1)
}
current = parent
@ -108,13 +128,14 @@ func (eth *Ethereum) StateAtBlock(block *types.Block, reexec uint64, base *state
if err != nil {
switch err.(type) {
case *trie.MissingNodeError:
return nil, fmt.Errorf("required historical state unavailable (reexec=%d)", reexec)
return nil, nil, fmt.Errorf("required historical state unavailable (reexec=%d)", reexec)
default:
return nil, err
return nil, nil, err
}
}
}
// State was available at historical point, regenerate
// State is available at historical point, re-execute the blocks on top for
// the desired state.
var (
start = time.Now()
logged time.Time
@ -129,22 +150,24 @@ func (eth *Ethereum) StateAtBlock(block *types.Block, reexec uint64, base *state
// Retrieve the next block to regenerate and process it
next := current.NumberU64() + 1
if current = eth.blockchain.GetBlockByNumber(next); current == nil {
return nil, fmt.Errorf("block #%d not found", next)
return nil, nil, fmt.Errorf("block #%d not found", next)
}
_, _, _, err := eth.blockchain.Processor().Process(current, statedb, vm.Config{})
if err != nil {
return nil, fmt.Errorf("processing block %d failed: %v", current.NumberU64(), err)
return nil, nil, fmt.Errorf("processing block %d failed: %v", current.NumberU64(), err)
}
// Finalize the state so any modifications are written to the trie
root, err := statedb.Commit(eth.blockchain.Config().IsEIP158(current.Number()))
if err != nil {
return nil, fmt.Errorf("stateAtBlock commit failed, number %d root %v: %w",
return nil, nil, fmt.Errorf("stateAtBlock commit failed, number %d root %v: %w",
current.NumberU64(), current.Root().Hex(), err)
}
statedb, err = state.New(root, database, nil)
if err != nil {
return nil, fmt.Errorf("state reset after block %d failed: %v", current.NumberU64(), err)
return nil, nil, fmt.Errorf("state reset after block %d failed: %v", current.NumberU64(), err)
}
// Hold the state reference and also drop the parent state
// to prevent accumulating too many nodes in memory.
database.TrieDB().Reference(root, common.Hash{})
if parent != (common.Hash{}) {
database.TrieDB().Dereference(parent)
@ -155,28 +178,28 @@ func (eth *Ethereum) StateAtBlock(block *types.Block, reexec uint64, base *state
nodes, imgs := database.TrieDB().Size()
log.Info("Historical state regenerated", "block", current.NumberU64(), "elapsed", time.Since(start), "nodes", nodes, "preimages", imgs)
}
return statedb, nil
return statedb, func() { database.TrieDB().Dereference(block.Root()) }, nil
}
// stateAtTransaction returns the execution environment of a certain transaction.
func (eth *Ethereum) stateAtTransaction(block *types.Block, txIndex int, reexec uint64) (core.Message, vm.BlockContext, *state.StateDB, error) {
func (eth *Ethereum) stateAtTransaction(block *types.Block, txIndex int, reexec uint64) (core.Message, vm.BlockContext, *state.StateDB, tracers.StateReleaseFunc, error) {
// Short circuit if it's genesis block.
if block.NumberU64() == 0 {
return nil, vm.BlockContext{}, nil, errors.New("no transaction in genesis")
return nil, vm.BlockContext{}, nil, nil, errors.New("no transaction in genesis")
}
// Create the parent state database
parent := eth.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1)
if parent == nil {
return nil, vm.BlockContext{}, nil, fmt.Errorf("parent %#x not found", block.ParentHash())
return nil, vm.BlockContext{}, nil, nil, fmt.Errorf("parent %#x not found", block.ParentHash())
}
// Lookup the statedb of parent block from the live database,
// otherwise regenerate it on the flight.
statedb, err := eth.StateAtBlock(parent, reexec, nil, true, false)
statedb, release, err := eth.StateAtBlock(parent, reexec, nil, true, false)
if err != nil {
return nil, vm.BlockContext{}, nil, err
return nil, vm.BlockContext{}, nil, nil, err
}
if txIndex == 0 && len(block.Transactions()) == 0 {
return nil, vm.BlockContext{}, statedb, nil
return nil, vm.BlockContext{}, statedb, release, nil
}
// Recompute transactions up to the target index.
signer := types.MakeSigner(eth.blockchain.Config(), block.Number())
@ -186,17 +209,17 @@ func (eth *Ethereum) stateAtTransaction(block *types.Block, txIndex int, reexec
txContext := core.NewEVMTxContext(msg)
context := core.NewEVMBlockContext(block.Header(), eth.blockchain, nil)
if idx == txIndex {
return msg, context, statedb, nil
return msg, context, statedb, release, nil
}
// Not yet the searched for transaction, execute on top of the current state
vmenv := vm.NewEVM(context, txContext, statedb, eth.blockchain.Config(), vm.Config{})
statedb.Prepare(tx.Hash(), idx)
if _, err := core.ApplyMessage(vmenv, msg, new(core.GasPool).AddGas(tx.Gas())); err != nil {
return nil, vm.BlockContext{}, nil, fmt.Errorf("transaction %#x failed: %v", tx.Hash(), err)
return nil, vm.BlockContext{}, nil, nil, fmt.Errorf("transaction %#x failed: %v", tx.Hash(), err)
}
// Ensure any modifications are committed to the state
// Only delete empty objects if EIP158/161 (a.k.a Spurious Dragon) is in effect
statedb.Finalise(vmenv.ChainConfig().IsEIP158(block.Number()))
}
return nil, vm.BlockContext{}, nil, fmt.Errorf("transaction index %d out of range for block %#x", txIndex, block.Hash())
return nil, vm.BlockContext{}, nil, nil, fmt.Errorf("transaction index %d out of range for block %#x", txIndex, block.Hash())
}

View File

@ -63,6 +63,10 @@ const (
defaultTracechainMemLimit = common.StorageSize(500 * 1024 * 1024)
)
// StateReleaseFunc is used to deallocate resources held by constructing a
// historical state for tracing purposes.
type StateReleaseFunc func()
// Backend interface provides the common API services (that are provided by
// both full and light clients) with access to necessary functions.
type Backend interface {
@ -75,11 +79,8 @@ type Backend interface {
ChainConfig() *params.ChainConfig
Engine() consensus.Engine
ChainDb() ethdb.Database
// StateAtBlock returns the state corresponding to the stateroot of the block.
// N.B: For executing transactions on block N, the required stateRoot is block N-1,
// so this method should be called with the parent.
StateAtBlock(ctx context.Context, block *types.Block, reexec uint64, base *state.StateDB, checkLive, preferDisk bool) (*state.StateDB, error)
StateAtTransaction(ctx context.Context, block *types.Block, txIndex int, reexec uint64) (core.Message, vm.BlockContext, *state.StateDB, error)
StateAtBlock(ctx context.Context, block *types.Block, reexec uint64, base *state.StateDB, readOnly bool, preferDisk bool) (*state.StateDB, StateReleaseFunc, error)
StateAtTransaction(ctx context.Context, block *types.Block, txIndex int, reexec uint64) (core.Message, vm.BlockContext, *state.StateDB, StateReleaseFunc, error)
}
// API is the collection of tracing APIs exposed over the private debugging endpoint.
@ -201,7 +202,7 @@ type txTraceResult struct {
type blockTraceTask struct {
statedb *state.StateDB // Intermediate state prepped for tracing
block *types.Block // Block to trace the transactions from
rootref common.Hash // Trie root reference held for this task
release StateReleaseFunc // The function to release the held resource for this task
results []*txTraceResult // Trace results produced by the task
}
@ -234,13 +235,6 @@ func (api *API) TraceChain(ctx context.Context, start, end rpc.BlockNumber, conf
if from.Number().Cmp(to.Number()) >= 0 {
return nil, fmt.Errorf("end block (#%d) needs to come after start block (#%d)", end, start)
}
return api.traceChain(ctx, from, to, config)
}
// traceChain configures a new tracer according to the provided configuration, and
// executes all the transactions contained within. The return value will be one item
// per transaction, dependent on the requested tracer.
func (api *API) traceChain(ctx context.Context, start, end *types.Block, config *TraceConfig) (*rpc.Subscription, error) {
// Tracing a chain is a **long** operation, only do with subscriptions
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
@ -248,8 +242,45 @@ func (api *API) traceChain(ctx context.Context, start, end *types.Block, config
}
sub := notifier.CreateSubscription()
// Prepare all the states for tracing. Note this procedure can take very
// long time. Timeout mechanism is necessary.
resCh := api.traceChain(from, to, config, notifier.Closed())
go func() {
for result := range resCh {
notifier.Notify(sub.ID, result)
}
}()
return sub, nil
}
// releaser is a helper tool responsible for caching the release
// callbacks of tracing state.
type releaser struct {
releases []StateReleaseFunc
lock sync.Mutex
}
func (r *releaser) add(release StateReleaseFunc) {
r.lock.Lock()
defer r.lock.Unlock()
r.releases = append(r.releases, release)
}
func (r *releaser) call() {
r.lock.Lock()
defer r.lock.Unlock()
for _, release := range r.releases {
release()
}
r.releases = r.releases[:0]
}
// traceChain configures a new tracer according to the provided configuration, and
// executes all the transactions contained within. The tracing chain range includes
// the end block but excludes the start one. The return value will be one item per
// transaction, dependent on the requested tracer.
// The tracing procedure should be aborted in case the closed signal is received.
func (api *API) traceChain(start, end *types.Block, config *TraceConfig, closed <-chan interface{}) chan *blockTraceResult {
reexec := defaultTraceReexec
if config != nil && config.Reexec != nil {
reexec = *config.Reexec
@ -260,20 +291,23 @@ func (api *API) traceChain(ctx context.Context, start, end *types.Block, config
threads = blocks
}
var (
pend = new(sync.WaitGroup)
tasks = make(chan *blockTraceTask, threads)
results = make(chan *blockTraceTask, threads)
localctx = context.Background()
pend = new(sync.WaitGroup)
ctx = context.Background()
taskCh = make(chan *blockTraceTask, threads)
resCh = make(chan *blockTraceTask, threads)
reler = new(releaser)
)
for th := 0; th < threads; th++ {
pend.Add(1)
go func() {
defer pend.Done()
// Fetch and execute the next block trace tasks
for task := range tasks {
signer := types.MakeSigner(api.backend.ChainConfig(), task.block.Number())
blockCtx := core.NewEVMBlockContext(task.block.Header(), api.chainContext(localctx), nil)
// Fetch and execute the block trace taskCh
for task := range taskCh {
var (
signer = types.MakeSigner(api.backend.ChainConfig(), task.block.Number())
blockCtx = core.NewEVMBlockContext(task.block.Header(), api.chainContext(ctx), nil)
)
// Trace all the transactions contained within
for i, tx := range task.block.Transactions() {
msg, _ := tx.AsMessage(signer, task.block.BaseFee())
@ -282,7 +316,7 @@ func (api *API) traceChain(ctx context.Context, start, end *types.Block, config
TxIndex: i,
TxHash: tx.Hash(),
}
res, err := api.traceTx(localctx, msg, txctx, blockCtx, task.statedb, config)
res, err := api.traceTx(ctx, msg, txctx, blockCtx, task.statedb, config)
if err != nil {
task.results[i] = &txTraceResult{Error: err.Error()}
log.Warn("Tracing failed", "hash", tx.Hash(), "block", task.block.NumberU64(), "err", err)
@ -292,36 +326,38 @@ func (api *API) traceChain(ctx context.Context, start, end *types.Block, config
task.statedb.Finalise(api.backend.ChainConfig().IsEIP158(task.block.Number()))
task.results[i] = &txTraceResult{Result: res}
}
// Stream the result back to the user or abort on teardown
// Tracing state is used up, queue it for de-referencing
reler.add(task.release)
// Stream the result back to the result catcher or abort on teardown
select {
case results <- task:
case <-notifier.Closed():
case resCh <- task:
case <-closed:
return
}
}
}()
}
// Start a goroutine to feed all the blocks into the tracers
var (
begin = time.Now()
derefTodo []common.Hash // list of hashes to dereference from the db
derefsMu sync.Mutex // mutex for the derefs
)
go func() {
var (
logged time.Time
begin = time.Now()
number uint64
traced uint64
failed error
parent common.Hash
statedb *state.StateDB
release StateReleaseFunc
)
// Ensure everything is properly cleaned up on any exit path
defer func() {
close(tasks)
close(taskCh)
pend.Wait()
// Clean out any pending derefs.
reler.call()
// Log the chain result
switch {
case failed != nil:
log.Warn("Chain tracing failed", "start", start.NumberU64(), "end", end.NumberU64(), "transactions", traced, "elapsed", time.Since(begin), "err", failed)
@ -330,105 +366,97 @@ func (api *API) traceChain(ctx context.Context, start, end *types.Block, config
default:
log.Info("Chain tracing finished", "start", start.NumberU64(), "end", end.NumberU64(), "transactions", traced, "elapsed", time.Since(begin))
}
close(results)
close(resCh)
}()
var preferDisk bool
// Feed all the blocks both into the tracer, as well as fast process concurrently
for number = start.NumberU64(); number < end.NumberU64(); number++ {
// Stop tracing if interruption was requested
select {
case <-notifier.Closed():
case <-closed:
return
default:
}
// clean out any derefs
derefsMu.Lock()
for _, h := range derefTodo {
statedb.Database().TrieDB().Dereference(h)
}
derefTodo = derefTodo[:0]
derefsMu.Unlock()
// Print progress logs if long enough time elapsed
if time.Since(logged) > 8*time.Second {
logged = time.Now()
log.Info("Tracing chain segment", "start", start.NumberU64(), "end", end.NumberU64(), "current", number, "transactions", traced, "elapsed", time.Since(begin))
}
// Retrieve the parent state to trace on top
block, err := api.blockByNumber(localctx, rpc.BlockNumber(number))
// Retrieve the parent block and target block for tracing.
block, err := api.blockByNumber(ctx, rpc.BlockNumber(number))
if err != nil {
failed = err
break
}
next, err := api.blockByNumber(ctx, rpc.BlockNumber(number+1))
if err != nil {
failed = err
break
}
// Prepare the statedb for tracing. Don't use the live database for
// tracing to avoid persisting state junks into the database.
statedb, err = api.backend.StateAtBlock(localctx, block, reexec, statedb, false, preferDisk)
// tracing to avoid persisting state junks into the database. Switch
// over to `preferDisk` mode only if the memory usage exceeds the
// limit, the trie database will be reconstructed from scratch only
// if the relevant state is available in disk.
var preferDisk bool
if statedb != nil {
s1, s2 := statedb.Database().TrieDB().Size()
preferDisk = s1+s2 > defaultTracechainMemLimit
}
statedb, release, err = api.backend.StateAtBlock(ctx, block, reexec, statedb, false, preferDisk)
if err != nil {
failed = err
break
}
if trieDb := statedb.Database().TrieDB(); trieDb != nil {
// Hold the reference for tracer, will be released at the final stage
trieDb.Reference(block.Root(), common.Hash{})
// Clean out any pending derefs. Note this step must be done after
// constructing tracing state, because the tracing state of block
// next depends on the parent state and construction may fail if
// we release too early.
reler.call()
// Release the parent state because it's already held by the tracer
if parent != (common.Hash{}) {
trieDb.Dereference(parent)
}
// Prefer disk if the trie db memory grows too much
s1, s2 := trieDb.Size()
if !preferDisk && (s1+s2) > defaultTracechainMemLimit {
log.Info("Switching to prefer-disk mode for tracing", "size", s1+s2)
preferDisk = true
}
}
parent = block.Root()
next, err := api.blockByNumber(localctx, rpc.BlockNumber(number+1))
if err != nil {
failed = err
break
}
// Send the block over to the concurrent tracers (if not in the fast-forward phase)
txs := next.Transactions()
select {
case tasks <- &blockTraceTask{statedb: statedb.Copy(), block: next, rootref: block.Root(), results: make([]*txTraceResult, len(txs))}:
case <-notifier.Closed():
case taskCh <- &blockTraceTask{statedb: statedb.Copy(), block: next, release: release, results: make([]*txTraceResult, len(txs))}:
case <-closed:
reler.add(release)
return
}
traced += uint64(len(txs))
}
}()
// Keep reading the trace results and stream the to the user
// Keep reading the trace results and stream them to result channel.
retCh := make(chan *blockTraceResult)
go func() {
defer close(retCh)
var (
done = make(map[uint64]*blockTraceResult)
next = start.NumberU64() + 1
done = make(map[uint64]*blockTraceResult)
)
for res := range results {
for res := range resCh {
// Queue up next received result
result := &blockTraceResult{
Block: hexutil.Uint64(res.block.NumberU64()),
Hash: res.block.Hash(),
Traces: res.results,
}
// Schedule any parent tries held in memory by this task for dereferencing
done[uint64(result.Block)] = result
derefsMu.Lock()
derefTodo = append(derefTodo, res.rootref)
derefsMu.Unlock()
// Stream completed traces to the user, aborting on the first error
// Stream completed traces to the result channel
for result, ok := done[next]; ok; result, ok = done[next] {
if len(result.Traces) > 0 || next == end.NumberU64() {
notifier.Notify(sub.ID, result)
// It will be blocked in case the channel consumer doesn't take the
// tracing result in time(e.g. the websocket connect is not stable)
// which will eventually block the entire chain tracer. It's the
// expected behavior to not waste node resources for a non-active user.
retCh <- result
}
delete(done, next)
next++
}
}
}()
return sub, nil
return retCh
}
// TraceBlockByNumber returns the structured logs created during the execution of
@ -515,10 +543,12 @@ func (api *API) IntermediateRoots(ctx context.Context, hash common.Hash, config
if config != nil && config.Reexec != nil {
reexec = *config.Reexec
}
statedb, err := api.backend.StateAtBlock(ctx, parent, reexec, nil, true, false)
statedb, release, err := api.backend.StateAtBlock(ctx, parent, reexec, nil, true, false)
if err != nil {
return nil, err
}
defer release()
var (
roots []common.Hash
signer = types.MakeSigner(api.backend.ChainConfig(), block.Number())
@ -576,10 +606,12 @@ func (api *API) traceBlock(ctx context.Context, block *types.Block, config *Trac
if config != nil && config.Reexec != nil {
reexec = *config.Reexec
}
statedb, err := api.backend.StateAtBlock(ctx, parent, reexec, nil, true, false)
statedb, release, err := api.backend.StateAtBlock(ctx, parent, reexec, nil, true, false)
if err != nil {
return nil, err
}
defer release()
// Execute all the transaction contained within the block concurrently
var (
signer = types.MakeSigner(api.backend.ChainConfig(), block.Number())
@ -666,10 +698,12 @@ func (api *API) standardTraceBlockToFile(ctx context.Context, block *types.Block
if config != nil && config.Reexec != nil {
reexec = *config.Reexec
}
statedb, err := api.backend.StateAtBlock(ctx, parent, reexec, nil, true, false)
statedb, release, err := api.backend.StateAtBlock(ctx, parent, reexec, nil, true, false)
if err != nil {
return nil, err
}
defer release()
// Retrieve the tracing configurations, or use default values
var (
logConfig logger.Config
@ -793,10 +827,12 @@ func (api *API) TraceTransaction(ctx context.Context, hash common.Hash, config *
if err != nil {
return nil, err
}
msg, vmctx, statedb, err := api.backend.StateAtTransaction(ctx, block, int(index), reexec)
msg, vmctx, statedb, release, err := api.backend.StateAtTransaction(ctx, block, int(index), reexec)
if err != nil {
return nil, err
}
defer release()
txctx := &Context{
BlockHash: blockHash,
TxIndex: int(index),
@ -837,10 +873,12 @@ func (api *API) TraceCall(ctx context.Context, args ethapi.TransactionArgs, bloc
if config != nil && config.Reexec != nil {
reexec = *config.Reexec
}
statedb, err := api.backend.StateAtBlock(ctx, block, reexec, nil, true, false)
statedb, release, err := api.backend.StateAtBlock(ctx, block, reexec, nil, true, false)
if err != nil {
return nil, err
}
defer release()
vmctx := core.NewEVMBlockContext(block.Header(), api.chainContext(ctx), nil)
// Apply the customization rules if required.
if config != nil {

View File

@ -26,6 +26,7 @@ import (
"math/big"
"reflect"
"sort"
"sync/atomic"
"testing"
"time"
@ -57,6 +58,9 @@ type testBackend struct {
engine consensus.Engine
chaindb ethdb.Database
chain *core.BlockChain
refHook func() // Hook is invoked when the requested state is referenced
relHook func() // Hook is invoked when the requested state is released
}
func newTestBackend(t *testing.T, n int, gspec *core.Genesis, generator func(i int, b *core.BlockGen)) *testBackend {
@ -133,25 +137,33 @@ func (b *testBackend) ChainDb() ethdb.Database {
return b.chaindb
}
func (b *testBackend) StateAtBlock(ctx context.Context, block *types.Block, reexec uint64, base *state.StateDB, checkLive bool, preferDisk bool) (*state.StateDB, error) {
func (b *testBackend) StateAtBlock(ctx context.Context, block *types.Block, reexec uint64, base *state.StateDB, readOnly bool, preferDisk bool) (*state.StateDB, StateReleaseFunc, error) {
statedb, err := b.chain.StateAt(block.Root())
if err != nil {
return nil, errStateNotFound
return nil, nil, errStateNotFound
}
return statedb, nil
if b.refHook != nil {
b.refHook()
}
release := func() {
if b.relHook != nil {
b.relHook()
}
}
return statedb, release, nil
}
func (b *testBackend) StateAtTransaction(ctx context.Context, block *types.Block, txIndex int, reexec uint64) (core.Message, vm.BlockContext, *state.StateDB, error) {
func (b *testBackend) StateAtTransaction(ctx context.Context, block *types.Block, txIndex int, reexec uint64) (core.Message, vm.BlockContext, *state.StateDB, StateReleaseFunc, error) {
parent := b.chain.GetBlock(block.ParentHash(), block.NumberU64()-1)
if parent == nil {
return nil, vm.BlockContext{}, nil, errBlockNotFound
return nil, vm.BlockContext{}, nil, nil, errBlockNotFound
}
statedb, err := b.chain.StateAt(parent.Root())
statedb, release, err := b.StateAtBlock(ctx, parent, reexec, nil, true, false)
if err != nil {
return nil, vm.BlockContext{}, nil, errStateNotFound
return nil, vm.BlockContext{}, nil, nil, errStateNotFound
}
if txIndex == 0 && len(block.Transactions()) == 0 {
return nil, vm.BlockContext{}, statedb, nil
return nil, vm.BlockContext{}, statedb, release, nil
}
// Recompute transactions up to the target index.
signer := types.MakeSigner(b.chainConfig, block.Number())
@ -160,15 +172,15 @@ func (b *testBackend) StateAtTransaction(ctx context.Context, block *types.Block
txContext := core.NewEVMTxContext(msg)
context := core.NewEVMBlockContext(block.Header(), b.chain, nil)
if idx == txIndex {
return msg, context, statedb, nil
return msg, context, statedb, release, nil
}
vmenv := vm.NewEVM(context, txContext, statedb, b.chainConfig, vm.Config{})
if _, err := core.ApplyMessage(vmenv, msg, new(core.GasPool).AddGas(tx.Gas())); err != nil {
return nil, vm.BlockContext{}, nil, fmt.Errorf("transaction %#x failed: %v", tx.Hash(), err)
return nil, vm.BlockContext{}, nil, nil, fmt.Errorf("transaction %#x failed: %v", tx.Hash(), err)
}
statedb.Finalise(vmenv.ChainConfig().IsEIP158(block.Number()))
}
return nil, vm.BlockContext{}, nil, fmt.Errorf("transaction index %d out of range for block %#x", txIndex, block.Hash())
return nil, vm.BlockContext{}, nil, nil, fmt.Errorf("transaction index %d out of range for block %#x", txIndex, block.Hash())
}
func TestTraceCall(t *testing.T) {
@ -622,3 +634,74 @@ func newStates(keys []common.Hash, vals []common.Hash) *map[common.Hash]common.H
}
return &m
}
func TestTraceChain(t *testing.T) {
// Initialize test accounts
accounts := newAccounts(3)
genesis := &core.Genesis{Alloc: core.GenesisAlloc{
accounts[0].addr: {Balance: big.NewInt(params.Ether)},
accounts[1].addr: {Balance: big.NewInt(params.Ether)},
accounts[2].addr: {Balance: big.NewInt(params.Ether)},
}}
genBlocks := 50
signer := types.HomesteadSigner{}
var (
ref uint32 // total refs has made
rel uint32 // total rels has made
nonce uint64
)
backend := newTestBackend(t, genBlocks, genesis, func(i int, b *core.BlockGen) {
// Transfer from account[0] to account[1]
// value: 1000 wei
// fee: 0 wei
for j := 0; j < i+1; j++ {
tx, _ := types.SignTx(types.NewTransaction(nonce, accounts[1].addr, big.NewInt(1000), params.TxGas, b.BaseFee(), nil), signer, accounts[0].key)
b.AddTx(tx)
nonce += 1
}
})
backend.refHook = func() { atomic.AddUint32(&ref, 1) }
backend.relHook = func() { atomic.AddUint32(&rel, 1) }
api := NewAPI(backend)
single := `{"result":{"gas":21000,"failed":false,"returnValue":"","structLogs":[]}}`
var cases = []struct {
start uint64
end uint64
config *TraceConfig
}{
{0, 50, nil}, // the entire chain range, blocks [1, 50]
{10, 20, nil}, // the middle chain range, blocks [11, 20]
}
for _, c := range cases {
ref, rel = 0, 0 // clean up the counters
from, _ := api.blockByNumber(context.Background(), rpc.BlockNumber(c.start))
to, _ := api.blockByNumber(context.Background(), rpc.BlockNumber(c.end))
resCh := api.traceChain(from, to, c.config, nil)
next := c.start + 1
for result := range resCh {
if next != uint64(result.Block) {
t.Error("Unexpected tracing block")
}
if len(result.Traces) != int(next) {
t.Error("Unexpected tracing result")
}
for _, trace := range result.Traces {
blob, _ := json.Marshal(trace)
if string(blob) != single {
t.Error("Unexpected tracing result")
}
}
next += 1
}
if next != c.end+1 {
t.Error("Missing tracing block")
}
if ref != rel {
t.Errorf("Ref and deref actions are not equal, ref %d rel %d", ref, rel)
}
}
}

View File

@ -33,6 +33,7 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/eth/gasprice"
"github.com/ethereum/go-ethereum/eth/tracers"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/light"
@ -321,10 +322,10 @@ func (b *LesApiBackend) CurrentHeader() *types.Header {
return b.eth.blockchain.CurrentHeader()
}
func (b *LesApiBackend) StateAtBlock(ctx context.Context, block *types.Block, reexec uint64, base *state.StateDB, checkLive bool, preferDisk bool) (*state.StateDB, error) {
func (b *LesApiBackend) StateAtBlock(ctx context.Context, block *types.Block, reexec uint64, base *state.StateDB, readOnly bool, preferDisk bool) (*state.StateDB, tracers.StateReleaseFunc, error) {
return b.eth.stateAtBlock(ctx, block, reexec)
}
func (b *LesApiBackend) StateAtTransaction(ctx context.Context, block *types.Block, txIndex int, reexec uint64) (core.Message, vm.BlockContext, *state.StateDB, error) {
func (b *LesApiBackend) StateAtTransaction(ctx context.Context, block *types.Block, txIndex int, reexec uint64) (core.Message, vm.BlockContext, *state.StateDB, tracers.StateReleaseFunc, error) {
return b.eth.stateAtTransaction(ctx, block, txIndex, reexec)
}

View File

@ -25,31 +25,36 @@ import (
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/eth/tracers"
"github.com/ethereum/go-ethereum/light"
)
// noopReleaser is returned in case there is no operation expected
// for releasing state.
var noopReleaser = tracers.StateReleaseFunc(func() {})
// stateAtBlock retrieves the state database associated with a certain block.
func (leth *LightEthereum) stateAtBlock(ctx context.Context, block *types.Block, reexec uint64) (*state.StateDB, error) {
return light.NewState(ctx, block.Header(), leth.odr), nil
func (leth *LightEthereum) stateAtBlock(ctx context.Context, block *types.Block, reexec uint64) (*state.StateDB, tracers.StateReleaseFunc, error) {
return light.NewState(ctx, block.Header(), leth.odr), noopReleaser, nil
}
// stateAtTransaction returns the execution environment of a certain transaction.
func (leth *LightEthereum) stateAtTransaction(ctx context.Context, block *types.Block, txIndex int, reexec uint64) (core.Message, vm.BlockContext, *state.StateDB, error) {
func (leth *LightEthereum) stateAtTransaction(ctx context.Context, block *types.Block, txIndex int, reexec uint64) (core.Message, vm.BlockContext, *state.StateDB, tracers.StateReleaseFunc, error) {
// Short circuit if it's genesis block.
if block.NumberU64() == 0 {
return nil, vm.BlockContext{}, nil, errors.New("no transaction in genesis")
return nil, vm.BlockContext{}, nil, nil, errors.New("no transaction in genesis")
}
// Create the parent state database
parent, err := leth.blockchain.GetBlock(ctx, block.ParentHash(), block.NumberU64()-1)
if err != nil {
return nil, vm.BlockContext{}, nil, err
return nil, vm.BlockContext{}, nil, nil, err
}
statedb, err := leth.stateAtBlock(ctx, parent, reexec)
statedb, release, err := leth.stateAtBlock(ctx, parent, reexec)
if err != nil {
return nil, vm.BlockContext{}, nil, err
return nil, vm.BlockContext{}, nil, nil, err
}
if txIndex == 0 && len(block.Transactions()) == 0 {
return nil, vm.BlockContext{}, statedb, nil
return nil, vm.BlockContext{}, statedb, release, nil
}
// Recompute transactions up to the target index.
signer := types.MakeSigner(leth.blockchain.Config(), block.Number())
@ -60,16 +65,16 @@ func (leth *LightEthereum) stateAtTransaction(ctx context.Context, block *types.
context := core.NewEVMBlockContext(block.Header(), leth.blockchain, nil)
statedb.Prepare(tx.Hash(), idx)
if idx == txIndex {
return msg, context, statedb, nil
return msg, context, statedb, release, nil
}
// Not yet the searched for transaction, execute on top of the current state
vmenv := vm.NewEVM(context, txContext, statedb, leth.blockchain.Config(), vm.Config{})
if _, err := core.ApplyMessage(vmenv, msg, new(core.GasPool).AddGas(tx.Gas())); err != nil {
return nil, vm.BlockContext{}, nil, fmt.Errorf("transaction %#x failed: %v", tx.Hash(), err)
return nil, vm.BlockContext{}, nil, nil, fmt.Errorf("transaction %#x failed: %v", tx.Hash(), err)
}
// Ensure any modifications are committed to the state
// Only delete empty objects if EIP158/161 (a.k.a Spurious Dragon) is in effect
statedb.Finalise(vmenv.ChainConfig().IsEIP158(block.Number()))
}
return nil, vm.BlockContext{}, nil, fmt.Errorf("transaction index %d out of range for block %#x", txIndex, block.Hash())
return nil, vm.BlockContext{}, nil, nil, fmt.Errorf("transaction index %d out of range for block %#x", txIndex, block.Hash())
}