From 1e1865b73f6b0d2fef656d2f37e20e41d13a5ef0 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Wed, 5 Feb 2020 13:12:09 +0100 Subject: [PATCH 1/2] core: implement background trie prefetcher Squashed from the following commits: core/state: lazily init snapshot storage map core/state: fix flawed meter on storage reads core/state: make statedb/stateobjects reuse a hasher core/blockchain, core/state: implement new trie prefetcher core: make trie prefetcher deliver tries to statedb core/state: refactor trie_prefetcher, export storage tries blockchain: re-enable the next-block-prefetcher state: remove panics in trie prefetcher core/state/trie_prefetcher: address some review concerns sq --- core/blockchain.go | 27 +++- core/state/database.go | 12 +- core/state/state_object.go | 72 +++++++--- core/state/statedb.go | 45 +++++- core/state/trie_prefetcher.go | 249 ++++++++++++++++++++++++++++++++++ crypto/crypto.go | 17 ++- crypto/crypto_test.go | 7 + 7 files changed, 395 insertions(+), 34 deletions(-) create mode 100644 core/state/trie_prefetcher.go diff --git a/core/blockchain.go b/core/blockchain.go index b8f483b85..ccb99bded 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -201,11 +201,12 @@ type BlockChain struct { running int32 // 0 if chain is running, 1 when stopped procInterrupt int32 // interrupt signaler for block processing - engine consensus.Engine - validator Validator // Block and state validator interface - prefetcher Prefetcher // Block state prefetcher interface - processor Processor // Block transaction processor interface - vmConfig vm.Config + engine consensus.Engine + validator Validator // Block and state validator interface + triePrefetcher *state.TriePrefetcher // Trie prefetcher interface + prefetcher Prefetcher + processor Processor // Block transaction processor interface + vmConfig vm.Config shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block. terminateInsert func(common.Hash, uint64) bool // Testing hook used to terminate ancient receipt chain insertion. @@ -249,6 +250,15 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par } bc.validator = NewBlockValidator(chainConfig, bc, engine) bc.prefetcher = newStatePrefetcher(chainConfig, bc, engine) + tp := state.NewTriePrefetcher(bc.stateCache) + + bc.wg.Add(1) + go func() { + tp.Loop() + bc.wg.Done() + }() + bc.triePrefetcher = tp + bc.processor = NewStateProcessor(chainConfig, bc, engine) var err error @@ -991,6 +1001,9 @@ func (bc *BlockChain) Stop() { bc.scope.Close() close(bc.quit) bc.StopInsert() + if bc.triePrefetcher != nil { + bc.triePrefetcher.Close() + } bc.wg.Wait() // Ensure that the entirety of the state snapshot is journalled to disk. @@ -1857,6 +1870,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1) } statedb, err := state.New(parent.Root, bc.stateCache, bc.snaps) + statedb.UsePrefetcher(bc.triePrefetcher) if err != nil { return it.index, err } @@ -1891,8 +1905,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete, we can mark them snapshotAccountReadTimer.Update(statedb.SnapshotAccountReads) // Account reads are complete, we can mark them snapshotStorageReadTimer.Update(statedb.SnapshotStorageReads) // Storage reads are complete, we can mark them - - triehash := statedb.AccountHashes + statedb.StorageHashes // Save to not double count in validation + triehash := statedb.AccountHashes + statedb.StorageHashes // Save to not double count in validation trieproc := statedb.SnapshotAccountReads + statedb.AccountReads + statedb.AccountUpdates trieproc += statedb.SnapshotStorageReads + statedb.StorageReads + statedb.StorageUpdates diff --git a/core/state/database.go b/core/state/database.go index 83f7b2a83..1a06e3340 100644 --- a/core/state/database.go +++ b/core/state/database.go @@ -129,12 +129,20 @@ type cachingDB struct { // OpenTrie opens the main account trie at a specific root hash. func (db *cachingDB) OpenTrie(root common.Hash) (Trie, error) { - return trie.NewSecure(root, db.db) + tr, err := trie.NewSecure(root, db.db) + if err != nil { + return nil, err + } + return tr, nil } // OpenStorageTrie opens the storage trie of an account. func (db *cachingDB) OpenStorageTrie(addrHash, root common.Hash) (Trie, error) { - return trie.NewSecure(root, db.db) + tr, err := trie.NewSecure(root, db.db) + if err != nil { + return nil, err + } + return tr, nil } // CopyTrie returns an independent copy of the given trie. diff --git a/core/state/state_object.go b/core/state/state_object.go index d0d3b4513..43c5074d9 100644 --- a/core/state/state_object.go +++ b/core/state/state_object.go @@ -157,11 +157,20 @@ func (s *stateObject) touch() { func (s *stateObject) getTrie(db Database) Trie { if s.trie == nil { - var err error - s.trie, err = db.OpenStorageTrie(s.addrHash, s.data.Root) - if err != nil { - s.trie, _ = db.OpenStorageTrie(s.addrHash, common.Hash{}) - s.setError(fmt.Errorf("can't create storage trie: %v", err)) + // Try fetching from prefetcher first + // We don't prefetch empty tries + if s.data.Root != emptyRoot && s.db.prefetcher != nil { + // When the miner is creating the pending state, there is no + // prefetcher + s.trie = s.db.prefetcher.GetTrie(s.data.Root) + } + if s.trie == nil { + var err error + s.trie, err = db.OpenStorageTrie(s.addrHash, s.data.Root) + if err != nil { + s.trie, _ = db.OpenStorageTrie(s.addrHash, common.Hash{}) + s.setError(fmt.Errorf("can't create storage trie: %v", err)) + } } } return s.trie @@ -197,12 +206,24 @@ func (s *stateObject) GetCommittedState(db Database, key common.Hash) common.Has } // If no live objects are available, attempt to use snapshots var ( - enc []byte - err error + enc []byte + err error + meter *time.Duration ) + readStart := time.Now() + if metrics.EnabledExpensive { + // If the snap is 'under construction', the first lookup may fail. If that + // happens, we don't want to double-count the time elapsed. Thus this + // dance with the metering. + defer func() { + if meter != nil { + *meter += time.Since(readStart) + } + }() + } if s.db.snap != nil { if metrics.EnabledExpensive { - defer func(start time.Time) { s.db.SnapshotStorageReads += time.Since(start) }(time.Now()) + meter = &s.db.SnapshotStorageReads } // If the object was destructed in *this* block (and potentially resurrected), // the storage has been cleared out, and we should *not* consult the previous @@ -217,8 +238,14 @@ func (s *stateObject) GetCommittedState(db Database, key common.Hash) common.Has } // If snapshot unavailable or reading from it failed, load from the database if s.db.snap == nil || err != nil { + if meter != nil { + // If we already spent time checking the snapshot, account for it + // and reset the readStart + *meter += time.Since(readStart) + readStart = time.Now() + } if metrics.EnabledExpensive { - defer func(start time.Time) { s.db.StorageReads += time.Since(start) }(time.Now()) + meter = &s.db.StorageReads } if enc, err = s.getTrie(db).TryGet(key.Bytes()); err != nil { s.setError(err) @@ -283,8 +310,13 @@ func (s *stateObject) setState(key, value common.Hash) { // finalise moves all dirty storage slots into the pending area to be hashed or // committed later. It is invoked at the end of every transaction. func (s *stateObject) finalise() { + trieChanges := make([]common.Hash, 0, len(s.dirtyStorage)) for key, value := range s.dirtyStorage { s.pendingStorage[key] = value + trieChanges = append(trieChanges, key) + } + if len(trieChanges) > 0 && s.db.prefetcher != nil && s.data.Root != emptyRoot { + s.db.prefetcher.PrefetchStorage(s.data.Root, trieChanges) } if len(s.dirtyStorage) > 0 { s.dirtyStorage = make(Storage) @@ -303,18 +335,11 @@ func (s *stateObject) updateTrie(db Database) Trie { if metrics.EnabledExpensive { defer func(start time.Time) { s.db.StorageUpdates += time.Since(start) }(time.Now()) } - // Retrieve the snapshot storage map for the object + // The snapshot storage map for the object var storage map[common.Hash][]byte - if s.db.snap != nil { - // Retrieve the old storage map, if available, create a new one otherwise - storage = s.db.snapStorage[s.addrHash] - if storage == nil { - storage = make(map[common.Hash][]byte) - s.db.snapStorage[s.addrHash] = storage - } - } // Insert all the pending updates into the trie tr := s.getTrie(db) + hasher := s.db.hasher for key, value := range s.pendingStorage { // Skip noop changes, persist actual changes if value == s.originStorage[key] { @@ -331,8 +356,15 @@ func (s *stateObject) updateTrie(db Database) Trie { s.setError(tr.TryUpdate(key[:], v)) } // If state snapshotting is active, cache the data til commit - if storage != nil { - storage[crypto.Keccak256Hash(key[:])] = v // v will be nil if value is 0x00 + if s.db.snap != nil { + if storage == nil { + // Retrieve the old storage map, if available, create a new one otherwise + if storage = s.db.snapStorage[s.addrHash]; storage == nil { + storage = make(map[common.Hash][]byte) + s.db.snapStorage[s.addrHash] = storage + } + } + storage[crypto.HashData(hasher, key[:])] = v // v will be nil if value is 0x00 } } if len(s.pendingStorage) > 0 { diff --git a/core/state/statedb.go b/core/state/statedb.go index a9d1de2e0..ce50962e8 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -62,8 +62,11 @@ func (n *proofList) Delete(key []byte) error { // * Contracts // * Accounts type StateDB struct { - db Database - trie Trie + db Database + prefetcher *TriePrefetcher + originalRoot common.Hash // The pre-state root, before any changes were made + trie Trie + hasher crypto.KeccakState snaps *snapshot.Tree snap snapshot.Snapshot @@ -125,6 +128,7 @@ func New(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error) sdb := &StateDB{ db: db, trie: tr, + originalRoot: root, snaps: snaps, stateObjects: make(map[common.Address]*stateObject), stateObjectsPending: make(map[common.Address]struct{}), @@ -133,6 +137,7 @@ func New(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error) preimages: make(map[common.Hash][]byte), journal: newJournal(), accessList: newAccessList(), + hasher: crypto.NewKeccakState(), } if sdb.snaps != nil { if sdb.snap = sdb.snaps.Snapshot(root); sdb.snap != nil { @@ -144,6 +149,13 @@ func New(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error) return sdb, nil } +func (s *StateDB) UsePrefetcher(prefetcher *TriePrefetcher) { + if prefetcher != nil { + s.prefetcher = prefetcher + s.prefetcher.Resume(s.originalRoot) + } +} + // setError remembers the first non-nil error it is called with. func (s *StateDB) setError(err error) { if s.dbErr == nil { @@ -532,7 +544,7 @@ func (s *StateDB) getDeletedStateObject(addr common.Address) *stateObject { defer func(start time.Time) { s.SnapshotAccountReads += time.Since(start) }(time.Now()) } var acc *snapshot.Account - if acc, err = s.snap.Account(crypto.Keccak256Hash(addr.Bytes())); err == nil { + if acc, err = s.snap.Account(crypto.HashData(s.hasher, addr.Bytes())); err == nil { if acc == nil { return nil } @@ -675,6 +687,7 @@ func (s *StateDB) Copy() *StateDB { logSize: s.logSize, preimages: make(map[common.Hash][]byte, len(s.preimages)), journal: newJournal(), + hasher: crypto.NewKeccakState(), } // Copy the dirty states, logs, and preimages for addr := range s.journal.dirties { @@ -760,6 +773,7 @@ func (s *StateDB) GetRefund() uint64 { // the journal as well as the refunds. Finalise, however, will not push any updates // into the tries just yet. Only IntermediateRoot or Commit will do that. func (s *StateDB) Finalise(deleteEmptyObjects bool) { + var addressesToPrefetch []common.Address for addr := range s.journal.dirties { obj, exist := s.stateObjects[addr] if !exist { @@ -788,7 +802,17 @@ func (s *StateDB) Finalise(deleteEmptyObjects bool) { } s.stateObjectsPending[addr] = struct{}{} s.stateObjectsDirty[addr] = struct{}{} + // At this point, also ship the address off to the precacher. The precacher + // will start loading tries, and when the change is eventually committed, + // the commit-phase will be a lot faster + if s.prefetcher != nil { + addressesToPrefetch = append(addressesToPrefetch, addr) + } } + if s.prefetcher != nil { + s.prefetcher.PrefetchAddresses(addressesToPrefetch) + } + // Invalidate journal because reverting across transactions is not allowed. s.clearJournalAndRefund() } @@ -800,6 +824,21 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash { // Finalise all the dirty storage states and write them into the tries s.Finalise(deleteEmptyObjects) + // Now we're about to start to write changes to the trie. The trie is so + // far _untouched_. We can check with the prefetcher, if it can give us + // a trie which has the same root, but also has some content loaded into it. + // If so, use that one instead. + if s.prefetcher != nil { + s.prefetcher.Pause() + // We only want to do this _once_, if someone calls IntermediateRoot again, + // we shouldn't fetch the trie again + if s.originalRoot != (common.Hash{}) { + if trie := s.prefetcher.GetTrie(s.originalRoot); trie != nil { + s.trie = trie + } + s.originalRoot = common.Hash{} + } + } for addr := range s.stateObjectsPending { obj := s.stateObjects[addr] if obj.deleted { diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go new file mode 100644 index 000000000..8a1aab325 --- /dev/null +++ b/core/state/trie_prefetcher.go @@ -0,0 +1,249 @@ +// Copyright 2020 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package state + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" +) + +var ( + // trieDeliveryMeter counts how many times the prefetcher was unable to supply + // the statedb with a prefilled trie. This meter should be zero -- if it's not, that + // needs to be investigated + trieDeliveryMissMeter = metrics.NewRegisteredMeter("trie/prefetch/deliverymiss", nil) + + triePrefetchFetchMeter = metrics.NewRegisteredMeter("trie/prefetch/fetch", nil) + triePrefetchSkipMeter = metrics.NewRegisteredMeter("trie/prefetch/skip", nil) + triePrefetchDropMeter = metrics.NewRegisteredMeter("trie/prefetch/drop", nil) +) + +// TriePrefetcher is an active prefetcher, which receives accounts or storage +// items on two channels, and does trie-loading of the items. +// The goal is to get as much useful content into the caches as possible +type TriePrefetcher struct { + requestCh chan (fetchRequest) // Chan to receive requests for data to fetch + cmdCh chan (*cmd) // Chan to control activity, pause/new root + quitCh chan (struct{}) + deliveryCh chan (struct{}) + db Database + + paused bool + + storageTries map[common.Hash]Trie + accountTrie Trie + accountTrieRoot common.Hash +} + +func NewTriePrefetcher(db Database) *TriePrefetcher { + return &TriePrefetcher{ + requestCh: make(chan fetchRequest, 200), + cmdCh: make(chan *cmd), + quitCh: make(chan struct{}), + deliveryCh: make(chan struct{}), + db: db, + } +} + +type cmd struct { + root common.Hash +} + +type fetchRequest struct { + slots []common.Hash + storageRoot *common.Hash + addresses []common.Address +} + +func (p *TriePrefetcher) Loop() { + var ( + accountTrieRoot common.Hash + accountTrie Trie + storageTries map[common.Hash]Trie + + err error + // Some tracking of performance + skipped int64 + fetched int64 + + paused = true + ) + // The prefetcher loop has two distinct phases: + // 1: Paused: when in this state, the accumulated tries are accessible to outside + // callers. + // 2: Active prefetching, awaiting slots and accounts to prefetch + for { + select { + case <-p.quitCh: + return + case cmd := <-p.cmdCh: + // Clear out any old requests + drain: + for { + select { + case req := <-p.requestCh: + if req.slots != nil { + skipped += int64(len(req.slots)) + } else { + skipped += int64(len(req.addresses)) + } + default: + break drain + } + } + if paused { + // Clear old data + p.storageTries = nil + p.accountTrie = nil + p.accountTrieRoot = common.Hash{} + // Resume again + storageTries = make(map[common.Hash]Trie) + accountTrieRoot = cmd.root + accountTrie, err = p.db.OpenTrie(accountTrieRoot) + if err != nil { + log.Error("Trie prefetcher failed opening trie", "root", accountTrieRoot, "err", err) + } + if accountTrieRoot == (common.Hash{}) { + log.Error("Trie prefetcher unpaused with bad root") + } + paused = false + } else { + // Update metrics at new block events + triePrefetchFetchMeter.Mark(fetched) + triePrefetchSkipMeter.Mark(skipped) + fetched, skipped = 0, 0 + // Make the tries accessible + p.accountTrie = accountTrie + p.storageTries = storageTries + p.accountTrieRoot = accountTrieRoot + if cmd.root != (common.Hash{}) { + log.Error("Trie prefetcher paused with non-empty root") + } + paused = true + } + p.deliveryCh <- struct{}{} + case req := <-p.requestCh: + if paused { + continue + } + if sRoot := req.storageRoot; sRoot != nil { + // Storage slots to fetch + var ( + storageTrie Trie + err error + ) + if storageTrie = storageTries[*sRoot]; storageTrie == nil { + if storageTrie, err = p.db.OpenTrie(*sRoot); err != nil { + log.Warn("trie prefetcher failed opening storage trie", "root", *sRoot, "err", err) + skipped += int64(len(req.slots)) + continue + } + storageTries[*sRoot] = storageTrie + } + for _, key := range req.slots { + storageTrie.TryGet(key[:]) + } + fetched += int64(len(req.slots)) + } else { // an account + for _, addr := range req.addresses { + accountTrie.TryGet(addr[:]) + } + fetched += int64(len(req.addresses)) + } + } + } +} + +// Close stops the prefetcher +func (p *TriePrefetcher) Close() { + if p.quitCh != nil { + close(p.quitCh) + p.quitCh = nil + } +} + +// Resume causes the prefetcher to clear out old data, and get ready to +// fetch data concerning the new root +func (p *TriePrefetcher) Resume(root common.Hash) { + p.paused = false + p.cmdCh <- &cmd{ + root: root, + } + // Wait for it + <-p.deliveryCh +} + +// Pause causes the prefetcher to pause prefetching, and make tries +// accessible to callers via GetTrie +func (p *TriePrefetcher) Pause() { + if p.paused { + return + } + p.paused = true + p.cmdCh <- &cmd{ + root: common.Hash{}, + } + // Wait for it + <-p.deliveryCh +} + +// PrefetchAddresses adds an address for prefetching +func (p *TriePrefetcher) PrefetchAddresses(addresses []common.Address) { + cmd := fetchRequest{ + addresses: addresses, + } + // We do an async send here, to not cause the caller to block + //p.requestCh <- cmd + select { + case p.requestCh <- cmd: + default: + triePrefetchDropMeter.Mark(int64(len(addresses))) + } +} + +// PrefetchStorage adds a storage root and a set of keys for prefetching +func (p *TriePrefetcher) PrefetchStorage(root common.Hash, slots []common.Hash) { + cmd := fetchRequest{ + storageRoot: &root, + slots: slots, + } + // We do an async send here, to not cause the caller to block + //p.requestCh <- cmd + select { + case p.requestCh <- cmd: + default: + triePrefetchDropMeter.Mark(int64(len(slots))) + } +} + +// GetTrie returns the trie matching the root hash, or nil if the prefetcher +// doesn't have it. +func (p *TriePrefetcher) GetTrie(root common.Hash) Trie { + if root == p.accountTrieRoot { + return p.accountTrie + } + if storageTrie, ok := p.storageTries[root]; ok { + // Two accounts may well have the same storage root, but we cannot allow + // them both to make updates to the same trie instance. Therefore, + // we need to either delete the trie now, or deliver a copy of the trie. + delete(p.storageTries, root) + return storageTrie + } + trieDeliveryMissMeter.Mark(1) + return nil +} diff --git a/crypto/crypto.go b/crypto/crypto.go index a4a49136a..40969a289 100644 --- a/crypto/crypto.go +++ b/crypto/crypto.go @@ -60,10 +60,23 @@ type KeccakState interface { Read([]byte) (int, error) } +// NewKeccakState creates a new KeccakState +func NewKeccakState() KeccakState { + return sha3.NewLegacyKeccak256().(KeccakState) +} + +// HashData hashes the provided data using the KeccakState and returns a 32 byte hash +func HashData(kh KeccakState, data []byte) (h common.Hash) { + kh.Reset() + kh.Write(data) + kh.Read(h[:]) + return h +} + // Keccak256 calculates and returns the Keccak256 hash of the input data. func Keccak256(data ...[]byte) []byte { b := make([]byte, 32) - d := sha3.NewLegacyKeccak256().(KeccakState) + d := NewKeccakState() for _, b := range data { d.Write(b) } @@ -74,7 +87,7 @@ func Keccak256(data ...[]byte) []byte { // Keccak256Hash calculates and returns the Keccak256 hash of the input data, // converting it to an internal Hash data structure. func Keccak256Hash(data ...[]byte) (h common.Hash) { - d := sha3.NewLegacyKeccak256().(KeccakState) + d := NewKeccakState() for _, b := range data { d.Write(b) } diff --git a/crypto/crypto_test.go b/crypto/crypto_test.go index f71ae8232..f9b0d3e83 100644 --- a/crypto/crypto_test.go +++ b/crypto/crypto_test.go @@ -42,6 +42,13 @@ func TestKeccak256Hash(t *testing.T) { checkhash(t, "Sha3-256-array", func(in []byte) []byte { h := Keccak256Hash(in); return h[:] }, msg, exp) } +func TestKeccak256Hasher(t *testing.T) { + msg := []byte("abc") + exp, _ := hex.DecodeString("4e03657aea45a94fc7d47ba826c8d667c0d1e6e33a64a036ec44f58fa12d6c45") + hasher := NewKeccakState() + checkhash(t, "Sha3-256-array", func(in []byte) []byte { h := HashData(hasher, in); return h[:] }, msg, exp) +} + func TestToECDSAErrors(t *testing.T) { if _, err := HexToECDSA("0000000000000000000000000000000000000000000000000000000000000000"); err == nil { t.Fatal("HexToECDSA should've returned error") From 42f9f1f0738bde1126eaa6f6bed9c1ae03e304a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Fri, 8 Jan 2021 15:01:49 +0200 Subject: [PATCH 2/2] core/state: convert prefetcher to concurrent per-trie loader --- accounts/abi/bind/backends/simulated.go | 3 +- core/blockchain.go | 30 +- core/state/state_object.go | 22 +- core/state/state_test.go | 2 +- core/state/statedb.go | 127 ++++--- core/state/statedb_test.go | 6 +- core/state/trie_prefetcher.go | 455 ++++++++++++++---------- eth/api_tracer.go | 6 +- miner/worker.go | 16 +- 9 files changed, 385 insertions(+), 282 deletions(-) diff --git a/accounts/abi/bind/backends/simulated.go b/accounts/abi/bind/backends/simulated.go index 6e87e037f..8be364d08 100644 --- a/accounts/abi/bind/backends/simulated.go +++ b/accounts/abi/bind/backends/simulated.go @@ -125,10 +125,9 @@ func (b *SimulatedBackend) Rollback() { func (b *SimulatedBackend) rollback() { blocks, _ := core.GenerateChain(b.config, b.blockchain.CurrentBlock(), ethash.NewFaker(), b.database, 1, func(int, *core.BlockGen) {}) - stateDB, _ := b.blockchain.State() b.pendingBlock = blocks[0] - b.pendingState, _ = state.New(b.pendingBlock.Root(), stateDB.Database(), nil) + b.pendingState, _ = state.New(b.pendingBlock.Root(), b.blockchain.StateCache(), nil) } // stateByBlockNumber retrieves a state by a given blocknumber. diff --git a/core/blockchain.go b/core/blockchain.go index ccb99bded..d6668cdcd 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -201,12 +201,11 @@ type BlockChain struct { running int32 // 0 if chain is running, 1 when stopped procInterrupt int32 // interrupt signaler for block processing - engine consensus.Engine - validator Validator // Block and state validator interface - triePrefetcher *state.TriePrefetcher // Trie prefetcher interface - prefetcher Prefetcher - processor Processor // Block transaction processor interface - vmConfig vm.Config + engine consensus.Engine + validator Validator // Block and state validator interface + prefetcher Prefetcher + processor Processor // Block transaction processor interface + vmConfig vm.Config shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block. terminateInsert func(common.Hash, uint64) bool // Testing hook used to terminate ancient receipt chain insertion. @@ -250,15 +249,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par } bc.validator = NewBlockValidator(chainConfig, bc, engine) bc.prefetcher = newStatePrefetcher(chainConfig, bc, engine) - tp := state.NewTriePrefetcher(bc.stateCache) - - bc.wg.Add(1) - go func() { - tp.Loop() - bc.wg.Done() - }() - bc.triePrefetcher = tp - bc.processor = NewStateProcessor(chainConfig, bc, engine) var err error @@ -1001,9 +991,6 @@ func (bc *BlockChain) Stop() { bc.scope.Close() close(bc.quit) bc.StopInsert() - if bc.triePrefetcher != nil { - bc.triePrefetcher.Close() - } bc.wg.Wait() // Ensure that the entirety of the state snapshot is journalled to disk. @@ -1870,16 +1857,20 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1) } statedb, err := state.New(parent.Root, bc.stateCache, bc.snaps) - statedb.UsePrefetcher(bc.triePrefetcher) if err != nil { return it.index, err } + // Enable prefetching to pull in trie node paths while processing transactions + statedb.StartPrefetcher("chain") + defer statedb.StopPrefetcher() // stopped on write anyway, defer meant to catch early error returns + // If we have a followup block, run that against the current state to pre-cache // transactions and probabilistically some of the account/storage trie nodes. var followupInterrupt uint32 if !bc.cacheConfig.TrieCleanNoPrefetch { if followup, err := it.peek(); followup != nil && err == nil { throwaway, _ := state.New(parent.Root, bc.stateCache, bc.snaps) + go func(start time.Time, followup *types.Block, throwaway *state.StateDB, interrupt *uint32) { bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, &followupInterrupt) @@ -1933,7 +1924,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er if err != nil { return it.index, err } - // Update the metrics touched during block commit accountCommitTimer.Update(statedb.AccountCommits) // Account commits are complete, we can mark them storageCommitTimer.Update(statedb.StorageCommits) // Storage commits are complete, we can mark them diff --git a/core/state/state_object.go b/core/state/state_object.go index 43c5074d9..f93f47d5f 100644 --- a/core/state/state_object.go +++ b/core/state/state_object.go @@ -162,7 +162,7 @@ func (s *stateObject) getTrie(db Database) Trie { if s.data.Root != emptyRoot && s.db.prefetcher != nil { // When the miner is creating the pending state, there is no // prefetcher - s.trie = s.db.prefetcher.GetTrie(s.data.Root) + s.trie = s.db.prefetcher.trie(s.data.Root) } if s.trie == nil { var err error @@ -309,14 +309,16 @@ func (s *stateObject) setState(key, value common.Hash) { // finalise moves all dirty storage slots into the pending area to be hashed or // committed later. It is invoked at the end of every transaction. -func (s *stateObject) finalise() { - trieChanges := make([]common.Hash, 0, len(s.dirtyStorage)) +func (s *stateObject) finalise(prefetch bool) { + slotsToPrefetch := make([][]byte, 0, len(s.dirtyStorage)) for key, value := range s.dirtyStorage { s.pendingStorage[key] = value - trieChanges = append(trieChanges, key) + if value != s.originStorage[key] { + slotsToPrefetch = append(slotsToPrefetch, common.CopyBytes(key[:])) // Copy needed for closure + } } - if len(trieChanges) > 0 && s.db.prefetcher != nil && s.data.Root != emptyRoot { - s.db.prefetcher.PrefetchStorage(s.data.Root, trieChanges) + if s.db.prefetcher != nil && prefetch && len(slotsToPrefetch) > 0 && s.data.Root != emptyRoot { + s.db.prefetcher.prefetch(s.data.Root, slotsToPrefetch) } if len(s.dirtyStorage) > 0 { s.dirtyStorage = make(Storage) @@ -327,7 +329,7 @@ func (s *stateObject) finalise() { // It will return nil if the trie has not been loaded and no changes have been made func (s *stateObject) updateTrie(db Database) Trie { // Make sure all dirty slots are finalized into the pending storage area - s.finalise() + s.finalise(false) // Don't prefetch any more, pull directly if need be if len(s.pendingStorage) == 0 { return s.trie } @@ -340,6 +342,8 @@ func (s *stateObject) updateTrie(db Database) Trie { // Insert all the pending updates into the trie tr := s.getTrie(db) hasher := s.db.hasher + + usedStorage := make([][]byte, 0, len(s.pendingStorage)) for key, value := range s.pendingStorage { // Skip noop changes, persist actual changes if value == s.originStorage[key] { @@ -366,6 +370,10 @@ func (s *stateObject) updateTrie(db Database) Trie { } storage[crypto.HashData(hasher, key[:])] = v // v will be nil if value is 0x00 } + usedStorage = append(usedStorage, common.CopyBytes(key[:])) // Copy needed for closure + } + if s.db.prefetcher != nil { + s.db.prefetcher.used(s.data.Root, usedStorage) } if len(s.pendingStorage) > 0 { s.pendingStorage = make(Storage) diff --git a/core/state/state_test.go b/core/state/state_test.go index 526d7f817..22e93d7a9 100644 --- a/core/state/state_test.go +++ b/core/state/state_test.go @@ -170,7 +170,7 @@ func TestSnapshot2(t *testing.T) { state.setStateObject(so0) root, _ := state.Commit(false) - state.Reset(root) + state, _ = New(root, state.db, state.snaps) // and one with deleted == true so1 := state.getStateObject(stateobjaddr1) diff --git a/core/state/statedb.go b/core/state/statedb.go index ce50962e8..49f457a99 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -63,7 +63,7 @@ func (n *proofList) Delete(key []byte) error { // * Accounts type StateDB struct { db Database - prefetcher *TriePrefetcher + prefetcher *triePrefetcher originalRoot common.Hash // The pre-state root, before any changes were made trie Trie hasher crypto.KeccakState @@ -149,10 +149,25 @@ func New(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error) return sdb, nil } -func (s *StateDB) UsePrefetcher(prefetcher *TriePrefetcher) { - if prefetcher != nil { - s.prefetcher = prefetcher - s.prefetcher.Resume(s.originalRoot) +// StartPrefetcher initializes a new trie prefetcher to pull in nodes from the +// state trie concurrently while the state is mutated so that when we reach the +// commit phase, most of the needed data is already hot. +func (s *StateDB) StartPrefetcher(namespace string) { + if s.prefetcher != nil { + s.prefetcher.close() + s.prefetcher = nil + } + if s.snap != nil { + s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace) + } +} + +// StopPrefetcher terminates a running prefetcher and reports any leftover stats +// from the gathered metrics. +func (s *StateDB) StopPrefetcher() { + if s.prefetcher != nil { + s.prefetcher.close() + s.prefetcher = nil } } @@ -167,37 +182,6 @@ func (s *StateDB) Error() error { return s.dbErr } -// Reset clears out all ephemeral state objects from the state db, but keeps -// the underlying state trie to avoid reloading data for the next operations. -func (s *StateDB) Reset(root common.Hash) error { - tr, err := s.db.OpenTrie(root) - if err != nil { - return err - } - s.trie = tr - s.stateObjects = make(map[common.Address]*stateObject) - s.stateObjectsPending = make(map[common.Address]struct{}) - s.stateObjectsDirty = make(map[common.Address]struct{}) - s.thash = common.Hash{} - s.bhash = common.Hash{} - s.txIndex = 0 - s.logs = make(map[common.Hash][]*types.Log) - s.logSize = 0 - s.preimages = make(map[common.Hash][]byte) - s.clearJournalAndRefund() - - if s.snaps != nil { - s.snapAccounts, s.snapDestructs, s.snapStorage = nil, nil, nil - if s.snap = s.snaps.Snapshot(root); s.snap != nil { - s.snapDestructs = make(map[common.Hash]struct{}) - s.snapAccounts = make(map[common.Hash][]byte) - s.snapStorage = make(map[common.Hash]map[common.Hash][]byte) - } - } - s.accessList = newAccessList() - return nil -} - func (s *StateDB) AddLog(log *types.Log) { s.journal.append(addLogChange{txhash: s.thash}) @@ -737,6 +721,13 @@ func (s *StateDB) Copy() *StateDB { // However, it doesn't cost us much to copy an empty list, so we do it anyway // to not blow up if we ever decide copy it in the middle of a transaction state.accessList = s.accessList.Copy() + + // If there's a prefetcher running, make an inactive copy of it that can + // only access data but does not actively preload (since the user will not + // know that they need to explicitly terminate an active copy). + if s.prefetcher != nil { + state.prefetcher = s.prefetcher.copy() + } return state } @@ -773,7 +764,7 @@ func (s *StateDB) GetRefund() uint64 { // the journal as well as the refunds. Finalise, however, will not push any updates // into the tries just yet. Only IntermediateRoot or Commit will do that. func (s *StateDB) Finalise(deleteEmptyObjects bool) { - var addressesToPrefetch []common.Address + addressesToPrefetch := make([][]byte, 0, len(s.journal.dirties)) for addr := range s.journal.dirties { obj, exist := s.stateObjects[addr] if !exist { @@ -798,21 +789,19 @@ func (s *StateDB) Finalise(deleteEmptyObjects bool) { delete(s.snapStorage, obj.addrHash) // Clear out any previously updated storage data (may be recreated via a ressurrect) } } else { - obj.finalise() + obj.finalise(true) // Prefetch slots in the background } s.stateObjectsPending[addr] = struct{}{} s.stateObjectsDirty[addr] = struct{}{} + // At this point, also ship the address off to the precacher. The precacher // will start loading tries, and when the change is eventually committed, // the commit-phase will be a lot faster - if s.prefetcher != nil { - addressesToPrefetch = append(addressesToPrefetch, addr) - } + addressesToPrefetch = append(addressesToPrefetch, common.CopyBytes(addr[:])) // Copy needed for closure } - if s.prefetcher != nil { - s.prefetcher.PrefetchAddresses(addressesToPrefetch) + if s.prefetcher != nil && len(addressesToPrefetch) > 0 { + s.prefetcher.prefetch(s.originalRoot, addressesToPrefetch) } - // Invalidate journal because reverting across transactions is not allowed. s.clearJournalAndRefund() } @@ -824,29 +813,49 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash { // Finalise all the dirty storage states and write them into the tries s.Finalise(deleteEmptyObjects) - // Now we're about to start to write changes to the trie. The trie is so - // far _untouched_. We can check with the prefetcher, if it can give us - // a trie which has the same root, but also has some content loaded into it. - // If so, use that one instead. + // If there was a trie prefetcher operating, it gets aborted and irrevocably + // modified after we start retrieving tries. Remove it from the statedb after + // this round of use. + // + // This is weird pre-byzantium since the first tx runs with a prefetcher and + // the remainder without, but pre-byzantium even the initial prefetcher is + // useless, so no sleep lost. + prefetcher := s.prefetcher if s.prefetcher != nil { - s.prefetcher.Pause() - // We only want to do this _once_, if someone calls IntermediateRoot again, - // we shouldn't fetch the trie again - if s.originalRoot != (common.Hash{}) { - if trie := s.prefetcher.GetTrie(s.originalRoot); trie != nil { - s.trie = trie - } - s.originalRoot = common.Hash{} + defer func() { + s.prefetcher.close() + s.prefetcher = nil + }() + } + // Although naively it makes sense to retrieve the account trie and then do + // the contract storage and account updates sequentially, that short circuits + // the account prefetcher. Instead, let's process all the storage updates + // first, giving the account prefeches just a few more milliseconds of time + // to pull useful data from disk. + for addr := range s.stateObjectsPending { + if obj := s.stateObjects[addr]; !obj.deleted { + obj.updateRoot(s.db) } } + // Now we're about to start to write changes to the trie. The trie is so far + // _untouched_. We can check with the prefetcher, if it can give us a trie + // which has the same root, but also has some content loaded into it. + if prefetcher != nil { + if trie := prefetcher.trie(s.originalRoot); trie != nil { + s.trie = trie + } + } + usedAddrs := make([][]byte, 0, len(s.stateObjectsPending)) for addr := range s.stateObjectsPending { - obj := s.stateObjects[addr] - if obj.deleted { + if obj := s.stateObjects[addr]; obj.deleted { s.deleteStateObject(obj) } else { - obj.updateRoot(s.db) s.updateStateObject(obj) } + usedAddrs = append(usedAddrs, common.CopyBytes(addr[:])) // Copy needed for closure + } + if prefetcher != nil { + prefetcher.used(s.originalRoot, usedAddrs) } if len(s.stateObjectsPending) > 0 { s.stateObjectsPending = make(map[common.Address]struct{}) diff --git a/core/state/statedb_test.go b/core/state/statedb_test.go index 70d01ff3d..220e28525 100644 --- a/core/state/statedb_test.go +++ b/core/state/statedb_test.go @@ -474,7 +474,7 @@ func TestTouchDelete(t *testing.T) { s := newStateTest() s.state.GetOrNewStateObject(common.Address{}) root, _ := s.state.Commit(false) - s.state.Reset(root) + s.state, _ = New(root, s.state.db, s.state.snaps) snapshot := s.state.Snapshot() s.state.AddBalance(common.Address{}, new(big.Int)) @@ -676,7 +676,7 @@ func TestDeleteCreateRevert(t *testing.T) { state.SetBalance(addr, big.NewInt(1)) root, _ := state.Commit(false) - state.Reset(root) + state, _ = New(root, state.db, state.snaps) // Simulate self-destructing in one transaction, then create-reverting in another state.Suicide(addr) @@ -688,7 +688,7 @@ func TestDeleteCreateRevert(t *testing.T) { // Commit the entire state and make sure we don't crash and have the correct state root, _ = state.Commit(true) - state.Reset(root) + state, _ = New(root, state.db, state.snaps) if state.getStateObject(addr) != nil { t.Fatalf("self-destructed contract came alive") diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go index 8a1aab325..ac5e95c5c 100644 --- a/core/state/trie_prefetcher.go +++ b/core/state/trie_prefetcher.go @@ -17,233 +17,318 @@ package state import ( + "sync" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" ) var ( - // trieDeliveryMeter counts how many times the prefetcher was unable to supply - // the statedb with a prefilled trie. This meter should be zero -- if it's not, that - // needs to be investigated - trieDeliveryMissMeter = metrics.NewRegisteredMeter("trie/prefetch/deliverymiss", nil) - - triePrefetchFetchMeter = metrics.NewRegisteredMeter("trie/prefetch/fetch", nil) - triePrefetchSkipMeter = metrics.NewRegisteredMeter("trie/prefetch/skip", nil) - triePrefetchDropMeter = metrics.NewRegisteredMeter("trie/prefetch/drop", nil) + // triePrefetchMetricsPrefix is the prefix under which to publis the metrics. + triePrefetchMetricsPrefix = "trie/prefetch/" ) -// TriePrefetcher is an active prefetcher, which receives accounts or storage -// items on two channels, and does trie-loading of the items. -// The goal is to get as much useful content into the caches as possible -type TriePrefetcher struct { - requestCh chan (fetchRequest) // Chan to receive requests for data to fetch - cmdCh chan (*cmd) // Chan to control activity, pause/new root - quitCh chan (struct{}) - deliveryCh chan (struct{}) - db Database +// triePrefetcher is an active prefetcher, which receives accounts or storage +// items and does trie-loading of them. The goal is to get as much useful content +// into the caches as possible. +// +// Note, the prefetcher's API is not thread safe. +type triePrefetcher struct { + db Database // Database to fetch trie nodes through + root common.Hash // Root hash of theaccount trie for metrics + fetches map[common.Hash]Trie // Partially or fully fetcher tries + fetchers map[common.Hash]*subfetcher // Subfetchers for each trie - paused bool - - storageTries map[common.Hash]Trie - accountTrie Trie - accountTrieRoot common.Hash + deliveryMissMeter metrics.Meter + accountLoadMeter metrics.Meter + accountDupMeter metrics.Meter + accountSkipMeter metrics.Meter + accountWasteMeter metrics.Meter + storageLoadMeter metrics.Meter + storageDupMeter metrics.Meter + storageSkipMeter metrics.Meter + storageWasteMeter metrics.Meter } -func NewTriePrefetcher(db Database) *TriePrefetcher { - return &TriePrefetcher{ - requestCh: make(chan fetchRequest, 200), - cmdCh: make(chan *cmd), - quitCh: make(chan struct{}), - deliveryCh: make(chan struct{}), - db: db, +// newTriePrefetcher +func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePrefetcher { + prefix := triePrefetchMetricsPrefix + namespace + p := &triePrefetcher{ + db: db, + root: root, + fetchers: make(map[common.Hash]*subfetcher), // Active prefetchers use the fetchers map + + deliveryMissMeter: metrics.GetOrRegisterMeter(prefix+"/deliverymiss", nil), + accountLoadMeter: metrics.GetOrRegisterMeter(prefix+"/account/load", nil), + accountDupMeter: metrics.GetOrRegisterMeter(prefix+"/account/dup", nil), + accountSkipMeter: metrics.GetOrRegisterMeter(prefix+"/account/skip", nil), + accountWasteMeter: metrics.GetOrRegisterMeter(prefix+"/account/waste", nil), + storageLoadMeter: metrics.GetOrRegisterMeter(prefix+"/storage/load", nil), + storageDupMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup", nil), + storageSkipMeter: metrics.GetOrRegisterMeter(prefix+"/storage/skip", nil), + storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil), } + return p } -type cmd struct { - root common.Hash -} +// close iterates over all the subfetchers, aborts any that were left spinning +// and reports the stats to the metrics subsystem. +func (p *triePrefetcher) close() { + for _, fetcher := range p.fetchers { + fetcher.abort() // safe to do multiple times -type fetchRequest struct { - slots []common.Hash - storageRoot *common.Hash - addresses []common.Address -} + if metrics.Enabled { + if fetcher.root == p.root { + p.accountLoadMeter.Mark(int64(len(fetcher.seen))) + p.accountDupMeter.Mark(int64(fetcher.dups)) + p.accountSkipMeter.Mark(int64(len(fetcher.tasks))) -func (p *TriePrefetcher) Loop() { - var ( - accountTrieRoot common.Hash - accountTrie Trie - storageTries map[common.Hash]Trie - - err error - // Some tracking of performance - skipped int64 - fetched int64 - - paused = true - ) - // The prefetcher loop has two distinct phases: - // 1: Paused: when in this state, the accumulated tries are accessible to outside - // callers. - // 2: Active prefetching, awaiting slots and accounts to prefetch - for { - select { - case <-p.quitCh: - return - case cmd := <-p.cmdCh: - // Clear out any old requests - drain: - for { - select { - case req := <-p.requestCh: - if req.slots != nil { - skipped += int64(len(req.slots)) - } else { - skipped += int64(len(req.addresses)) - } - default: - break drain + for _, key := range fetcher.used { + delete(fetcher.seen, string(key)) } - } - if paused { - // Clear old data - p.storageTries = nil - p.accountTrie = nil - p.accountTrieRoot = common.Hash{} - // Resume again - storageTries = make(map[common.Hash]Trie) - accountTrieRoot = cmd.root - accountTrie, err = p.db.OpenTrie(accountTrieRoot) - if err != nil { - log.Error("Trie prefetcher failed opening trie", "root", accountTrieRoot, "err", err) - } - if accountTrieRoot == (common.Hash{}) { - log.Error("Trie prefetcher unpaused with bad root") - } - paused = false + p.accountWasteMeter.Mark(int64(len(fetcher.seen))) } else { - // Update metrics at new block events - triePrefetchFetchMeter.Mark(fetched) - triePrefetchSkipMeter.Mark(skipped) - fetched, skipped = 0, 0 - // Make the tries accessible - p.accountTrie = accountTrie - p.storageTries = storageTries - p.accountTrieRoot = accountTrieRoot - if cmd.root != (common.Hash{}) { - log.Error("Trie prefetcher paused with non-empty root") + p.storageLoadMeter.Mark(int64(len(fetcher.seen))) + p.storageDupMeter.Mark(int64(fetcher.dups)) + p.storageSkipMeter.Mark(int64(len(fetcher.tasks))) + + for _, key := range fetcher.used { + delete(fetcher.seen, string(key)) } - paused = true - } - p.deliveryCh <- struct{}{} - case req := <-p.requestCh: - if paused { - continue - } - if sRoot := req.storageRoot; sRoot != nil { - // Storage slots to fetch - var ( - storageTrie Trie - err error - ) - if storageTrie = storageTries[*sRoot]; storageTrie == nil { - if storageTrie, err = p.db.OpenTrie(*sRoot); err != nil { - log.Warn("trie prefetcher failed opening storage trie", "root", *sRoot, "err", err) - skipped += int64(len(req.slots)) - continue - } - storageTries[*sRoot] = storageTrie - } - for _, key := range req.slots { - storageTrie.TryGet(key[:]) - } - fetched += int64(len(req.slots)) - } else { // an account - for _, addr := range req.addresses { - accountTrie.TryGet(addr[:]) - } - fetched += int64(len(req.addresses)) + p.storageWasteMeter.Mark(int64(len(fetcher.seen))) } } } + // Clear out all fetchers (will crash on a second call, deliberate) + p.fetchers = nil } -// Close stops the prefetcher -func (p *TriePrefetcher) Close() { - if p.quitCh != nil { - close(p.quitCh) - p.quitCh = nil +// copy creates a deep-but-inactive copy of the trie prefetcher. Any trie data +// already loaded will be copied over, but no goroutines will be started. This +// is mostly used in the miner which creates a copy of it's actively mutated +// state to be sealed while it may further mutate the state. +func (p *triePrefetcher) copy() *triePrefetcher { + copy := &triePrefetcher{ + db: p.db, + root: p.root, + fetches: make(map[common.Hash]Trie), // Active prefetchers use the fetches map + + deliveryMissMeter: p.deliveryMissMeter, + accountLoadMeter: p.accountLoadMeter, + accountDupMeter: p.accountDupMeter, + accountSkipMeter: p.accountSkipMeter, + accountWasteMeter: p.accountWasteMeter, + storageLoadMeter: p.storageLoadMeter, + storageDupMeter: p.storageDupMeter, + storageSkipMeter: p.storageSkipMeter, + storageWasteMeter: p.storageWasteMeter, } -} - -// Resume causes the prefetcher to clear out old data, and get ready to -// fetch data concerning the new root -func (p *TriePrefetcher) Resume(root common.Hash) { - p.paused = false - p.cmdCh <- &cmd{ - root: root, + // If the prefetcher is already a copy, duplicate the data + if p.fetches != nil { + for root, fetch := range p.fetches { + copy.fetches[root] = p.db.CopyTrie(fetch) + } + return copy } - // Wait for it - <-p.deliveryCh + // Otherwise we're copying an active fetcher, retrieve the current states + for root, fetcher := range p.fetchers { + copy.fetches[root] = fetcher.peek() + } + return copy } -// Pause causes the prefetcher to pause prefetching, and make tries -// accessible to callers via GetTrie -func (p *TriePrefetcher) Pause() { - if p.paused { +// prefetch schedules a batch of trie items to prefetch. +func (p *triePrefetcher) prefetch(root common.Hash, keys [][]byte) { + // If the prefetcher is an inactive one, bail out + if p.fetches != nil { return } - p.paused = true - p.cmdCh <- &cmd{ - root: common.Hash{}, + // Active fetcher, schedule the retrievals + fetcher := p.fetchers[root] + if fetcher == nil { + fetcher = newSubfetcher(p.db, root) + p.fetchers[root] = fetcher } - // Wait for it - <-p.deliveryCh + fetcher.schedule(keys) } -// PrefetchAddresses adds an address for prefetching -func (p *TriePrefetcher) PrefetchAddresses(addresses []common.Address) { - cmd := fetchRequest{ - addresses: addresses, +// trie returns the trie matching the root hash, or nil if the prefetcher doesn't +// have it. +func (p *triePrefetcher) trie(root common.Hash) Trie { + // If the prefetcher is inactive, return from existing deep copies + if p.fetches != nil { + trie := p.fetches[root] + if trie == nil { + p.deliveryMissMeter.Mark(1) + return nil + } + return p.db.CopyTrie(trie) } - // We do an async send here, to not cause the caller to block - //p.requestCh <- cmd + // Otherwise the prefetcher is active, bail if no trie was prefetched for this root + fetcher := p.fetchers[root] + if fetcher == nil { + p.deliveryMissMeter.Mark(1) + return nil + } + // Interrupt the prefetcher if it's by any chance still running and return + // a copy of any pre-loaded trie. + fetcher.abort() // safe to do multiple times + + trie := fetcher.peek() + if trie == nil { + p.deliveryMissMeter.Mark(1) + return nil + } + return trie +} + +// used marks a batch of state items used to allow creating statistics as to +// how useful or wasteful the prefetcher is. +func (p *triePrefetcher) used(root common.Hash, used [][]byte) { + if fetcher := p.fetchers[root]; fetcher != nil { + fetcher.used = used + } +} + +// subfetcher is a trie fetcher goroutine responsible for pulling entries for a +// single trie. It is spawned when a new root is encountered and lives until the +// main prefetcher is paused and either all requested items are processed or if +// the trie being worked on is retrieved from the prefetcher. +type subfetcher struct { + db Database // Database to load trie nodes through + root common.Hash // Root hash of the trie to prefetch + trie Trie // Trie being populated with nodes + + tasks [][]byte // Items queued up for retrieval + lock sync.Mutex // Lock protecting the task queue + + wake chan struct{} // Wake channel if a new task is scheduled + stop chan struct{} // Channel to interrupt processing + term chan struct{} // Channel to signal iterruption + copy chan chan Trie // Channel to request a copy of the current trie + + seen map[string]struct{} // Tracks the entries already loaded + dups int // Number of duplicate preload tasks + used [][]byte // Tracks the entries used in the end +} + +// newSubfetcher creates a goroutine to prefetch state items belonging to a +// particular root hash. +func newSubfetcher(db Database, root common.Hash) *subfetcher { + sf := &subfetcher{ + db: db, + root: root, + wake: make(chan struct{}, 1), + stop: make(chan struct{}), + term: make(chan struct{}), + copy: make(chan chan Trie), + seen: make(map[string]struct{}), + } + go sf.loop() + return sf +} + +// schedule adds a batch of trie keys to the queue to prefetch. +func (sf *subfetcher) schedule(keys [][]byte) { + // Append the tasks to the current queue + sf.lock.Lock() + sf.tasks = append(sf.tasks, keys...) + sf.lock.Unlock() + + // Notify the prefetcher, it's fine if it's already terminated select { - case p.requestCh <- cmd: + case sf.wake <- struct{}{}: default: - triePrefetchDropMeter.Mark(int64(len(addresses))) } } -// PrefetchStorage adds a storage root and a set of keys for prefetching -func (p *TriePrefetcher) PrefetchStorage(root common.Hash, slots []common.Hash) { - cmd := fetchRequest{ - storageRoot: &root, - slots: slots, - } - // We do an async send here, to not cause the caller to block - //p.requestCh <- cmd +// peek tries to retrieve a deep copy of the fetcher's trie in whatever form it +// is currently. +func (sf *subfetcher) peek() Trie { + ch := make(chan Trie) select { - case p.requestCh <- cmd: - default: - triePrefetchDropMeter.Mark(int64(len(slots))) + case sf.copy <- ch: + // Subfetcher still alive, return copy from it + return <-ch + + case <-sf.term: + // Subfetcher already terminated, return a copy directly + if sf.trie == nil { + return nil + } + return sf.db.CopyTrie(sf.trie) } } -// GetTrie returns the trie matching the root hash, or nil if the prefetcher -// doesn't have it. -func (p *TriePrefetcher) GetTrie(root common.Hash) Trie { - if root == p.accountTrieRoot { - return p.accountTrie +// abort interrupts the subfetcher immediately. It is safe to call abort multiple +// times but it is not thread safe. +func (sf *subfetcher) abort() { + select { + case <-sf.stop: + default: + close(sf.stop) + } + <-sf.term +} + +// loop waits for new tasks to be scheduled and keeps loading them until it runs +// out of tasks or its underlying trie is retrieved for committing. +func (sf *subfetcher) loop() { + // No matter how the loop stops, signal anyone waiting that it's terminated + defer close(sf.term) + + // Start by opening the trie and stop processing if it fails + trie, err := sf.db.OpenTrie(sf.root) + if err != nil { + log.Warn("Trie prefetcher failed opening trie", "root", sf.root, "err", err) + return + } + sf.trie = trie + + // Trie opened successfully, keep prefetching items + for { + select { + case <-sf.wake: + // Subfetcher was woken up, retrieve any tasks to avoid spinning the lock + sf.lock.Lock() + tasks := sf.tasks + sf.tasks = nil + sf.lock.Unlock() + + // Prefetch any tasks until the loop is interrupted + for i, task := range tasks { + select { + case <-sf.stop: + // If termination is requested, add any leftover back and return + sf.lock.Lock() + sf.tasks = append(sf.tasks, tasks[i:]...) + sf.lock.Unlock() + return + + case ch := <-sf.copy: + // Somebody wants a copy of the current trie, grant them + ch <- sf.db.CopyTrie(sf.trie) + + default: + // No termination request yet, prefetch the next entry + taskid := string(task) + if _, ok := sf.seen[taskid]; ok { + sf.dups++ + } else { + sf.trie.TryGet(task) + sf.seen[taskid] = struct{}{} + } + } + } + + case ch := <-sf.copy: + // Somebody wants a copy of the current trie, grant them + ch <- sf.db.CopyTrie(sf.trie) + + case <-sf.stop: + // Termination is requested, abort and leave remaining tasks + return + } } - if storageTrie, ok := p.storageTries[root]; ok { - // Two accounts may well have the same storage root, but we cannot allow - // them both to make updates to the same trie instance. Therefore, - // we need to either delete the trie now, or deliver a copy of the trie. - delete(p.storageTries, root) - return storageTrie - } - trieDeliveryMissMeter.Mark(1) - return nil } diff --git a/eth/api_tracer.go b/eth/api_tracer.go index 8e71945ee..5dffb2a46 100644 --- a/eth/api_tracer.go +++ b/eth/api_tracer.go @@ -299,7 +299,8 @@ func (api *PrivateDebugAPI) traceChain(ctx context.Context, start, end *types.Bl failed = err break } - if err := statedb.Reset(root); err != nil { + statedb, err = state.New(root, database, nil) + if err != nil { failed = err break } @@ -699,7 +700,8 @@ func (api *PrivateDebugAPI) computeStateDB(block *types.Block, reexec uint64) (* if err != nil { return nil, err } - if err := statedb.Reset(root); err != nil { + statedb, err = state.New(root, database, nil) + if err != nil { return nil, fmt.Errorf("state reset after block %d failed: %v", block.NumberU64(), err) } database.TrieDB().Reference(root, common.Hash{}) diff --git a/miner/worker.go b/miner/worker.go index 2c5032c65..82d08d4c7 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -303,6 +303,9 @@ func (w *worker) isRunning() bool { // close terminates all background threads maintained by the worker. // Note the worker does not support being closed multiple times. func (w *worker) close() { + if w.current != nil && w.current.state != nil { + w.current.state.StopPrefetcher() + } atomic.StoreInt32(&w.running, 0) close(w.exitCh) } @@ -642,10 +645,14 @@ func (w *worker) resultLoop() { // makeCurrent creates a new environment for the current cycle. func (w *worker) makeCurrent(parent *types.Block, header *types.Header) error { + // Retrieve the parent state to execute on top and start a prefetcher for + // the miner to speed block sealing up a bit state, err := w.chain.StateAt(parent.Root()) if err != nil { return err } + state.StartPrefetcher("miner") + env := &environment{ signer: types.NewEIP155Signer(w.chainConfig.ChainID), state: state, @@ -654,7 +661,6 @@ func (w *worker) makeCurrent(parent *types.Block, header *types.Header) error { uncles: mapset.NewSet(), header: header, } - // when 08 is processed ancestors contain 07 (quick block) for _, ancestor := range w.chain.GetBlocksFromHash(parent.Hash(), 7) { for _, uncle := range ancestor.Uncles() { @@ -663,9 +669,14 @@ func (w *worker) makeCurrent(parent *types.Block, header *types.Header) error { env.family.Add(ancestor.Hash()) env.ancestors.Add(ancestor.Hash()) } - // Keep track of transactions which return errors so they can be removed env.tcount = 0 + + // Swap out the old work with the new one, terminating any leftover prefetcher + // processes in the mean time and starting a new one. + if w.current != nil && w.current.state != nil { + w.current.state.StopPrefetcher() + } w.current = env return nil } @@ -719,7 +730,6 @@ func (w *worker) updateSnapshot() { w.current.receipts, new(trie.Trie), ) - w.snapshotState = w.current.state.Copy() }