diff --git a/eth/api.go b/eth/api.go index 5642ef4c3..3b5bb5f0a 100644 --- a/eth/api.go +++ b/eth/api.go @@ -411,10 +411,12 @@ func (api *DebugAPI) StorageRangeAt(blockHash common.Hash, txIndex int, contract if block == nil { return StorageRangeResult{}, fmt.Errorf("block %#x not found", blockHash) } - _, _, statedb, err := api.eth.stateAtTransaction(block, txIndex, 0) + _, _, statedb, release, err := api.eth.stateAtTransaction(block, txIndex, 0) if err != nil { return StorageRangeResult{}, err } + defer release() + st := statedb.StorageTrie(contractAddress) if st == nil { return StorageRangeResult{}, fmt.Errorf("account %x doesn't exist", contractAddress) diff --git a/eth/api_backend.go b/eth/api_backend.go index 00ecacc31..97c22c8fb 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -33,6 +33,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/eth/gasprice" + "github.com/ethereum/go-ethereum/eth/tracers" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/miner" @@ -363,10 +364,10 @@ func (b *EthAPIBackend) StartMining(threads int) error { return b.eth.StartMining(threads) } -func (b *EthAPIBackend) StateAtBlock(ctx context.Context, block *types.Block, reexec uint64, base *state.StateDB, checkLive, preferDisk bool) (*state.StateDB, error) { - return b.eth.StateAtBlock(block, reexec, base, checkLive, preferDisk) +func (b *EthAPIBackend) StateAtBlock(ctx context.Context, block *types.Block, reexec uint64, base *state.StateDB, readOnly bool, preferDisk bool) (*state.StateDB, tracers.StateReleaseFunc, error) { + return b.eth.StateAtBlock(block, reexec, base, readOnly, preferDisk) } -func (b *EthAPIBackend) StateAtTransaction(ctx context.Context, block *types.Block, txIndex int, reexec uint64) (core.Message, vm.BlockContext, *state.StateDB, error) { +func (b *EthAPIBackend) StateAtTransaction(ctx context.Context, block *types.Block, txIndex int, reexec uint64) (core.Message, vm.BlockContext, *state.StateDB, tracers.StateReleaseFunc, error) { return b.eth.stateAtTransaction(block, txIndex, reexec) } diff --git a/eth/state_accessor.go b/eth/state_accessor.go index 12dba8a0a..4651ef306 100644 --- a/eth/state_accessor.go +++ b/eth/state_accessor.go @@ -26,39 +26,59 @@ import ( "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" + "github.com/ethereum/go-ethereum/eth/tracers" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/trie" ) +// noopReleaser is returned in case there is no operation expected +// for releasing state. +var noopReleaser = tracers.StateReleaseFunc(func() {}) + // StateAtBlock retrieves the state database associated with a certain block. // If no state is locally available for the given block, a number of blocks // are attempted to be reexecuted to generate the desired state. The optional -// base layer statedb can be passed then it's regarded as the statedb of the +// base layer statedb can be provided which is regarded as the statedb of the // parent block. +// +// An additional release function will be returned if the requested state is +// available. Release is expected to be invoked when the returned state is no longer needed. +// Its purpose is to prevent resource leaking. Though it can be noop in some cases. +// // Parameters: -// - block: The block for which we want the state (== state at the stateRoot of the parent) -// - reexec: The maximum number of blocks to reprocess trying to obtain the desired state -// - base: If the caller is tracing multiple blocks, the caller can provide the parent state -// continuously from the callsite. -// - checklive: if true, then the live 'blockchain' state database is used. If the caller want to -// perform Commit or other 'save-to-disk' changes, this should be set to false to avoid -// storing trash persistently -// - preferDisk: this arg can be used by the caller to signal that even though the 'base' is provided, -// it would be preferable to start from a fresh state, if we have it on disk. -func (eth *Ethereum) StateAtBlock(block *types.Block, reexec uint64, base *state.StateDB, checkLive bool, preferDisk bool) (statedb *state.StateDB, err error) { +// - block: The block for which we want the state(state = block.Root) +// - reexec: The maximum number of blocks to reprocess trying to obtain the desired state +// - base: If the caller is tracing multiple blocks, the caller can provide the parent +// state continuously from the callsite. +// - readOnly: If true, then the live 'blockchain' state database is used. No mutation should +// be made from caller, e.g. perform Commit or other 'save-to-disk' changes. +// Otherwise, the trash generated by caller may be persisted permanently. +// - preferDisk: this arg can be used by the caller to signal that even though the 'base' is +// provided, it would be preferable to start from a fresh state, if we have it +// on disk. +func (eth *Ethereum) StateAtBlock(block *types.Block, reexec uint64, base *state.StateDB, readOnly bool, preferDisk bool) (statedb *state.StateDB, release tracers.StateReleaseFunc, err error) { var ( current *types.Block database state.Database report = true origin = block.NumberU64() ) - // Check the live database first if we have the state fully available, use that. - if checkLive { - statedb, err = eth.blockchain.StateAt(block.Root()) - if err == nil { - return statedb, nil + // The state is only for reading purposes, check the state presence in + // live database. + if readOnly { + // The state is available in live database, create a reference + // on top to prevent garbage collection and return a release + // function to deref it. + if statedb, err = eth.blockchain.StateAt(block.Root()); err == nil { + statedb.Database().TrieDB().Reference(block.Root(), common.Hash{}) + return statedb, func() { + statedb.Database().TrieDB().Dereference(block.Root()) + }, nil } } + // The state is both for reading and writing, or it's unavailable in disk, + // try to construct/recover the state over an ephemeral trie.Database for + // isolating the live one. if base != nil { if preferDisk { // Create an ephemeral trie.Database for isolating the live one. Otherwise @@ -66,37 +86,37 @@ func (eth *Ethereum) StateAtBlock(block *types.Block, reexec uint64, base *state database = state.NewDatabaseWithConfig(eth.chainDb, &trie.Config{Cache: 16}) if statedb, err = state.New(block.Root(), database, nil); err == nil { log.Info("Found disk backend for state trie", "root", block.Root(), "number", block.Number()) - return statedb, nil + return statedb, noopReleaser, nil } } // The optional base statedb is given, mark the start point as parent block statedb, database, report = base, base.Database(), false current = eth.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1) } else { - // Otherwise try to reexec blocks until we find a state or reach our limit + // Otherwise, try to reexec blocks until we find a state or reach our limit current = block // Create an ephemeral trie.Database for isolating the live one. Otherwise // the internal junks created by tracing will be persisted into the disk. database = state.NewDatabaseWithConfig(eth.chainDb, &trie.Config{Cache: 16}) - // If we didn't check the dirty database, do check the clean one, otherwise - // we would rewind past a persisted block (specific corner case is chain - // tracing from the genesis). - if !checkLive { + // If we didn't check the live database, do check state over ephemeral database, + // otherwise we would rewind past a persisted block (specific corner case is + // chain tracing from the genesis). + if !readOnly { statedb, err = state.New(current.Root(), database, nil) if err == nil { - return statedb, nil + return statedb, noopReleaser, nil } } // Database does not have the state for the given block, try to regenerate for i := uint64(0); i < reexec; i++ { if current.NumberU64() == 0 { - return nil, errors.New("genesis state is missing") + return nil, nil, errors.New("genesis state is missing") } parent := eth.blockchain.GetBlock(current.ParentHash(), current.NumberU64()-1) if parent == nil { - return nil, fmt.Errorf("missing block %v %d", current.ParentHash(), current.NumberU64()-1) + return nil, nil, fmt.Errorf("missing block %v %d", current.ParentHash(), current.NumberU64()-1) } current = parent @@ -108,13 +128,14 @@ func (eth *Ethereum) StateAtBlock(block *types.Block, reexec uint64, base *state if err != nil { switch err.(type) { case *trie.MissingNodeError: - return nil, fmt.Errorf("required historical state unavailable (reexec=%d)", reexec) + return nil, nil, fmt.Errorf("required historical state unavailable (reexec=%d)", reexec) default: - return nil, err + return nil, nil, err } } } - // State was available at historical point, regenerate + // State is available at historical point, re-execute the blocks on top for + // the desired state. var ( start = time.Now() logged time.Time @@ -129,22 +150,24 @@ func (eth *Ethereum) StateAtBlock(block *types.Block, reexec uint64, base *state // Retrieve the next block to regenerate and process it next := current.NumberU64() + 1 if current = eth.blockchain.GetBlockByNumber(next); current == nil { - return nil, fmt.Errorf("block #%d not found", next) + return nil, nil, fmt.Errorf("block #%d not found", next) } _, _, _, err := eth.blockchain.Processor().Process(current, statedb, vm.Config{}) if err != nil { - return nil, fmt.Errorf("processing block %d failed: %v", current.NumberU64(), err) + return nil, nil, fmt.Errorf("processing block %d failed: %v", current.NumberU64(), err) } // Finalize the state so any modifications are written to the trie root, err := statedb.Commit(eth.blockchain.Config().IsEIP158(current.Number())) if err != nil { - return nil, fmt.Errorf("stateAtBlock commit failed, number %d root %v: %w", + return nil, nil, fmt.Errorf("stateAtBlock commit failed, number %d root %v: %w", current.NumberU64(), current.Root().Hex(), err) } statedb, err = state.New(root, database, nil) if err != nil { - return nil, fmt.Errorf("state reset after block %d failed: %v", current.NumberU64(), err) + return nil, nil, fmt.Errorf("state reset after block %d failed: %v", current.NumberU64(), err) } + // Hold the state reference and also drop the parent state + // to prevent accumulating too many nodes in memory. database.TrieDB().Reference(root, common.Hash{}) if parent != (common.Hash{}) { database.TrieDB().Dereference(parent) @@ -155,28 +178,28 @@ func (eth *Ethereum) StateAtBlock(block *types.Block, reexec uint64, base *state nodes, imgs := database.TrieDB().Size() log.Info("Historical state regenerated", "block", current.NumberU64(), "elapsed", time.Since(start), "nodes", nodes, "preimages", imgs) } - return statedb, nil + return statedb, func() { database.TrieDB().Dereference(block.Root()) }, nil } // stateAtTransaction returns the execution environment of a certain transaction. -func (eth *Ethereum) stateAtTransaction(block *types.Block, txIndex int, reexec uint64) (core.Message, vm.BlockContext, *state.StateDB, error) { +func (eth *Ethereum) stateAtTransaction(block *types.Block, txIndex int, reexec uint64) (core.Message, vm.BlockContext, *state.StateDB, tracers.StateReleaseFunc, error) { // Short circuit if it's genesis block. if block.NumberU64() == 0 { - return nil, vm.BlockContext{}, nil, errors.New("no transaction in genesis") + return nil, vm.BlockContext{}, nil, nil, errors.New("no transaction in genesis") } // Create the parent state database parent := eth.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1) if parent == nil { - return nil, vm.BlockContext{}, nil, fmt.Errorf("parent %#x not found", block.ParentHash()) + return nil, vm.BlockContext{}, nil, nil, fmt.Errorf("parent %#x not found", block.ParentHash()) } // Lookup the statedb of parent block from the live database, // otherwise regenerate it on the flight. - statedb, err := eth.StateAtBlock(parent, reexec, nil, true, false) + statedb, release, err := eth.StateAtBlock(parent, reexec, nil, true, false) if err != nil { - return nil, vm.BlockContext{}, nil, err + return nil, vm.BlockContext{}, nil, nil, err } if txIndex == 0 && len(block.Transactions()) == 0 { - return nil, vm.BlockContext{}, statedb, nil + return nil, vm.BlockContext{}, statedb, release, nil } // Recompute transactions up to the target index. signer := types.MakeSigner(eth.blockchain.Config(), block.Number()) @@ -186,17 +209,17 @@ func (eth *Ethereum) stateAtTransaction(block *types.Block, txIndex int, reexec txContext := core.NewEVMTxContext(msg) context := core.NewEVMBlockContext(block.Header(), eth.blockchain, nil) if idx == txIndex { - return msg, context, statedb, nil + return msg, context, statedb, release, nil } // Not yet the searched for transaction, execute on top of the current state vmenv := vm.NewEVM(context, txContext, statedb, eth.blockchain.Config(), vm.Config{}) statedb.Prepare(tx.Hash(), idx) if _, err := core.ApplyMessage(vmenv, msg, new(core.GasPool).AddGas(tx.Gas())); err != nil { - return nil, vm.BlockContext{}, nil, fmt.Errorf("transaction %#x failed: %v", tx.Hash(), err) + return nil, vm.BlockContext{}, nil, nil, fmt.Errorf("transaction %#x failed: %v", tx.Hash(), err) } // Ensure any modifications are committed to the state // Only delete empty objects if EIP158/161 (a.k.a Spurious Dragon) is in effect statedb.Finalise(vmenv.ChainConfig().IsEIP158(block.Number())) } - return nil, vm.BlockContext{}, nil, fmt.Errorf("transaction index %d out of range for block %#x", txIndex, block.Hash()) + return nil, vm.BlockContext{}, nil, nil, fmt.Errorf("transaction index %d out of range for block %#x", txIndex, block.Hash()) } diff --git a/eth/tracers/api.go b/eth/tracers/api.go index 092950e78..0cf2f45a8 100644 --- a/eth/tracers/api.go +++ b/eth/tracers/api.go @@ -63,6 +63,10 @@ const ( defaultTracechainMemLimit = common.StorageSize(500 * 1024 * 1024) ) +// StateReleaseFunc is used to deallocate resources held by constructing a +// historical state for tracing purposes. +type StateReleaseFunc func() + // Backend interface provides the common API services (that are provided by // both full and light clients) with access to necessary functions. type Backend interface { @@ -75,11 +79,8 @@ type Backend interface { ChainConfig() *params.ChainConfig Engine() consensus.Engine ChainDb() ethdb.Database - // StateAtBlock returns the state corresponding to the stateroot of the block. - // N.B: For executing transactions on block N, the required stateRoot is block N-1, - // so this method should be called with the parent. - StateAtBlock(ctx context.Context, block *types.Block, reexec uint64, base *state.StateDB, checkLive, preferDisk bool) (*state.StateDB, error) - StateAtTransaction(ctx context.Context, block *types.Block, txIndex int, reexec uint64) (core.Message, vm.BlockContext, *state.StateDB, error) + StateAtBlock(ctx context.Context, block *types.Block, reexec uint64, base *state.StateDB, readOnly bool, preferDisk bool) (*state.StateDB, StateReleaseFunc, error) + StateAtTransaction(ctx context.Context, block *types.Block, txIndex int, reexec uint64) (core.Message, vm.BlockContext, *state.StateDB, StateReleaseFunc, error) } // API is the collection of tracing APIs exposed over the private debugging endpoint. @@ -201,7 +202,7 @@ type txTraceResult struct { type blockTraceTask struct { statedb *state.StateDB // Intermediate state prepped for tracing block *types.Block // Block to trace the transactions from - rootref common.Hash // Trie root reference held for this task + release StateReleaseFunc // The function to release the held resource for this task results []*txTraceResult // Trace results produced by the task } @@ -234,13 +235,6 @@ func (api *API) TraceChain(ctx context.Context, start, end rpc.BlockNumber, conf if from.Number().Cmp(to.Number()) >= 0 { return nil, fmt.Errorf("end block (#%d) needs to come after start block (#%d)", end, start) } - return api.traceChain(ctx, from, to, config) -} - -// traceChain configures a new tracer according to the provided configuration, and -// executes all the transactions contained within. The return value will be one item -// per transaction, dependent on the requested tracer. -func (api *API) traceChain(ctx context.Context, start, end *types.Block, config *TraceConfig) (*rpc.Subscription, error) { // Tracing a chain is a **long** operation, only do with subscriptions notifier, supported := rpc.NotifierFromContext(ctx) if !supported { @@ -248,8 +242,45 @@ func (api *API) traceChain(ctx context.Context, start, end *types.Block, config } sub := notifier.CreateSubscription() - // Prepare all the states for tracing. Note this procedure can take very - // long time. Timeout mechanism is necessary. + resCh := api.traceChain(from, to, config, notifier.Closed()) + go func() { + for result := range resCh { + notifier.Notify(sub.ID, result) + } + }() + return sub, nil +} + +// releaser is a helper tool responsible for caching the release +// callbacks of tracing state. +type releaser struct { + releases []StateReleaseFunc + lock sync.Mutex +} + +func (r *releaser) add(release StateReleaseFunc) { + r.lock.Lock() + defer r.lock.Unlock() + + r.releases = append(r.releases, release) +} + +func (r *releaser) call() { + r.lock.Lock() + defer r.lock.Unlock() + + for _, release := range r.releases { + release() + } + r.releases = r.releases[:0] +} + +// traceChain configures a new tracer according to the provided configuration, and +// executes all the transactions contained within. The tracing chain range includes +// the end block but excludes the start one. The return value will be one item per +// transaction, dependent on the requested tracer. +// The tracing procedure should be aborted in case the closed signal is received. +func (api *API) traceChain(start, end *types.Block, config *TraceConfig, closed <-chan interface{}) chan *blockTraceResult { reexec := defaultTraceReexec if config != nil && config.Reexec != nil { reexec = *config.Reexec @@ -260,20 +291,23 @@ func (api *API) traceChain(ctx context.Context, start, end *types.Block, config threads = blocks } var ( - pend = new(sync.WaitGroup) - tasks = make(chan *blockTraceTask, threads) - results = make(chan *blockTraceTask, threads) - localctx = context.Background() + pend = new(sync.WaitGroup) + ctx = context.Background() + taskCh = make(chan *blockTraceTask, threads) + resCh = make(chan *blockTraceTask, threads) + reler = new(releaser) ) for th := 0; th < threads; th++ { pend.Add(1) go func() { defer pend.Done() - // Fetch and execute the next block trace tasks - for task := range tasks { - signer := types.MakeSigner(api.backend.ChainConfig(), task.block.Number()) - blockCtx := core.NewEVMBlockContext(task.block.Header(), api.chainContext(localctx), nil) + // Fetch and execute the block trace taskCh + for task := range taskCh { + var ( + signer = types.MakeSigner(api.backend.ChainConfig(), task.block.Number()) + blockCtx = core.NewEVMBlockContext(task.block.Header(), api.chainContext(ctx), nil) + ) // Trace all the transactions contained within for i, tx := range task.block.Transactions() { msg, _ := tx.AsMessage(signer, task.block.BaseFee()) @@ -282,7 +316,7 @@ func (api *API) traceChain(ctx context.Context, start, end *types.Block, config TxIndex: i, TxHash: tx.Hash(), } - res, err := api.traceTx(localctx, msg, txctx, blockCtx, task.statedb, config) + res, err := api.traceTx(ctx, msg, txctx, blockCtx, task.statedb, config) if err != nil { task.results[i] = &txTraceResult{Error: err.Error()} log.Warn("Tracing failed", "hash", tx.Hash(), "block", task.block.NumberU64(), "err", err) @@ -292,36 +326,38 @@ func (api *API) traceChain(ctx context.Context, start, end *types.Block, config task.statedb.Finalise(api.backend.ChainConfig().IsEIP158(task.block.Number())) task.results[i] = &txTraceResult{Result: res} } - // Stream the result back to the user or abort on teardown + // Tracing state is used up, queue it for de-referencing + reler.add(task.release) + + // Stream the result back to the result catcher or abort on teardown select { - case results <- task: - case <-notifier.Closed(): + case resCh <- task: + case <-closed: return } } }() } // Start a goroutine to feed all the blocks into the tracers - var ( - begin = time.Now() - derefTodo []common.Hash // list of hashes to dereference from the db - derefsMu sync.Mutex // mutex for the derefs - ) - go func() { var ( logged time.Time + begin = time.Now() number uint64 traced uint64 failed error - parent common.Hash statedb *state.StateDB + release StateReleaseFunc ) // Ensure everything is properly cleaned up on any exit path defer func() { - close(tasks) + close(taskCh) pend.Wait() + // Clean out any pending derefs. + reler.call() + + // Log the chain result switch { case failed != nil: log.Warn("Chain tracing failed", "start", start.NumberU64(), "end", end.NumberU64(), "transactions", traced, "elapsed", time.Since(begin), "err", failed) @@ -330,105 +366,97 @@ func (api *API) traceChain(ctx context.Context, start, end *types.Block, config default: log.Info("Chain tracing finished", "start", start.NumberU64(), "end", end.NumberU64(), "transactions", traced, "elapsed", time.Since(begin)) } - close(results) + close(resCh) }() - var preferDisk bool // Feed all the blocks both into the tracer, as well as fast process concurrently for number = start.NumberU64(); number < end.NumberU64(); number++ { // Stop tracing if interruption was requested select { - case <-notifier.Closed(): + case <-closed: return default: } - // clean out any derefs - derefsMu.Lock() - for _, h := range derefTodo { - statedb.Database().TrieDB().Dereference(h) - } - derefTodo = derefTodo[:0] - derefsMu.Unlock() - // Print progress logs if long enough time elapsed if time.Since(logged) > 8*time.Second { logged = time.Now() log.Info("Tracing chain segment", "start", start.NumberU64(), "end", end.NumberU64(), "current", number, "transactions", traced, "elapsed", time.Since(begin)) } - // Retrieve the parent state to trace on top - block, err := api.blockByNumber(localctx, rpc.BlockNumber(number)) + // Retrieve the parent block and target block for tracing. + block, err := api.blockByNumber(ctx, rpc.BlockNumber(number)) + if err != nil { + failed = err + break + } + next, err := api.blockByNumber(ctx, rpc.BlockNumber(number+1)) if err != nil { failed = err break } // Prepare the statedb for tracing. Don't use the live database for - // tracing to avoid persisting state junks into the database. - statedb, err = api.backend.StateAtBlock(localctx, block, reexec, statedb, false, preferDisk) + // tracing to avoid persisting state junks into the database. Switch + // over to `preferDisk` mode only if the memory usage exceeds the + // limit, the trie database will be reconstructed from scratch only + // if the relevant state is available in disk. + var preferDisk bool + if statedb != nil { + s1, s2 := statedb.Database().TrieDB().Size() + preferDisk = s1+s2 > defaultTracechainMemLimit + } + statedb, release, err = api.backend.StateAtBlock(ctx, block, reexec, statedb, false, preferDisk) if err != nil { failed = err break } - if trieDb := statedb.Database().TrieDB(); trieDb != nil { - // Hold the reference for tracer, will be released at the final stage - trieDb.Reference(block.Root(), common.Hash{}) + // Clean out any pending derefs. Note this step must be done after + // constructing tracing state, because the tracing state of block + // next depends on the parent state and construction may fail if + // we release too early. + reler.call() - // Release the parent state because it's already held by the tracer - if parent != (common.Hash{}) { - trieDb.Dereference(parent) - } - // Prefer disk if the trie db memory grows too much - s1, s2 := trieDb.Size() - if !preferDisk && (s1+s2) > defaultTracechainMemLimit { - log.Info("Switching to prefer-disk mode for tracing", "size", s1+s2) - preferDisk = true - } - } - parent = block.Root() - - next, err := api.blockByNumber(localctx, rpc.BlockNumber(number+1)) - if err != nil { - failed = err - break - } // Send the block over to the concurrent tracers (if not in the fast-forward phase) txs := next.Transactions() select { - case tasks <- &blockTraceTask{statedb: statedb.Copy(), block: next, rootref: block.Root(), results: make([]*txTraceResult, len(txs))}: - case <-notifier.Closed(): + case taskCh <- &blockTraceTask{statedb: statedb.Copy(), block: next, release: release, results: make([]*txTraceResult, len(txs))}: + case <-closed: + reler.add(release) return } traced += uint64(len(txs)) } }() - // Keep reading the trace results and stream the to the user + // Keep reading the trace results and stream them to result channel. + retCh := make(chan *blockTraceResult) go func() { + defer close(retCh) var ( - done = make(map[uint64]*blockTraceResult) next = start.NumberU64() + 1 + done = make(map[uint64]*blockTraceResult) ) - for res := range results { + for res := range resCh { // Queue up next received result result := &blockTraceResult{ Block: hexutil.Uint64(res.block.NumberU64()), Hash: res.block.Hash(), Traces: res.results, } - // Schedule any parent tries held in memory by this task for dereferencing done[uint64(result.Block)] = result - derefsMu.Lock() - derefTodo = append(derefTodo, res.rootref) - derefsMu.Unlock() - // Stream completed traces to the user, aborting on the first error + + // Stream completed traces to the result channel for result, ok := done[next]; ok; result, ok = done[next] { if len(result.Traces) > 0 || next == end.NumberU64() { - notifier.Notify(sub.ID, result) + // It will be blocked in case the channel consumer doesn't take the + // tracing result in time(e.g. the websocket connect is not stable) + // which will eventually block the entire chain tracer. It's the + // expected behavior to not waste node resources for a non-active user. + retCh <- result } delete(done, next) next++ } } }() - return sub, nil + return retCh } // TraceBlockByNumber returns the structured logs created during the execution of @@ -515,10 +543,12 @@ func (api *API) IntermediateRoots(ctx context.Context, hash common.Hash, config if config != nil && config.Reexec != nil { reexec = *config.Reexec } - statedb, err := api.backend.StateAtBlock(ctx, parent, reexec, nil, true, false) + statedb, release, err := api.backend.StateAtBlock(ctx, parent, reexec, nil, true, false) if err != nil { return nil, err } + defer release() + var ( roots []common.Hash signer = types.MakeSigner(api.backend.ChainConfig(), block.Number()) @@ -576,10 +606,12 @@ func (api *API) traceBlock(ctx context.Context, block *types.Block, config *Trac if config != nil && config.Reexec != nil { reexec = *config.Reexec } - statedb, err := api.backend.StateAtBlock(ctx, parent, reexec, nil, true, false) + statedb, release, err := api.backend.StateAtBlock(ctx, parent, reexec, nil, true, false) if err != nil { return nil, err } + defer release() + // Execute all the transaction contained within the block concurrently var ( signer = types.MakeSigner(api.backend.ChainConfig(), block.Number()) @@ -666,10 +698,12 @@ func (api *API) standardTraceBlockToFile(ctx context.Context, block *types.Block if config != nil && config.Reexec != nil { reexec = *config.Reexec } - statedb, err := api.backend.StateAtBlock(ctx, parent, reexec, nil, true, false) + statedb, release, err := api.backend.StateAtBlock(ctx, parent, reexec, nil, true, false) if err != nil { return nil, err } + defer release() + // Retrieve the tracing configurations, or use default values var ( logConfig logger.Config @@ -793,10 +827,12 @@ func (api *API) TraceTransaction(ctx context.Context, hash common.Hash, config * if err != nil { return nil, err } - msg, vmctx, statedb, err := api.backend.StateAtTransaction(ctx, block, int(index), reexec) + msg, vmctx, statedb, release, err := api.backend.StateAtTransaction(ctx, block, int(index), reexec) if err != nil { return nil, err } + defer release() + txctx := &Context{ BlockHash: blockHash, TxIndex: int(index), @@ -837,10 +873,12 @@ func (api *API) TraceCall(ctx context.Context, args ethapi.TransactionArgs, bloc if config != nil && config.Reexec != nil { reexec = *config.Reexec } - statedb, err := api.backend.StateAtBlock(ctx, block, reexec, nil, true, false) + statedb, release, err := api.backend.StateAtBlock(ctx, block, reexec, nil, true, false) if err != nil { return nil, err } + defer release() + vmctx := core.NewEVMBlockContext(block.Header(), api.chainContext(ctx), nil) // Apply the customization rules if required. if config != nil { diff --git a/eth/tracers/api_test.go b/eth/tracers/api_test.go index 40d860b85..414ba6fe9 100644 --- a/eth/tracers/api_test.go +++ b/eth/tracers/api_test.go @@ -26,6 +26,7 @@ import ( "math/big" "reflect" "sort" + "sync/atomic" "testing" "time" @@ -57,6 +58,9 @@ type testBackend struct { engine consensus.Engine chaindb ethdb.Database chain *core.BlockChain + + refHook func() // Hook is invoked when the requested state is referenced + relHook func() // Hook is invoked when the requested state is released } func newTestBackend(t *testing.T, n int, gspec *core.Genesis, generator func(i int, b *core.BlockGen)) *testBackend { @@ -133,25 +137,33 @@ func (b *testBackend) ChainDb() ethdb.Database { return b.chaindb } -func (b *testBackend) StateAtBlock(ctx context.Context, block *types.Block, reexec uint64, base *state.StateDB, checkLive bool, preferDisk bool) (*state.StateDB, error) { +func (b *testBackend) StateAtBlock(ctx context.Context, block *types.Block, reexec uint64, base *state.StateDB, readOnly bool, preferDisk bool) (*state.StateDB, StateReleaseFunc, error) { statedb, err := b.chain.StateAt(block.Root()) if err != nil { - return nil, errStateNotFound + return nil, nil, errStateNotFound } - return statedb, nil + if b.refHook != nil { + b.refHook() + } + release := func() { + if b.relHook != nil { + b.relHook() + } + } + return statedb, release, nil } -func (b *testBackend) StateAtTransaction(ctx context.Context, block *types.Block, txIndex int, reexec uint64) (core.Message, vm.BlockContext, *state.StateDB, error) { +func (b *testBackend) StateAtTransaction(ctx context.Context, block *types.Block, txIndex int, reexec uint64) (core.Message, vm.BlockContext, *state.StateDB, StateReleaseFunc, error) { parent := b.chain.GetBlock(block.ParentHash(), block.NumberU64()-1) if parent == nil { - return nil, vm.BlockContext{}, nil, errBlockNotFound + return nil, vm.BlockContext{}, nil, nil, errBlockNotFound } - statedb, err := b.chain.StateAt(parent.Root()) + statedb, release, err := b.StateAtBlock(ctx, parent, reexec, nil, true, false) if err != nil { - return nil, vm.BlockContext{}, nil, errStateNotFound + return nil, vm.BlockContext{}, nil, nil, errStateNotFound } if txIndex == 0 && len(block.Transactions()) == 0 { - return nil, vm.BlockContext{}, statedb, nil + return nil, vm.BlockContext{}, statedb, release, nil } // Recompute transactions up to the target index. signer := types.MakeSigner(b.chainConfig, block.Number()) @@ -160,15 +172,15 @@ func (b *testBackend) StateAtTransaction(ctx context.Context, block *types.Block txContext := core.NewEVMTxContext(msg) context := core.NewEVMBlockContext(block.Header(), b.chain, nil) if idx == txIndex { - return msg, context, statedb, nil + return msg, context, statedb, release, nil } vmenv := vm.NewEVM(context, txContext, statedb, b.chainConfig, vm.Config{}) if _, err := core.ApplyMessage(vmenv, msg, new(core.GasPool).AddGas(tx.Gas())); err != nil { - return nil, vm.BlockContext{}, nil, fmt.Errorf("transaction %#x failed: %v", tx.Hash(), err) + return nil, vm.BlockContext{}, nil, nil, fmt.Errorf("transaction %#x failed: %v", tx.Hash(), err) } statedb.Finalise(vmenv.ChainConfig().IsEIP158(block.Number())) } - return nil, vm.BlockContext{}, nil, fmt.Errorf("transaction index %d out of range for block %#x", txIndex, block.Hash()) + return nil, vm.BlockContext{}, nil, nil, fmt.Errorf("transaction index %d out of range for block %#x", txIndex, block.Hash()) } func TestTraceCall(t *testing.T) { @@ -622,3 +634,74 @@ func newStates(keys []common.Hash, vals []common.Hash) *map[common.Hash]common.H } return &m } + +func TestTraceChain(t *testing.T) { + // Initialize test accounts + accounts := newAccounts(3) + genesis := &core.Genesis{Alloc: core.GenesisAlloc{ + accounts[0].addr: {Balance: big.NewInt(params.Ether)}, + accounts[1].addr: {Balance: big.NewInt(params.Ether)}, + accounts[2].addr: {Balance: big.NewInt(params.Ether)}, + }} + genBlocks := 50 + signer := types.HomesteadSigner{} + + var ( + ref uint32 // total refs has made + rel uint32 // total rels has made + nonce uint64 + ) + backend := newTestBackend(t, genBlocks, genesis, func(i int, b *core.BlockGen) { + // Transfer from account[0] to account[1] + // value: 1000 wei + // fee: 0 wei + for j := 0; j < i+1; j++ { + tx, _ := types.SignTx(types.NewTransaction(nonce, accounts[1].addr, big.NewInt(1000), params.TxGas, b.BaseFee(), nil), signer, accounts[0].key) + b.AddTx(tx) + nonce += 1 + } + }) + backend.refHook = func() { atomic.AddUint32(&ref, 1) } + backend.relHook = func() { atomic.AddUint32(&rel, 1) } + api := NewAPI(backend) + + single := `{"result":{"gas":21000,"failed":false,"returnValue":"","structLogs":[]}}` + var cases = []struct { + start uint64 + end uint64 + config *TraceConfig + }{ + {0, 50, nil}, // the entire chain range, blocks [1, 50] + {10, 20, nil}, // the middle chain range, blocks [11, 20] + } + for _, c := range cases { + ref, rel = 0, 0 // clean up the counters + + from, _ := api.blockByNumber(context.Background(), rpc.BlockNumber(c.start)) + to, _ := api.blockByNumber(context.Background(), rpc.BlockNumber(c.end)) + resCh := api.traceChain(from, to, c.config, nil) + + next := c.start + 1 + for result := range resCh { + if next != uint64(result.Block) { + t.Error("Unexpected tracing block") + } + if len(result.Traces) != int(next) { + t.Error("Unexpected tracing result") + } + for _, trace := range result.Traces { + blob, _ := json.Marshal(trace) + if string(blob) != single { + t.Error("Unexpected tracing result") + } + } + next += 1 + } + if next != c.end+1 { + t.Error("Missing tracing block") + } + if ref != rel { + t.Errorf("Ref and deref actions are not equal, ref %d rel %d", ref, rel) + } + } +} diff --git a/les/api_backend.go b/les/api_backend.go index 5b4213134..71cfbbed1 100644 --- a/les/api_backend.go +++ b/les/api_backend.go @@ -33,6 +33,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/eth/gasprice" + "github.com/ethereum/go-ethereum/eth/tracers" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/light" @@ -321,10 +322,10 @@ func (b *LesApiBackend) CurrentHeader() *types.Header { return b.eth.blockchain.CurrentHeader() } -func (b *LesApiBackend) StateAtBlock(ctx context.Context, block *types.Block, reexec uint64, base *state.StateDB, checkLive bool, preferDisk bool) (*state.StateDB, error) { +func (b *LesApiBackend) StateAtBlock(ctx context.Context, block *types.Block, reexec uint64, base *state.StateDB, readOnly bool, preferDisk bool) (*state.StateDB, tracers.StateReleaseFunc, error) { return b.eth.stateAtBlock(ctx, block, reexec) } -func (b *LesApiBackend) StateAtTransaction(ctx context.Context, block *types.Block, txIndex int, reexec uint64) (core.Message, vm.BlockContext, *state.StateDB, error) { +func (b *LesApiBackend) StateAtTransaction(ctx context.Context, block *types.Block, txIndex int, reexec uint64) (core.Message, vm.BlockContext, *state.StateDB, tracers.StateReleaseFunc, error) { return b.eth.stateAtTransaction(ctx, block, txIndex, reexec) } diff --git a/les/state_accessor.go b/les/state_accessor.go index 112e6fd44..a2d49fbf3 100644 --- a/les/state_accessor.go +++ b/les/state_accessor.go @@ -25,31 +25,36 @@ import ( "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" + "github.com/ethereum/go-ethereum/eth/tracers" "github.com/ethereum/go-ethereum/light" ) +// noopReleaser is returned in case there is no operation expected +// for releasing state. +var noopReleaser = tracers.StateReleaseFunc(func() {}) + // stateAtBlock retrieves the state database associated with a certain block. -func (leth *LightEthereum) stateAtBlock(ctx context.Context, block *types.Block, reexec uint64) (*state.StateDB, error) { - return light.NewState(ctx, block.Header(), leth.odr), nil +func (leth *LightEthereum) stateAtBlock(ctx context.Context, block *types.Block, reexec uint64) (*state.StateDB, tracers.StateReleaseFunc, error) { + return light.NewState(ctx, block.Header(), leth.odr), noopReleaser, nil } // stateAtTransaction returns the execution environment of a certain transaction. -func (leth *LightEthereum) stateAtTransaction(ctx context.Context, block *types.Block, txIndex int, reexec uint64) (core.Message, vm.BlockContext, *state.StateDB, error) { +func (leth *LightEthereum) stateAtTransaction(ctx context.Context, block *types.Block, txIndex int, reexec uint64) (core.Message, vm.BlockContext, *state.StateDB, tracers.StateReleaseFunc, error) { // Short circuit if it's genesis block. if block.NumberU64() == 0 { - return nil, vm.BlockContext{}, nil, errors.New("no transaction in genesis") + return nil, vm.BlockContext{}, nil, nil, errors.New("no transaction in genesis") } // Create the parent state database parent, err := leth.blockchain.GetBlock(ctx, block.ParentHash(), block.NumberU64()-1) if err != nil { - return nil, vm.BlockContext{}, nil, err + return nil, vm.BlockContext{}, nil, nil, err } - statedb, err := leth.stateAtBlock(ctx, parent, reexec) + statedb, release, err := leth.stateAtBlock(ctx, parent, reexec) if err != nil { - return nil, vm.BlockContext{}, nil, err + return nil, vm.BlockContext{}, nil, nil, err } if txIndex == 0 && len(block.Transactions()) == 0 { - return nil, vm.BlockContext{}, statedb, nil + return nil, vm.BlockContext{}, statedb, release, nil } // Recompute transactions up to the target index. signer := types.MakeSigner(leth.blockchain.Config(), block.Number()) @@ -60,16 +65,16 @@ func (leth *LightEthereum) stateAtTransaction(ctx context.Context, block *types. context := core.NewEVMBlockContext(block.Header(), leth.blockchain, nil) statedb.Prepare(tx.Hash(), idx) if idx == txIndex { - return msg, context, statedb, nil + return msg, context, statedb, release, nil } // Not yet the searched for transaction, execute on top of the current state vmenv := vm.NewEVM(context, txContext, statedb, leth.blockchain.Config(), vm.Config{}) if _, err := core.ApplyMessage(vmenv, msg, new(core.GasPool).AddGas(tx.Gas())); err != nil { - return nil, vm.BlockContext{}, nil, fmt.Errorf("transaction %#x failed: %v", tx.Hash(), err) + return nil, vm.BlockContext{}, nil, nil, fmt.Errorf("transaction %#x failed: %v", tx.Hash(), err) } // Ensure any modifications are committed to the state // Only delete empty objects if EIP158/161 (a.k.a Spurious Dragon) is in effect statedb.Finalise(vmenv.ChainConfig().IsEIP158(block.Number())) } - return nil, vm.BlockContext{}, nil, fmt.Errorf("transaction index %d out of range for block %#x", txIndex, block.Hash()) + return nil, vm.BlockContext{}, nil, nil, fmt.Errorf("transaction index %d out of range for block %#x", txIndex, block.Hash()) }