From b1f6dccfbaa5712adafabd374b89d8d12e4ad3ad Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Thu, 8 Sep 2022 02:25:58 +0800 Subject: [PATCH] eth, les: rework chain tracer (#25143) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR simplifies the logic of chain tracer and also adds the unit tests. The most important change has been made in this PR is the state management. Whenever a tracing state is acquired there is a corresponding release function be returned as well. It must be called once the state is used up, otherwise resource leaking can happen. And also the logic of state management has been simplified a lot. Specifically, the state provider(eth backend, les backend) should ensure the state is available and referenced. State customers can use the state according to their own needs, or build other states based on the given state. But once the release function is called, there is no guarantee of the availability of the state. Co-authored-by: Sina Mahmoodi <1591639+s1na@users.noreply.github.com> Co-authored-by: Péter Szilágyi --- eth/api.go | 4 +- eth/api_backend.go | 7 +- eth/state_accessor.go | 107 ++++++++++++-------- eth/tracers/api.go | 214 +++++++++++++++++++++++----------------- eth/tracers/api_test.go | 105 +++++++++++++++++--- les/api_backend.go | 5 +- les/state_accessor.go | 27 ++--- 7 files changed, 311 insertions(+), 158 deletions(-) diff --git a/eth/api.go b/eth/api.go index 5642ef4c3..3b5bb5f0a 100644 --- a/eth/api.go +++ b/eth/api.go @@ -411,10 +411,12 @@ func (api *DebugAPI) StorageRangeAt(blockHash common.Hash, txIndex int, contract if block == nil { return StorageRangeResult{}, fmt.Errorf("block %#x not found", blockHash) } - _, _, statedb, err := api.eth.stateAtTransaction(block, txIndex, 0) + _, _, statedb, release, err := api.eth.stateAtTransaction(block, txIndex, 0) if err != nil { return StorageRangeResult{}, err } + defer release() + st := statedb.StorageTrie(contractAddress) if st == nil { return StorageRangeResult{}, fmt.Errorf("account %x doesn't exist", contractAddress) diff --git a/eth/api_backend.go b/eth/api_backend.go index 00ecacc31..97c22c8fb 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -33,6 +33,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/eth/gasprice" + "github.com/ethereum/go-ethereum/eth/tracers" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/miner" @@ -363,10 +364,10 @@ func (b *EthAPIBackend) StartMining(threads int) error { return b.eth.StartMining(threads) } -func (b *EthAPIBackend) StateAtBlock(ctx context.Context, block *types.Block, reexec uint64, base *state.StateDB, checkLive, preferDisk bool) (*state.StateDB, error) { - return b.eth.StateAtBlock(block, reexec, base, checkLive, preferDisk) +func (b *EthAPIBackend) StateAtBlock(ctx context.Context, block *types.Block, reexec uint64, base *state.StateDB, readOnly bool, preferDisk bool) (*state.StateDB, tracers.StateReleaseFunc, error) { + return b.eth.StateAtBlock(block, reexec, base, readOnly, preferDisk) } -func (b *EthAPIBackend) StateAtTransaction(ctx context.Context, block *types.Block, txIndex int, reexec uint64) (core.Message, vm.BlockContext, *state.StateDB, error) { +func (b *EthAPIBackend) StateAtTransaction(ctx context.Context, block *types.Block, txIndex int, reexec uint64) (core.Message, vm.BlockContext, *state.StateDB, tracers.StateReleaseFunc, error) { return b.eth.stateAtTransaction(block, txIndex, reexec) } diff --git a/eth/state_accessor.go b/eth/state_accessor.go index 12dba8a0a..4651ef306 100644 --- a/eth/state_accessor.go +++ b/eth/state_accessor.go @@ -26,39 +26,59 @@ import ( "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" + "github.com/ethereum/go-ethereum/eth/tracers" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/trie" ) +// noopReleaser is returned in case there is no operation expected +// for releasing state. +var noopReleaser = tracers.StateReleaseFunc(func() {}) + // StateAtBlock retrieves the state database associated with a certain block. // If no state is locally available for the given block, a number of blocks // are attempted to be reexecuted to generate the desired state. The optional -// base layer statedb can be passed then it's regarded as the statedb of the +// base layer statedb can be provided which is regarded as the statedb of the // parent block. +// +// An additional release function will be returned if the requested state is +// available. Release is expected to be invoked when the returned state is no longer needed. +// Its purpose is to prevent resource leaking. Though it can be noop in some cases. +// // Parameters: -// - block: The block for which we want the state (== state at the stateRoot of the parent) -// - reexec: The maximum number of blocks to reprocess trying to obtain the desired state -// - base: If the caller is tracing multiple blocks, the caller can provide the parent state -// continuously from the callsite. -// - checklive: if true, then the live 'blockchain' state database is used. If the caller want to -// perform Commit or other 'save-to-disk' changes, this should be set to false to avoid -// storing trash persistently -// - preferDisk: this arg can be used by the caller to signal that even though the 'base' is provided, -// it would be preferable to start from a fresh state, if we have it on disk. -func (eth *Ethereum) StateAtBlock(block *types.Block, reexec uint64, base *state.StateDB, checkLive bool, preferDisk bool) (statedb *state.StateDB, err error) { +// - block: The block for which we want the state(state = block.Root) +// - reexec: The maximum number of blocks to reprocess trying to obtain the desired state +// - base: If the caller is tracing multiple blocks, the caller can provide the parent +// state continuously from the callsite. +// - readOnly: If true, then the live 'blockchain' state database is used. No mutation should +// be made from caller, e.g. perform Commit or other 'save-to-disk' changes. +// Otherwise, the trash generated by caller may be persisted permanently. +// - preferDisk: this arg can be used by the caller to signal that even though the 'base' is +// provided, it would be preferable to start from a fresh state, if we have it +// on disk. +func (eth *Ethereum) StateAtBlock(block *types.Block, reexec uint64, base *state.StateDB, readOnly bool, preferDisk bool) (statedb *state.StateDB, release tracers.StateReleaseFunc, err error) { var ( current *types.Block database state.Database report = true origin = block.NumberU64() ) - // Check the live database first if we have the state fully available, use that. - if checkLive { - statedb, err = eth.blockchain.StateAt(block.Root()) - if err == nil { - return statedb, nil + // The state is only for reading purposes, check the state presence in + // live database. + if readOnly { + // The state is available in live database, create a reference + // on top to prevent garbage collection and return a release + // function to deref it. + if statedb, err = eth.blockchain.StateAt(block.Root()); err == nil { + statedb.Database().TrieDB().Reference(block.Root(), common.Hash{}) + return statedb, func() { + statedb.Database().TrieDB().Dereference(block.Root()) + }, nil } } + // The state is both for reading and writing, or it's unavailable in disk, + // try to construct/recover the state over an ephemeral trie.Database for + // isolating the live one. if base != nil { if preferDisk { // Create an ephemeral trie.Database for isolating the live one. Otherwise @@ -66,37 +86,37 @@ func (eth *Ethereum) StateAtBlock(block *types.Block, reexec uint64, base *state database = state.NewDatabaseWithConfig(eth.chainDb, &trie.Config{Cache: 16}) if statedb, err = state.New(block.Root(), database, nil); err == nil { log.Info("Found disk backend for state trie", "root", block.Root(), "number", block.Number()) - return statedb, nil + return statedb, noopReleaser, nil } } // The optional base statedb is given, mark the start point as parent block statedb, database, report = base, base.Database(), false current = eth.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1) } else { - // Otherwise try to reexec blocks until we find a state or reach our limit + // Otherwise, try to reexec blocks until we find a state or reach our limit current = block // Create an ephemeral trie.Database for isolating the live one. Otherwise // the internal junks created by tracing will be persisted into the disk. database = state.NewDatabaseWithConfig(eth.chainDb, &trie.Config{Cache: 16}) - // If we didn't check the dirty database, do check the clean one, otherwise - // we would rewind past a persisted block (specific corner case is chain - // tracing from the genesis). - if !checkLive { + // If we didn't check the live database, do check state over ephemeral database, + // otherwise we would rewind past a persisted block (specific corner case is + // chain tracing from the genesis). + if !readOnly { statedb, err = state.New(current.Root(), database, nil) if err == nil { - return statedb, nil + return statedb, noopReleaser, nil } } // Database does not have the state for the given block, try to regenerate for i := uint64(0); i < reexec; i++ { if current.NumberU64() == 0 { - return nil, errors.New("genesis state is missing") + return nil, nil, errors.New("genesis state is missing") } parent := eth.blockchain.GetBlock(current.ParentHash(), current.NumberU64()-1) if parent == nil { - return nil, fmt.Errorf("missing block %v %d", current.ParentHash(), current.NumberU64()-1) + return nil, nil, fmt.Errorf("missing block %v %d", current.ParentHash(), current.NumberU64()-1) } current = parent @@ -108,13 +128,14 @@ func (eth *Ethereum) StateAtBlock(block *types.Block, reexec uint64, base *state if err != nil { switch err.(type) { case *trie.MissingNodeError: - return nil, fmt.Errorf("required historical state unavailable (reexec=%d)", reexec) + return nil, nil, fmt.Errorf("required historical state unavailable (reexec=%d)", reexec) default: - return nil, err + return nil, nil, err } } } - // State was available at historical point, regenerate + // State is available at historical point, re-execute the blocks on top for + // the desired state. var ( start = time.Now() logged time.Time @@ -129,22 +150,24 @@ func (eth *Ethereum) StateAtBlock(block *types.Block, reexec uint64, base *state // Retrieve the next block to regenerate and process it next := current.NumberU64() + 1 if current = eth.blockchain.GetBlockByNumber(next); current == nil { - return nil, fmt.Errorf("block #%d not found", next) + return nil, nil, fmt.Errorf("block #%d not found", next) } _, _, _, err := eth.blockchain.Processor().Process(current, statedb, vm.Config{}) if err != nil { - return nil, fmt.Errorf("processing block %d failed: %v", current.NumberU64(), err) + return nil, nil, fmt.Errorf("processing block %d failed: %v", current.NumberU64(), err) } // Finalize the state so any modifications are written to the trie root, err := statedb.Commit(eth.blockchain.Config().IsEIP158(current.Number())) if err != nil { - return nil, fmt.Errorf("stateAtBlock commit failed, number %d root %v: %w", + return nil, nil, fmt.Errorf("stateAtBlock commit failed, number %d root %v: %w", current.NumberU64(), current.Root().Hex(), err) } statedb, err = state.New(root, database, nil) if err != nil { - return nil, fmt.Errorf("state reset after block %d failed: %v", current.NumberU64(), err) + return nil, nil, fmt.Errorf("state reset after block %d failed: %v", current.NumberU64(), err) } + // Hold the state reference and also drop the parent state + // to prevent accumulating too many nodes in memory. database.TrieDB().Reference(root, common.Hash{}) if parent != (common.Hash{}) { database.TrieDB().Dereference(parent) @@ -155,28 +178,28 @@ func (eth *Ethereum) StateAtBlock(block *types.Block, reexec uint64, base *state nodes, imgs := database.TrieDB().Size() log.Info("Historical state regenerated", "block", current.NumberU64(), "elapsed", time.Since(start), "nodes", nodes, "preimages", imgs) } - return statedb, nil + return statedb, func() { database.TrieDB().Dereference(block.Root()) }, nil } // stateAtTransaction returns the execution environment of a certain transaction. -func (eth *Ethereum) stateAtTransaction(block *types.Block, txIndex int, reexec uint64) (core.Message, vm.BlockContext, *state.StateDB, error) { +func (eth *Ethereum) stateAtTransaction(block *types.Block, txIndex int, reexec uint64) (core.Message, vm.BlockContext, *state.StateDB, tracers.StateReleaseFunc, error) { // Short circuit if it's genesis block. if block.NumberU64() == 0 { - return nil, vm.BlockContext{}, nil, errors.New("no transaction in genesis") + return nil, vm.BlockContext{}, nil, nil, errors.New("no transaction in genesis") } // Create the parent state database parent := eth.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1) if parent == nil { - return nil, vm.BlockContext{}, nil, fmt.Errorf("parent %#x not found", block.ParentHash()) + return nil, vm.BlockContext{}, nil, nil, fmt.Errorf("parent %#x not found", block.ParentHash()) } // Lookup the statedb of parent block from the live database, // otherwise regenerate it on the flight. - statedb, err := eth.StateAtBlock(parent, reexec, nil, true, false) + statedb, release, err := eth.StateAtBlock(parent, reexec, nil, true, false) if err != nil { - return nil, vm.BlockContext{}, nil, err + return nil, vm.BlockContext{}, nil, nil, err } if txIndex == 0 && len(block.Transactions()) == 0 { - return nil, vm.BlockContext{}, statedb, nil + return nil, vm.BlockContext{}, statedb, release, nil } // Recompute transactions up to the target index. signer := types.MakeSigner(eth.blockchain.Config(), block.Number()) @@ -186,17 +209,17 @@ func (eth *Ethereum) stateAtTransaction(block *types.Block, txIndex int, reexec txContext := core.NewEVMTxContext(msg) context := core.NewEVMBlockContext(block.Header(), eth.blockchain, nil) if idx == txIndex { - return msg, context, statedb, nil + return msg, context, statedb, release, nil } // Not yet the searched for transaction, execute on top of the current state vmenv := vm.NewEVM(context, txContext, statedb, eth.blockchain.Config(), vm.Config{}) statedb.Prepare(tx.Hash(), idx) if _, err := core.ApplyMessage(vmenv, msg, new(core.GasPool).AddGas(tx.Gas())); err != nil { - return nil, vm.BlockContext{}, nil, fmt.Errorf("transaction %#x failed: %v", tx.Hash(), err) + return nil, vm.BlockContext{}, nil, nil, fmt.Errorf("transaction %#x failed: %v", tx.Hash(), err) } // Ensure any modifications are committed to the state // Only delete empty objects if EIP158/161 (a.k.a Spurious Dragon) is in effect statedb.Finalise(vmenv.ChainConfig().IsEIP158(block.Number())) } - return nil, vm.BlockContext{}, nil, fmt.Errorf("transaction index %d out of range for block %#x", txIndex, block.Hash()) + return nil, vm.BlockContext{}, nil, nil, fmt.Errorf("transaction index %d out of range for block %#x", txIndex, block.Hash()) } diff --git a/eth/tracers/api.go b/eth/tracers/api.go index 092950e78..0cf2f45a8 100644 --- a/eth/tracers/api.go +++ b/eth/tracers/api.go @@ -63,6 +63,10 @@ const ( defaultTracechainMemLimit = common.StorageSize(500 * 1024 * 1024) ) +// StateReleaseFunc is used to deallocate resources held by constructing a +// historical state for tracing purposes. +type StateReleaseFunc func() + // Backend interface provides the common API services (that are provided by // both full and light clients) with access to necessary functions. type Backend interface { @@ -75,11 +79,8 @@ type Backend interface { ChainConfig() *params.ChainConfig Engine() consensus.Engine ChainDb() ethdb.Database - // StateAtBlock returns the state corresponding to the stateroot of the block. - // N.B: For executing transactions on block N, the required stateRoot is block N-1, - // so this method should be called with the parent. - StateAtBlock(ctx context.Context, block *types.Block, reexec uint64, base *state.StateDB, checkLive, preferDisk bool) (*state.StateDB, error) - StateAtTransaction(ctx context.Context, block *types.Block, txIndex int, reexec uint64) (core.Message, vm.BlockContext, *state.StateDB, error) + StateAtBlock(ctx context.Context, block *types.Block, reexec uint64, base *state.StateDB, readOnly bool, preferDisk bool) (*state.StateDB, StateReleaseFunc, error) + StateAtTransaction(ctx context.Context, block *types.Block, txIndex int, reexec uint64) (core.Message, vm.BlockContext, *state.StateDB, StateReleaseFunc, error) } // API is the collection of tracing APIs exposed over the private debugging endpoint. @@ -201,7 +202,7 @@ type txTraceResult struct { type blockTraceTask struct { statedb *state.StateDB // Intermediate state prepped for tracing block *types.Block // Block to trace the transactions from - rootref common.Hash // Trie root reference held for this task + release StateReleaseFunc // The function to release the held resource for this task results []*txTraceResult // Trace results produced by the task } @@ -234,13 +235,6 @@ func (api *API) TraceChain(ctx context.Context, start, end rpc.BlockNumber, conf if from.Number().Cmp(to.Number()) >= 0 { return nil, fmt.Errorf("end block (#%d) needs to come after start block (#%d)", end, start) } - return api.traceChain(ctx, from, to, config) -} - -// traceChain configures a new tracer according to the provided configuration, and -// executes all the transactions contained within. The return value will be one item -// per transaction, dependent on the requested tracer. -func (api *API) traceChain(ctx context.Context, start, end *types.Block, config *TraceConfig) (*rpc.Subscription, error) { // Tracing a chain is a **long** operation, only do with subscriptions notifier, supported := rpc.NotifierFromContext(ctx) if !supported { @@ -248,8 +242,45 @@ func (api *API) traceChain(ctx context.Context, start, end *types.Block, config } sub := notifier.CreateSubscription() - // Prepare all the states for tracing. Note this procedure can take very - // long time. Timeout mechanism is necessary. + resCh := api.traceChain(from, to, config, notifier.Closed()) + go func() { + for result := range resCh { + notifier.Notify(sub.ID, result) + } + }() + return sub, nil +} + +// releaser is a helper tool responsible for caching the release +// callbacks of tracing state. +type releaser struct { + releases []StateReleaseFunc + lock sync.Mutex +} + +func (r *releaser) add(release StateReleaseFunc) { + r.lock.Lock() + defer r.lock.Unlock() + + r.releases = append(r.releases, release) +} + +func (r *releaser) call() { + r.lock.Lock() + defer r.lock.Unlock() + + for _, release := range r.releases { + release() + } + r.releases = r.releases[:0] +} + +// traceChain configures a new tracer according to the provided configuration, and +// executes all the transactions contained within. The tracing chain range includes +// the end block but excludes the start one. The return value will be one item per +// transaction, dependent on the requested tracer. +// The tracing procedure should be aborted in case the closed signal is received. +func (api *API) traceChain(start, end *types.Block, config *TraceConfig, closed <-chan interface{}) chan *blockTraceResult { reexec := defaultTraceReexec if config != nil && config.Reexec != nil { reexec = *config.Reexec @@ -260,20 +291,23 @@ func (api *API) traceChain(ctx context.Context, start, end *types.Block, config threads = blocks } var ( - pend = new(sync.WaitGroup) - tasks = make(chan *blockTraceTask, threads) - results = make(chan *blockTraceTask, threads) - localctx = context.Background() + pend = new(sync.WaitGroup) + ctx = context.Background() + taskCh = make(chan *blockTraceTask, threads) + resCh = make(chan *blockTraceTask, threads) + reler = new(releaser) ) for th := 0; th < threads; th++ { pend.Add(1) go func() { defer pend.Done() - // Fetch and execute the next block trace tasks - for task := range tasks { - signer := types.MakeSigner(api.backend.ChainConfig(), task.block.Number()) - blockCtx := core.NewEVMBlockContext(task.block.Header(), api.chainContext(localctx), nil) + // Fetch and execute the block trace taskCh + for task := range taskCh { + var ( + signer = types.MakeSigner(api.backend.ChainConfig(), task.block.Number()) + blockCtx = core.NewEVMBlockContext(task.block.Header(), api.chainContext(ctx), nil) + ) // Trace all the transactions contained within for i, tx := range task.block.Transactions() { msg, _ := tx.AsMessage(signer, task.block.BaseFee()) @@ -282,7 +316,7 @@ func (api *API) traceChain(ctx context.Context, start, end *types.Block, config TxIndex: i, TxHash: tx.Hash(), } - res, err := api.traceTx(localctx, msg, txctx, blockCtx, task.statedb, config) + res, err := api.traceTx(ctx, msg, txctx, blockCtx, task.statedb, config) if err != nil { task.results[i] = &txTraceResult{Error: err.Error()} log.Warn("Tracing failed", "hash", tx.Hash(), "block", task.block.NumberU64(), "err", err) @@ -292,36 +326,38 @@ func (api *API) traceChain(ctx context.Context, start, end *types.Block, config task.statedb.Finalise(api.backend.ChainConfig().IsEIP158(task.block.Number())) task.results[i] = &txTraceResult{Result: res} } - // Stream the result back to the user or abort on teardown + // Tracing state is used up, queue it for de-referencing + reler.add(task.release) + + // Stream the result back to the result catcher or abort on teardown select { - case results <- task: - case <-notifier.Closed(): + case resCh <- task: + case <-closed: return } } }() } // Start a goroutine to feed all the blocks into the tracers - var ( - begin = time.Now() - derefTodo []common.Hash // list of hashes to dereference from the db - derefsMu sync.Mutex // mutex for the derefs - ) - go func() { var ( logged time.Time + begin = time.Now() number uint64 traced uint64 failed error - parent common.Hash statedb *state.StateDB + release StateReleaseFunc ) // Ensure everything is properly cleaned up on any exit path defer func() { - close(tasks) + close(taskCh) pend.Wait() + // Clean out any pending derefs. + reler.call() + + // Log the chain result switch { case failed != nil: log.Warn("Chain tracing failed", "start", start.NumberU64(), "end", end.NumberU64(), "transactions", traced, "elapsed", time.Since(begin), "err", failed) @@ -330,105 +366,97 @@ func (api *API) traceChain(ctx context.Context, start, end *types.Block, config default: log.Info("Chain tracing finished", "start", start.NumberU64(), "end", end.NumberU64(), "transactions", traced, "elapsed", time.Since(begin)) } - close(results) + close(resCh) }() - var preferDisk bool // Feed all the blocks both into the tracer, as well as fast process concurrently for number = start.NumberU64(); number < end.NumberU64(); number++ { // Stop tracing if interruption was requested select { - case <-notifier.Closed(): + case <-closed: return default: } - // clean out any derefs - derefsMu.Lock() - for _, h := range derefTodo { - statedb.Database().TrieDB().Dereference(h) - } - derefTodo = derefTodo[:0] - derefsMu.Unlock() - // Print progress logs if long enough time elapsed if time.Since(logged) > 8*time.Second { logged = time.Now() log.Info("Tracing chain segment", "start", start.NumberU64(), "end", end.NumberU64(), "current", number, "transactions", traced, "elapsed", time.Since(begin)) } - // Retrieve the parent state to trace on top - block, err := api.blockByNumber(localctx, rpc.BlockNumber(number)) + // Retrieve the parent block and target block for tracing. + block, err := api.blockByNumber(ctx, rpc.BlockNumber(number)) + if err != nil { + failed = err + break + } + next, err := api.blockByNumber(ctx, rpc.BlockNumber(number+1)) if err != nil { failed = err break } // Prepare the statedb for tracing. Don't use the live database for - // tracing to avoid persisting state junks into the database. - statedb, err = api.backend.StateAtBlock(localctx, block, reexec, statedb, false, preferDisk) + // tracing to avoid persisting state junks into the database. Switch + // over to `preferDisk` mode only if the memory usage exceeds the + // limit, the trie database will be reconstructed from scratch only + // if the relevant state is available in disk. + var preferDisk bool + if statedb != nil { + s1, s2 := statedb.Database().TrieDB().Size() + preferDisk = s1+s2 > defaultTracechainMemLimit + } + statedb, release, err = api.backend.StateAtBlock(ctx, block, reexec, statedb, false, preferDisk) if err != nil { failed = err break } - if trieDb := statedb.Database().TrieDB(); trieDb != nil { - // Hold the reference for tracer, will be released at the final stage - trieDb.Reference(block.Root(), common.Hash{}) + // Clean out any pending derefs. Note this step must be done after + // constructing tracing state, because the tracing state of block + // next depends on the parent state and construction may fail if + // we release too early. + reler.call() - // Release the parent state because it's already held by the tracer - if parent != (common.Hash{}) { - trieDb.Dereference(parent) - } - // Prefer disk if the trie db memory grows too much - s1, s2 := trieDb.Size() - if !preferDisk && (s1+s2) > defaultTracechainMemLimit { - log.Info("Switching to prefer-disk mode for tracing", "size", s1+s2) - preferDisk = true - } - } - parent = block.Root() - - next, err := api.blockByNumber(localctx, rpc.BlockNumber(number+1)) - if err != nil { - failed = err - break - } // Send the block over to the concurrent tracers (if not in the fast-forward phase) txs := next.Transactions() select { - case tasks <- &blockTraceTask{statedb: statedb.Copy(), block: next, rootref: block.Root(), results: make([]*txTraceResult, len(txs))}: - case <-notifier.Closed(): + case taskCh <- &blockTraceTask{statedb: statedb.Copy(), block: next, release: release, results: make([]*txTraceResult, len(txs))}: + case <-closed: + reler.add(release) return } traced += uint64(len(txs)) } }() - // Keep reading the trace results and stream the to the user + // Keep reading the trace results and stream them to result channel. + retCh := make(chan *blockTraceResult) go func() { + defer close(retCh) var ( - done = make(map[uint64]*blockTraceResult) next = start.NumberU64() + 1 + done = make(map[uint64]*blockTraceResult) ) - for res := range results { + for res := range resCh { // Queue up next received result result := &blockTraceResult{ Block: hexutil.Uint64(res.block.NumberU64()), Hash: res.block.Hash(), Traces: res.results, } - // Schedule any parent tries held in memory by this task for dereferencing done[uint64(result.Block)] = result - derefsMu.Lock() - derefTodo = append(derefTodo, res.rootref) - derefsMu.Unlock() - // Stream completed traces to the user, aborting on the first error + + // Stream completed traces to the result channel for result, ok := done[next]; ok; result, ok = done[next] { if len(result.Traces) > 0 || next == end.NumberU64() { - notifier.Notify(sub.ID, result) + // It will be blocked in case the channel consumer doesn't take the + // tracing result in time(e.g. the websocket connect is not stable) + // which will eventually block the entire chain tracer. It's the + // expected behavior to not waste node resources for a non-active user. + retCh <- result } delete(done, next) next++ } } }() - return sub, nil + return retCh } // TraceBlockByNumber returns the structured logs created during the execution of @@ -515,10 +543,12 @@ func (api *API) IntermediateRoots(ctx context.Context, hash common.Hash, config if config != nil && config.Reexec != nil { reexec = *config.Reexec } - statedb, err := api.backend.StateAtBlock(ctx, parent, reexec, nil, true, false) + statedb, release, err := api.backend.StateAtBlock(ctx, parent, reexec, nil, true, false) if err != nil { return nil, err } + defer release() + var ( roots []common.Hash signer = types.MakeSigner(api.backend.ChainConfig(), block.Number()) @@ -576,10 +606,12 @@ func (api *API) traceBlock(ctx context.Context, block *types.Block, config *Trac if config != nil && config.Reexec != nil { reexec = *config.Reexec } - statedb, err := api.backend.StateAtBlock(ctx, parent, reexec, nil, true, false) + statedb, release, err := api.backend.StateAtBlock(ctx, parent, reexec, nil, true, false) if err != nil { return nil, err } + defer release() + // Execute all the transaction contained within the block concurrently var ( signer = types.MakeSigner(api.backend.ChainConfig(), block.Number()) @@ -666,10 +698,12 @@ func (api *API) standardTraceBlockToFile(ctx context.Context, block *types.Block if config != nil && config.Reexec != nil { reexec = *config.Reexec } - statedb, err := api.backend.StateAtBlock(ctx, parent, reexec, nil, true, false) + statedb, release, err := api.backend.StateAtBlock(ctx, parent, reexec, nil, true, false) if err != nil { return nil, err } + defer release() + // Retrieve the tracing configurations, or use default values var ( logConfig logger.Config @@ -793,10 +827,12 @@ func (api *API) TraceTransaction(ctx context.Context, hash common.Hash, config * if err != nil { return nil, err } - msg, vmctx, statedb, err := api.backend.StateAtTransaction(ctx, block, int(index), reexec) + msg, vmctx, statedb, release, err := api.backend.StateAtTransaction(ctx, block, int(index), reexec) if err != nil { return nil, err } + defer release() + txctx := &Context{ BlockHash: blockHash, TxIndex: int(index), @@ -837,10 +873,12 @@ func (api *API) TraceCall(ctx context.Context, args ethapi.TransactionArgs, bloc if config != nil && config.Reexec != nil { reexec = *config.Reexec } - statedb, err := api.backend.StateAtBlock(ctx, block, reexec, nil, true, false) + statedb, release, err := api.backend.StateAtBlock(ctx, block, reexec, nil, true, false) if err != nil { return nil, err } + defer release() + vmctx := core.NewEVMBlockContext(block.Header(), api.chainContext(ctx), nil) // Apply the customization rules if required. if config != nil { diff --git a/eth/tracers/api_test.go b/eth/tracers/api_test.go index 40d860b85..414ba6fe9 100644 --- a/eth/tracers/api_test.go +++ b/eth/tracers/api_test.go @@ -26,6 +26,7 @@ import ( "math/big" "reflect" "sort" + "sync/atomic" "testing" "time" @@ -57,6 +58,9 @@ type testBackend struct { engine consensus.Engine chaindb ethdb.Database chain *core.BlockChain + + refHook func() // Hook is invoked when the requested state is referenced + relHook func() // Hook is invoked when the requested state is released } func newTestBackend(t *testing.T, n int, gspec *core.Genesis, generator func(i int, b *core.BlockGen)) *testBackend { @@ -133,25 +137,33 @@ func (b *testBackend) ChainDb() ethdb.Database { return b.chaindb } -func (b *testBackend) StateAtBlock(ctx context.Context, block *types.Block, reexec uint64, base *state.StateDB, checkLive bool, preferDisk bool) (*state.StateDB, error) { +func (b *testBackend) StateAtBlock(ctx context.Context, block *types.Block, reexec uint64, base *state.StateDB, readOnly bool, preferDisk bool) (*state.StateDB, StateReleaseFunc, error) { statedb, err := b.chain.StateAt(block.Root()) if err != nil { - return nil, errStateNotFound + return nil, nil, errStateNotFound } - return statedb, nil + if b.refHook != nil { + b.refHook() + } + release := func() { + if b.relHook != nil { + b.relHook() + } + } + return statedb, release, nil } -func (b *testBackend) StateAtTransaction(ctx context.Context, block *types.Block, txIndex int, reexec uint64) (core.Message, vm.BlockContext, *state.StateDB, error) { +func (b *testBackend) StateAtTransaction(ctx context.Context, block *types.Block, txIndex int, reexec uint64) (core.Message, vm.BlockContext, *state.StateDB, StateReleaseFunc, error) { parent := b.chain.GetBlock(block.ParentHash(), block.NumberU64()-1) if parent == nil { - return nil, vm.BlockContext{}, nil, errBlockNotFound + return nil, vm.BlockContext{}, nil, nil, errBlockNotFound } - statedb, err := b.chain.StateAt(parent.Root()) + statedb, release, err := b.StateAtBlock(ctx, parent, reexec, nil, true, false) if err != nil { - return nil, vm.BlockContext{}, nil, errStateNotFound + return nil, vm.BlockContext{}, nil, nil, errStateNotFound } if txIndex == 0 && len(block.Transactions()) == 0 { - return nil, vm.BlockContext{}, statedb, nil + return nil, vm.BlockContext{}, statedb, release, nil } // Recompute transactions up to the target index. signer := types.MakeSigner(b.chainConfig, block.Number()) @@ -160,15 +172,15 @@ func (b *testBackend) StateAtTransaction(ctx context.Context, block *types.Block txContext := core.NewEVMTxContext(msg) context := core.NewEVMBlockContext(block.Header(), b.chain, nil) if idx == txIndex { - return msg, context, statedb, nil + return msg, context, statedb, release, nil } vmenv := vm.NewEVM(context, txContext, statedb, b.chainConfig, vm.Config{}) if _, err := core.ApplyMessage(vmenv, msg, new(core.GasPool).AddGas(tx.Gas())); err != nil { - return nil, vm.BlockContext{}, nil, fmt.Errorf("transaction %#x failed: %v", tx.Hash(), err) + return nil, vm.BlockContext{}, nil, nil, fmt.Errorf("transaction %#x failed: %v", tx.Hash(), err) } statedb.Finalise(vmenv.ChainConfig().IsEIP158(block.Number())) } - return nil, vm.BlockContext{}, nil, fmt.Errorf("transaction index %d out of range for block %#x", txIndex, block.Hash()) + return nil, vm.BlockContext{}, nil, nil, fmt.Errorf("transaction index %d out of range for block %#x", txIndex, block.Hash()) } func TestTraceCall(t *testing.T) { @@ -622,3 +634,74 @@ func newStates(keys []common.Hash, vals []common.Hash) *map[common.Hash]common.H } return &m } + +func TestTraceChain(t *testing.T) { + // Initialize test accounts + accounts := newAccounts(3) + genesis := &core.Genesis{Alloc: core.GenesisAlloc{ + accounts[0].addr: {Balance: big.NewInt(params.Ether)}, + accounts[1].addr: {Balance: big.NewInt(params.Ether)}, + accounts[2].addr: {Balance: big.NewInt(params.Ether)}, + }} + genBlocks := 50 + signer := types.HomesteadSigner{} + + var ( + ref uint32 // total refs has made + rel uint32 // total rels has made + nonce uint64 + ) + backend := newTestBackend(t, genBlocks, genesis, func(i int, b *core.BlockGen) { + // Transfer from account[0] to account[1] + // value: 1000 wei + // fee: 0 wei + for j := 0; j < i+1; j++ { + tx, _ := types.SignTx(types.NewTransaction(nonce, accounts[1].addr, big.NewInt(1000), params.TxGas, b.BaseFee(), nil), signer, accounts[0].key) + b.AddTx(tx) + nonce += 1 + } + }) + backend.refHook = func() { atomic.AddUint32(&ref, 1) } + backend.relHook = func() { atomic.AddUint32(&rel, 1) } + api := NewAPI(backend) + + single := `{"result":{"gas":21000,"failed":false,"returnValue":"","structLogs":[]}}` + var cases = []struct { + start uint64 + end uint64 + config *TraceConfig + }{ + {0, 50, nil}, // the entire chain range, blocks [1, 50] + {10, 20, nil}, // the middle chain range, blocks [11, 20] + } + for _, c := range cases { + ref, rel = 0, 0 // clean up the counters + + from, _ := api.blockByNumber(context.Background(), rpc.BlockNumber(c.start)) + to, _ := api.blockByNumber(context.Background(), rpc.BlockNumber(c.end)) + resCh := api.traceChain(from, to, c.config, nil) + + next := c.start + 1 + for result := range resCh { + if next != uint64(result.Block) { + t.Error("Unexpected tracing block") + } + if len(result.Traces) != int(next) { + t.Error("Unexpected tracing result") + } + for _, trace := range result.Traces { + blob, _ := json.Marshal(trace) + if string(blob) != single { + t.Error("Unexpected tracing result") + } + } + next += 1 + } + if next != c.end+1 { + t.Error("Missing tracing block") + } + if ref != rel { + t.Errorf("Ref and deref actions are not equal, ref %d rel %d", ref, rel) + } + } +} diff --git a/les/api_backend.go b/les/api_backend.go index 5b4213134..71cfbbed1 100644 --- a/les/api_backend.go +++ b/les/api_backend.go @@ -33,6 +33,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/eth/gasprice" + "github.com/ethereum/go-ethereum/eth/tracers" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/light" @@ -321,10 +322,10 @@ func (b *LesApiBackend) CurrentHeader() *types.Header { return b.eth.blockchain.CurrentHeader() } -func (b *LesApiBackend) StateAtBlock(ctx context.Context, block *types.Block, reexec uint64, base *state.StateDB, checkLive bool, preferDisk bool) (*state.StateDB, error) { +func (b *LesApiBackend) StateAtBlock(ctx context.Context, block *types.Block, reexec uint64, base *state.StateDB, readOnly bool, preferDisk bool) (*state.StateDB, tracers.StateReleaseFunc, error) { return b.eth.stateAtBlock(ctx, block, reexec) } -func (b *LesApiBackend) StateAtTransaction(ctx context.Context, block *types.Block, txIndex int, reexec uint64) (core.Message, vm.BlockContext, *state.StateDB, error) { +func (b *LesApiBackend) StateAtTransaction(ctx context.Context, block *types.Block, txIndex int, reexec uint64) (core.Message, vm.BlockContext, *state.StateDB, tracers.StateReleaseFunc, error) { return b.eth.stateAtTransaction(ctx, block, txIndex, reexec) } diff --git a/les/state_accessor.go b/les/state_accessor.go index 112e6fd44..a2d49fbf3 100644 --- a/les/state_accessor.go +++ b/les/state_accessor.go @@ -25,31 +25,36 @@ import ( "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" + "github.com/ethereum/go-ethereum/eth/tracers" "github.com/ethereum/go-ethereum/light" ) +// noopReleaser is returned in case there is no operation expected +// for releasing state. +var noopReleaser = tracers.StateReleaseFunc(func() {}) + // stateAtBlock retrieves the state database associated with a certain block. -func (leth *LightEthereum) stateAtBlock(ctx context.Context, block *types.Block, reexec uint64) (*state.StateDB, error) { - return light.NewState(ctx, block.Header(), leth.odr), nil +func (leth *LightEthereum) stateAtBlock(ctx context.Context, block *types.Block, reexec uint64) (*state.StateDB, tracers.StateReleaseFunc, error) { + return light.NewState(ctx, block.Header(), leth.odr), noopReleaser, nil } // stateAtTransaction returns the execution environment of a certain transaction. -func (leth *LightEthereum) stateAtTransaction(ctx context.Context, block *types.Block, txIndex int, reexec uint64) (core.Message, vm.BlockContext, *state.StateDB, error) { +func (leth *LightEthereum) stateAtTransaction(ctx context.Context, block *types.Block, txIndex int, reexec uint64) (core.Message, vm.BlockContext, *state.StateDB, tracers.StateReleaseFunc, error) { // Short circuit if it's genesis block. if block.NumberU64() == 0 { - return nil, vm.BlockContext{}, nil, errors.New("no transaction in genesis") + return nil, vm.BlockContext{}, nil, nil, errors.New("no transaction in genesis") } // Create the parent state database parent, err := leth.blockchain.GetBlock(ctx, block.ParentHash(), block.NumberU64()-1) if err != nil { - return nil, vm.BlockContext{}, nil, err + return nil, vm.BlockContext{}, nil, nil, err } - statedb, err := leth.stateAtBlock(ctx, parent, reexec) + statedb, release, err := leth.stateAtBlock(ctx, parent, reexec) if err != nil { - return nil, vm.BlockContext{}, nil, err + return nil, vm.BlockContext{}, nil, nil, err } if txIndex == 0 && len(block.Transactions()) == 0 { - return nil, vm.BlockContext{}, statedb, nil + return nil, vm.BlockContext{}, statedb, release, nil } // Recompute transactions up to the target index. signer := types.MakeSigner(leth.blockchain.Config(), block.Number()) @@ -60,16 +65,16 @@ func (leth *LightEthereum) stateAtTransaction(ctx context.Context, block *types. context := core.NewEVMBlockContext(block.Header(), leth.blockchain, nil) statedb.Prepare(tx.Hash(), idx) if idx == txIndex { - return msg, context, statedb, nil + return msg, context, statedb, release, nil } // Not yet the searched for transaction, execute on top of the current state vmenv := vm.NewEVM(context, txContext, statedb, leth.blockchain.Config(), vm.Config{}) if _, err := core.ApplyMessage(vmenv, msg, new(core.GasPool).AddGas(tx.Gas())); err != nil { - return nil, vm.BlockContext{}, nil, fmt.Errorf("transaction %#x failed: %v", tx.Hash(), err) + return nil, vm.BlockContext{}, nil, nil, fmt.Errorf("transaction %#x failed: %v", tx.Hash(), err) } // Ensure any modifications are committed to the state // Only delete empty objects if EIP158/161 (a.k.a Spurious Dragon) is in effect statedb.Finalise(vmenv.ChainConfig().IsEIP158(block.Number())) } - return nil, vm.BlockContext{}, nil, fmt.Errorf("transaction index %d out of range for block %#x", txIndex, block.Hash()) + return nil, vm.BlockContext{}, nil, nil, fmt.Errorf("transaction index %d out of range for block %#x", txIndex, block.Hash()) }