diff --git a/state_object.go b/state_object.go index d34170f..e5af79f 100644 --- a/state_object.go +++ b/state_object.go @@ -136,20 +136,11 @@ func (s *stateObject) touch() { func (s *stateObject) getTrie(db Database) state.Trie { if s.trie == nil { - // Try fetching from prefetcher first - // We don't prefetch empty tries - if s.data.Root != emptyRoot && s.db.prefetcher != nil { - // When the miner is creating the pending state, there is no - // prefetcher - s.trie = s.db.prefetcher.trie(s.addrHash, s.data.Root) - } - if s.trie == nil { - var err error - s.trie, err = db.OpenStorageTrie(s.addrHash, s.data.Root) - if err != nil { - s.trie, _ = db.OpenStorageTrie(s.addrHash, common.Hash{}) - s.setError(fmt.Errorf("can't create storage trie: %v", err)) - } + var err error + s.trie, err = db.OpenStorageTrie(s.addrHash, s.data.Root) + if err != nil { + s.trie, _ = db.OpenStorageTrie(s.addrHash, common.Hash{}) + s.setError(fmt.Errorf("can't create storage trie: %v", err)) } } return s.trie @@ -244,7 +235,7 @@ func (s *stateObject) setState(key, value common.Hash) { // finalise moves all dirty storage slots into the pending area to be hashed or // committed later. It is invoked at the end of every transaction. -func (s *stateObject) finalise(prefetch bool) { +func (s *stateObject) finalise() { slotsToPrefetch := make([][]byte, 0, len(s.dirtyStorage)) for key, value := range s.dirtyStorage { s.pendingStorage[key] = value @@ -252,9 +243,6 @@ func (s *stateObject) finalise(prefetch bool) { slotsToPrefetch = append(slotsToPrefetch, common.CopyBytes(key[:])) // Copy needed for closure } } - if s.db.prefetcher != nil && prefetch && len(slotsToPrefetch) > 0 && s.data.Root != emptyRoot { - s.db.prefetcher.prefetch(s.addrHash, s.data.Root, slotsToPrefetch) - } if len(s.dirtyStorage) > 0 { s.dirtyStorage = make(Storage) } @@ -264,7 +252,7 @@ func (s *stateObject) finalise(prefetch bool) { // It will return nil if the trie has not been loaded and no changes have been made func (s *stateObject) updateTrie(db Database) state.Trie { // Make sure all dirty slots are finalized into the pending storage area - s.finalise(false) // Don't prefetch anymore, pull directly if need be + s.finalise() if len(s.pendingStorage) == 0 { return s.trie } @@ -295,9 +283,6 @@ func (s *stateObject) updateTrie(db Database) state.Trie { } usedStorage = append(usedStorage, common.CopyBytes(key[:])) // Copy needed for closure } - if s.db.prefetcher != nil { - s.db.prefetcher.used(s.addrHash, s.data.Root, usedStorage) - } if len(s.pendingStorage) > 0 { s.pendingStorage = make(Storage) } diff --git a/statedb.go b/statedb.go index 13c95cd..6e73923 100644 --- a/statedb.go +++ b/statedb.go @@ -56,7 +56,6 @@ var ( // * Accounts type StateDB struct { db Database - prefetcher *triePrefetcher trie state.Trie hasher crypto.KeccakState diff --git a/trie_prefetcher.go b/trie_prefetcher.go deleted file mode 100644 index 52700ae..0000000 --- a/trie_prefetcher.go +++ /dev/null @@ -1,339 +0,0 @@ -package ipld_eth_statedb - -import ( - "sync" - - "github.com/ethereum/go-ethereum/core/state" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/metrics" -) - -var ( - // triePrefetchMetricsPrefix is the prefix under which to publish the metrics. - triePrefetchMetricsPrefix = "trie/prefetch/" -) - -// triePrefetcher is an active prefetcher, which receives accounts or storage -// items and does trie-loading of them. The goal is to get as much useful content -// into the caches as possible. -// -// Note, the prefetcher's API is not thread safe. -type triePrefetcher struct { - db state.Database // Database to fetch trie nodes through - root common.Hash // Root hash of the account trie for metrics - fetches map[string]state.Trie // Partially or fully fetcher tries - fetchers map[string]*subfetcher // Subfetchers for each trie - - deliveryMissMeter metrics.Meter - accountLoadMeter metrics.Meter - accountDupMeter metrics.Meter - accountSkipMeter metrics.Meter - accountWasteMeter metrics.Meter - storageLoadMeter metrics.Meter - storageDupMeter metrics.Meter - storageSkipMeter metrics.Meter - storageWasteMeter metrics.Meter -} - -func newTriePrefetcher(db state.Database, root common.Hash, namespace string) *triePrefetcher { - prefix := triePrefetchMetricsPrefix + namespace - p := &triePrefetcher{ - db: db, - root: root, - fetchers: make(map[string]*subfetcher), // Active prefetchers use the fetchers map - - deliveryMissMeter: metrics.GetOrRegisterMeter(prefix+"/deliverymiss", nil), - accountLoadMeter: metrics.GetOrRegisterMeter(prefix+"/account/load", nil), - accountDupMeter: metrics.GetOrRegisterMeter(prefix+"/account/dup", nil), - accountSkipMeter: metrics.GetOrRegisterMeter(prefix+"/account/skip", nil), - accountWasteMeter: metrics.GetOrRegisterMeter(prefix+"/account/waste", nil), - storageLoadMeter: metrics.GetOrRegisterMeter(prefix+"/storage/load", nil), - storageDupMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup", nil), - storageSkipMeter: metrics.GetOrRegisterMeter(prefix+"/storage/skip", nil), - storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil), - } - return p -} - -// close iterates over all the subfetchers, aborts any that were left spinning -// and reports the stats to the metrics subsystem. -func (p *triePrefetcher) close() { - for _, fetcher := range p.fetchers { - fetcher.abort() // safe to do multiple times - - if metrics.Enabled { - if fetcher.root == p.root { - p.accountLoadMeter.Mark(int64(len(fetcher.seen))) - p.accountDupMeter.Mark(int64(fetcher.dups)) - p.accountSkipMeter.Mark(int64(len(fetcher.tasks))) - - for _, key := range fetcher.used { - delete(fetcher.seen, string(key)) - } - p.accountWasteMeter.Mark(int64(len(fetcher.seen))) - } else { - p.storageLoadMeter.Mark(int64(len(fetcher.seen))) - p.storageDupMeter.Mark(int64(fetcher.dups)) - p.storageSkipMeter.Mark(int64(len(fetcher.tasks))) - - for _, key := range fetcher.used { - delete(fetcher.seen, string(key)) - } - p.storageWasteMeter.Mark(int64(len(fetcher.seen))) - } - } - } - // Clear out all fetchers (will crash on a second call, deliberate) - p.fetchers = nil -} - -// copy creates a deep-but-inactive copy of the trie prefetcher. Any trie data -// already loaded will be copied over, but no goroutines will be started. This -// is mostly used in the miner which creates a copy of it's actively mutated -// state to be sealed while it may further mutate the state. -func (p *triePrefetcher) copy() *triePrefetcher { - copy := &triePrefetcher{ - db: p.db, - root: p.root, - fetches: make(map[string]state.Trie), // Active prefetchers use the fetches map - - deliveryMissMeter: p.deliveryMissMeter, - accountLoadMeter: p.accountLoadMeter, - accountDupMeter: p.accountDupMeter, - accountSkipMeter: p.accountSkipMeter, - accountWasteMeter: p.accountWasteMeter, - storageLoadMeter: p.storageLoadMeter, - storageDupMeter: p.storageDupMeter, - storageSkipMeter: p.storageSkipMeter, - storageWasteMeter: p.storageWasteMeter, - } - // If the prefetcher is already a copy, duplicate the data - if p.fetches != nil { - for root, fetch := range p.fetches { - copy.fetches[root] = p.db.CopyTrie(fetch) - } - return copy - } - // Otherwise we're copying an active fetcher, retrieve the current states - for id, fetcher := range p.fetchers { - copy.fetches[id] = fetcher.peek() - } - return copy -} - -// prefetch schedules a batch of trie items to prefetch. -func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, keys [][]byte) { - // If the prefetcher is an inactive one, bail out - if p.fetches != nil { - return - } - // Active fetcher, schedule the retrievals - id := p.trieID(owner, root) - fetcher := p.fetchers[id] - if fetcher == nil { - fetcher = newSubfetcher(p.db, owner, root) - p.fetchers[id] = fetcher - } - fetcher.schedule(keys) -} - -// trie returns the trie matching the root hash, or nil if the prefetcher doesn't -// have it. -func (p *triePrefetcher) trie(owner common.Hash, root common.Hash) state.Trie { - // If the prefetcher is inactive, return from existing deep copies - id := p.trieID(owner, root) - if p.fetches != nil { - trie := p.fetches[id] - if trie == nil { - p.deliveryMissMeter.Mark(1) - return nil - } - return p.db.CopyTrie(trie) - } - // Otherwise the prefetcher is active, bail if no trie was prefetched for this root - fetcher := p.fetchers[id] - if fetcher == nil { - p.deliveryMissMeter.Mark(1) - return nil - } - // Interrupt the prefetcher if it's by any chance still running and return - // a copy of any pre-loaded trie. - fetcher.abort() // safe to do multiple times - - trie := fetcher.peek() - if trie == nil { - p.deliveryMissMeter.Mark(1) - return nil - } - return trie -} - -// used marks a batch of state items used to allow creating statistics as to -// how useful or wasteful the prefetcher is. -func (p *triePrefetcher) used(owner common.Hash, root common.Hash, used [][]byte) { - if fetcher := p.fetchers[p.trieID(owner, root)]; fetcher != nil { - fetcher.used = used - } -} - -// trieID returns an unique trie identifier consists the trie owner and root hash. -func (p *triePrefetcher) trieID(owner common.Hash, root common.Hash) string { - return string(append(owner.Bytes(), root.Bytes()...)) -} - -// subfetcher is a trie fetcher goroutine responsible for pulling entries for a -// single trie. It is spawned when a new root is encountered and lives until the -// main prefetcher is paused and either all requested items are processed or if -// the trie being worked on is retrieved from the prefetcher. -type subfetcher struct { - db state.Database // Database to load trie nodes through - owner common.Hash // Owner of the trie, usually account hash - root common.Hash // Root hash of the trie to prefetch - trie state.Trie // Trie being populated with nodes - - tasks [][]byte // Items queued up for retrieval - lock sync.Mutex // Lock protecting the task queue - - wake chan struct{} // Wake channel if a new task is scheduled - stop chan struct{} // Channel to interrupt processing - term chan struct{} // Channel to signal interruption - copy chan chan state.Trie // Channel to request a copy of the current trie - - seen map[string]struct{} // Tracks the entries already loaded - dups int // Number of duplicate preload tasks - used [][]byte // Tracks the entries used in the end -} - -// newSubfetcher creates a goroutine to prefetch state items belonging to a -// particular root hash. -func newSubfetcher(db state.Database, owner common.Hash, root common.Hash) *subfetcher { - sf := &subfetcher{ - db: db, - owner: owner, - root: root, - wake: make(chan struct{}, 1), - stop: make(chan struct{}), - term: make(chan struct{}), - copy: make(chan chan state.Trie), - seen: make(map[string]struct{}), - } - go sf.loop() - return sf -} - -// schedule adds a batch of trie keys to the queue to prefetch. -func (sf *subfetcher) schedule(keys [][]byte) { - // Append the tasks to the current queue - sf.lock.Lock() - sf.tasks = append(sf.tasks, keys...) - sf.lock.Unlock() - - // Notify the prefetcher, it's fine if it's already terminated - select { - case sf.wake <- struct{}{}: - default: - } -} - -// peek tries to retrieve a deep copy of the fetcher's trie in whatever form it -// is currently. -func (sf *subfetcher) peek() state.Trie { - ch := make(chan state.Trie) - select { - case sf.copy <- ch: - // Subfetcher still alive, return copy from it - return <-ch - - case <-sf.term: - // Subfetcher already terminated, return a copy directly - if sf.trie == nil { - return nil - } - return sf.db.CopyTrie(sf.trie) - } -} - -// abort interrupts the subfetcher immediately. It is safe to call abort multiple -// times but it is not thread safe. -func (sf *subfetcher) abort() { - select { - case <-sf.stop: - default: - close(sf.stop) - } - <-sf.term -} - -// loop waits for new tasks to be scheduled and keeps loading them until it runs -// out of tasks or its underlying trie is retrieved for committing. -func (sf *subfetcher) loop() { - // No matter how the loop stops, signal anyone waiting that it's terminated - defer close(sf.term) - - // Start by opening the trie and stop processing if it fails - if sf.owner == (common.Hash{}) { - trie, err := sf.db.OpenTrie(sf.root) - if err != nil { - log.Warn("Trie prefetcher failed opening trie", "root", sf.root, "err", err) - return - } - sf.trie = trie - } else { - trie, err := sf.db.OpenStorageTrie(sf.owner, sf.root) - if err != nil { - log.Warn("Trie prefetcher failed opening trie", "root", sf.root, "err", err) - return - } - sf.trie = trie - } - // Trie opened successfully, keep prefetching items - for { - select { - case <-sf.wake: - // Subfetcher was woken up, retrieve any tasks to avoid spinning the lock - sf.lock.Lock() - tasks := sf.tasks - sf.tasks = nil - sf.lock.Unlock() - - // Prefetch any tasks until the loop is interrupted - for i, task := range tasks { - select { - case <-sf.stop: - // If termination is requested, add any leftover back and return - sf.lock.Lock() - sf.tasks = append(sf.tasks, tasks[i:]...) - sf.lock.Unlock() - return - - case ch := <-sf.copy: - // Somebody wants a copy of the current trie, grant them - ch <- sf.db.CopyTrie(sf.trie) - - default: - // No termination request yet, prefetch the next entry - if _, ok := sf.seen[string(task)]; ok { - sf.dups++ - } else { - if len(task) == len(common.Address{}) { - sf.trie.TryGetAccount(task) - } else { - sf.trie.TryGet(task) - } - sf.seen[string(task)] = struct{}{} - } - } - } - - case ch := <-sf.copy: - // Somebody wants a copy of the current trie, grant them - ch <- sf.db.CopyTrie(sf.trie) - - case <-sf.stop: - // Termination is requested, abort and leave remaining tasks - return - } - } -}