remove prefetcher (remmoving trie access)
This commit is contained in:
parent
23baf50803
commit
6361f744ac
@ -135,14 +135,6 @@ func (s *stateObject) touch() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *stateObject) getTrie(db Database) state.Trie {
|
func (s *stateObject) getTrie(db Database) state.Trie {
|
||||||
if s.trie == nil {
|
|
||||||
// Try fetching from prefetcher first
|
|
||||||
// We don't prefetch empty tries
|
|
||||||
if s.data.Root != emptyRoot && s.db.prefetcher != nil {
|
|
||||||
// When the miner is creating the pending state, there is no
|
|
||||||
// prefetcher
|
|
||||||
s.trie = s.db.prefetcher.trie(s.addrHash, s.data.Root)
|
|
||||||
}
|
|
||||||
if s.trie == nil {
|
if s.trie == nil {
|
||||||
var err error
|
var err error
|
||||||
s.trie, err = db.OpenStorageTrie(s.addrHash, s.data.Root)
|
s.trie, err = db.OpenStorageTrie(s.addrHash, s.data.Root)
|
||||||
@ -151,7 +143,6 @@ func (s *stateObject) getTrie(db Database) state.Trie {
|
|||||||
s.setError(fmt.Errorf("can't create storage trie: %v", err))
|
s.setError(fmt.Errorf("can't create storage trie: %v", err))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
return s.trie
|
return s.trie
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -244,7 +235,7 @@ func (s *stateObject) setState(key, value common.Hash) {
|
|||||||
|
|
||||||
// finalise moves all dirty storage slots into the pending area to be hashed or
|
// finalise moves all dirty storage slots into the pending area to be hashed or
|
||||||
// committed later. It is invoked at the end of every transaction.
|
// committed later. It is invoked at the end of every transaction.
|
||||||
func (s *stateObject) finalise(prefetch bool) {
|
func (s *stateObject) finalise() {
|
||||||
slotsToPrefetch := make([][]byte, 0, len(s.dirtyStorage))
|
slotsToPrefetch := make([][]byte, 0, len(s.dirtyStorage))
|
||||||
for key, value := range s.dirtyStorage {
|
for key, value := range s.dirtyStorage {
|
||||||
s.pendingStorage[key] = value
|
s.pendingStorage[key] = value
|
||||||
@ -252,9 +243,6 @@ func (s *stateObject) finalise(prefetch bool) {
|
|||||||
slotsToPrefetch = append(slotsToPrefetch, common.CopyBytes(key[:])) // Copy needed for closure
|
slotsToPrefetch = append(slotsToPrefetch, common.CopyBytes(key[:])) // Copy needed for closure
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if s.db.prefetcher != nil && prefetch && len(slotsToPrefetch) > 0 && s.data.Root != emptyRoot {
|
|
||||||
s.db.prefetcher.prefetch(s.addrHash, s.data.Root, slotsToPrefetch)
|
|
||||||
}
|
|
||||||
if len(s.dirtyStorage) > 0 {
|
if len(s.dirtyStorage) > 0 {
|
||||||
s.dirtyStorage = make(Storage)
|
s.dirtyStorage = make(Storage)
|
||||||
}
|
}
|
||||||
@ -264,7 +252,7 @@ func (s *stateObject) finalise(prefetch bool) {
|
|||||||
// It will return nil if the trie has not been loaded and no changes have been made
|
// It will return nil if the trie has not been loaded and no changes have been made
|
||||||
func (s *stateObject) updateTrie(db Database) state.Trie {
|
func (s *stateObject) updateTrie(db Database) state.Trie {
|
||||||
// Make sure all dirty slots are finalized into the pending storage area
|
// Make sure all dirty slots are finalized into the pending storage area
|
||||||
s.finalise(false) // Don't prefetch anymore, pull directly if need be
|
s.finalise()
|
||||||
if len(s.pendingStorage) == 0 {
|
if len(s.pendingStorage) == 0 {
|
||||||
return s.trie
|
return s.trie
|
||||||
}
|
}
|
||||||
@ -295,9 +283,6 @@ func (s *stateObject) updateTrie(db Database) state.Trie {
|
|||||||
}
|
}
|
||||||
usedStorage = append(usedStorage, common.CopyBytes(key[:])) // Copy needed for closure
|
usedStorage = append(usedStorage, common.CopyBytes(key[:])) // Copy needed for closure
|
||||||
}
|
}
|
||||||
if s.db.prefetcher != nil {
|
|
||||||
s.db.prefetcher.used(s.addrHash, s.data.Root, usedStorage)
|
|
||||||
}
|
|
||||||
if len(s.pendingStorage) > 0 {
|
if len(s.pendingStorage) > 0 {
|
||||||
s.pendingStorage = make(Storage)
|
s.pendingStorage = make(Storage)
|
||||||
}
|
}
|
||||||
|
@ -56,7 +56,6 @@ var (
|
|||||||
// * Accounts
|
// * Accounts
|
||||||
type StateDB struct {
|
type StateDB struct {
|
||||||
db Database
|
db Database
|
||||||
prefetcher *triePrefetcher
|
|
||||||
trie state.Trie
|
trie state.Trie
|
||||||
hasher crypto.KeccakState
|
hasher crypto.KeccakState
|
||||||
|
|
||||||
|
@ -1,339 +0,0 @@
|
|||||||
package ipld_eth_statedb
|
|
||||||
|
|
||||||
import (
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/core/state"
|
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
|
||||||
"github.com/ethereum/go-ethereum/log"
|
|
||||||
"github.com/ethereum/go-ethereum/metrics"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
// triePrefetchMetricsPrefix is the prefix under which to publish the metrics.
|
|
||||||
triePrefetchMetricsPrefix = "trie/prefetch/"
|
|
||||||
)
|
|
||||||
|
|
||||||
// triePrefetcher is an active prefetcher, which receives accounts or storage
|
|
||||||
// items and does trie-loading of them. The goal is to get as much useful content
|
|
||||||
// into the caches as possible.
|
|
||||||
//
|
|
||||||
// Note, the prefetcher's API is not thread safe.
|
|
||||||
type triePrefetcher struct {
|
|
||||||
db state.Database // Database to fetch trie nodes through
|
|
||||||
root common.Hash // Root hash of the account trie for metrics
|
|
||||||
fetches map[string]state.Trie // Partially or fully fetcher tries
|
|
||||||
fetchers map[string]*subfetcher // Subfetchers for each trie
|
|
||||||
|
|
||||||
deliveryMissMeter metrics.Meter
|
|
||||||
accountLoadMeter metrics.Meter
|
|
||||||
accountDupMeter metrics.Meter
|
|
||||||
accountSkipMeter metrics.Meter
|
|
||||||
accountWasteMeter metrics.Meter
|
|
||||||
storageLoadMeter metrics.Meter
|
|
||||||
storageDupMeter metrics.Meter
|
|
||||||
storageSkipMeter metrics.Meter
|
|
||||||
storageWasteMeter metrics.Meter
|
|
||||||
}
|
|
||||||
|
|
||||||
func newTriePrefetcher(db state.Database, root common.Hash, namespace string) *triePrefetcher {
|
|
||||||
prefix := triePrefetchMetricsPrefix + namespace
|
|
||||||
p := &triePrefetcher{
|
|
||||||
db: db,
|
|
||||||
root: root,
|
|
||||||
fetchers: make(map[string]*subfetcher), // Active prefetchers use the fetchers map
|
|
||||||
|
|
||||||
deliveryMissMeter: metrics.GetOrRegisterMeter(prefix+"/deliverymiss", nil),
|
|
||||||
accountLoadMeter: metrics.GetOrRegisterMeter(prefix+"/account/load", nil),
|
|
||||||
accountDupMeter: metrics.GetOrRegisterMeter(prefix+"/account/dup", nil),
|
|
||||||
accountSkipMeter: metrics.GetOrRegisterMeter(prefix+"/account/skip", nil),
|
|
||||||
accountWasteMeter: metrics.GetOrRegisterMeter(prefix+"/account/waste", nil),
|
|
||||||
storageLoadMeter: metrics.GetOrRegisterMeter(prefix+"/storage/load", nil),
|
|
||||||
storageDupMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup", nil),
|
|
||||||
storageSkipMeter: metrics.GetOrRegisterMeter(prefix+"/storage/skip", nil),
|
|
||||||
storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil),
|
|
||||||
}
|
|
||||||
return p
|
|
||||||
}
|
|
||||||
|
|
||||||
// close iterates over all the subfetchers, aborts any that were left spinning
|
|
||||||
// and reports the stats to the metrics subsystem.
|
|
||||||
func (p *triePrefetcher) close() {
|
|
||||||
for _, fetcher := range p.fetchers {
|
|
||||||
fetcher.abort() // safe to do multiple times
|
|
||||||
|
|
||||||
if metrics.Enabled {
|
|
||||||
if fetcher.root == p.root {
|
|
||||||
p.accountLoadMeter.Mark(int64(len(fetcher.seen)))
|
|
||||||
p.accountDupMeter.Mark(int64(fetcher.dups))
|
|
||||||
p.accountSkipMeter.Mark(int64(len(fetcher.tasks)))
|
|
||||||
|
|
||||||
for _, key := range fetcher.used {
|
|
||||||
delete(fetcher.seen, string(key))
|
|
||||||
}
|
|
||||||
p.accountWasteMeter.Mark(int64(len(fetcher.seen)))
|
|
||||||
} else {
|
|
||||||
p.storageLoadMeter.Mark(int64(len(fetcher.seen)))
|
|
||||||
p.storageDupMeter.Mark(int64(fetcher.dups))
|
|
||||||
p.storageSkipMeter.Mark(int64(len(fetcher.tasks)))
|
|
||||||
|
|
||||||
for _, key := range fetcher.used {
|
|
||||||
delete(fetcher.seen, string(key))
|
|
||||||
}
|
|
||||||
p.storageWasteMeter.Mark(int64(len(fetcher.seen)))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Clear out all fetchers (will crash on a second call, deliberate)
|
|
||||||
p.fetchers = nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// copy creates a deep-but-inactive copy of the trie prefetcher. Any trie data
|
|
||||||
// already loaded will be copied over, but no goroutines will be started. This
|
|
||||||
// is mostly used in the miner which creates a copy of it's actively mutated
|
|
||||||
// state to be sealed while it may further mutate the state.
|
|
||||||
func (p *triePrefetcher) copy() *triePrefetcher {
|
|
||||||
copy := &triePrefetcher{
|
|
||||||
db: p.db,
|
|
||||||
root: p.root,
|
|
||||||
fetches: make(map[string]state.Trie), // Active prefetchers use the fetches map
|
|
||||||
|
|
||||||
deliveryMissMeter: p.deliveryMissMeter,
|
|
||||||
accountLoadMeter: p.accountLoadMeter,
|
|
||||||
accountDupMeter: p.accountDupMeter,
|
|
||||||
accountSkipMeter: p.accountSkipMeter,
|
|
||||||
accountWasteMeter: p.accountWasteMeter,
|
|
||||||
storageLoadMeter: p.storageLoadMeter,
|
|
||||||
storageDupMeter: p.storageDupMeter,
|
|
||||||
storageSkipMeter: p.storageSkipMeter,
|
|
||||||
storageWasteMeter: p.storageWasteMeter,
|
|
||||||
}
|
|
||||||
// If the prefetcher is already a copy, duplicate the data
|
|
||||||
if p.fetches != nil {
|
|
||||||
for root, fetch := range p.fetches {
|
|
||||||
copy.fetches[root] = p.db.CopyTrie(fetch)
|
|
||||||
}
|
|
||||||
return copy
|
|
||||||
}
|
|
||||||
// Otherwise we're copying an active fetcher, retrieve the current states
|
|
||||||
for id, fetcher := range p.fetchers {
|
|
||||||
copy.fetches[id] = fetcher.peek()
|
|
||||||
}
|
|
||||||
return copy
|
|
||||||
}
|
|
||||||
|
|
||||||
// prefetch schedules a batch of trie items to prefetch.
|
|
||||||
func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, keys [][]byte) {
|
|
||||||
// If the prefetcher is an inactive one, bail out
|
|
||||||
if p.fetches != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// Active fetcher, schedule the retrievals
|
|
||||||
id := p.trieID(owner, root)
|
|
||||||
fetcher := p.fetchers[id]
|
|
||||||
if fetcher == nil {
|
|
||||||
fetcher = newSubfetcher(p.db, owner, root)
|
|
||||||
p.fetchers[id] = fetcher
|
|
||||||
}
|
|
||||||
fetcher.schedule(keys)
|
|
||||||
}
|
|
||||||
|
|
||||||
// trie returns the trie matching the root hash, or nil if the prefetcher doesn't
|
|
||||||
// have it.
|
|
||||||
func (p *triePrefetcher) trie(owner common.Hash, root common.Hash) state.Trie {
|
|
||||||
// If the prefetcher is inactive, return from existing deep copies
|
|
||||||
id := p.trieID(owner, root)
|
|
||||||
if p.fetches != nil {
|
|
||||||
trie := p.fetches[id]
|
|
||||||
if trie == nil {
|
|
||||||
p.deliveryMissMeter.Mark(1)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return p.db.CopyTrie(trie)
|
|
||||||
}
|
|
||||||
// Otherwise the prefetcher is active, bail if no trie was prefetched for this root
|
|
||||||
fetcher := p.fetchers[id]
|
|
||||||
if fetcher == nil {
|
|
||||||
p.deliveryMissMeter.Mark(1)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
// Interrupt the prefetcher if it's by any chance still running and return
|
|
||||||
// a copy of any pre-loaded trie.
|
|
||||||
fetcher.abort() // safe to do multiple times
|
|
||||||
|
|
||||||
trie := fetcher.peek()
|
|
||||||
if trie == nil {
|
|
||||||
p.deliveryMissMeter.Mark(1)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return trie
|
|
||||||
}
|
|
||||||
|
|
||||||
// used marks a batch of state items used to allow creating statistics as to
|
|
||||||
// how useful or wasteful the prefetcher is.
|
|
||||||
func (p *triePrefetcher) used(owner common.Hash, root common.Hash, used [][]byte) {
|
|
||||||
if fetcher := p.fetchers[p.trieID(owner, root)]; fetcher != nil {
|
|
||||||
fetcher.used = used
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// trieID returns an unique trie identifier consists the trie owner and root hash.
|
|
||||||
func (p *triePrefetcher) trieID(owner common.Hash, root common.Hash) string {
|
|
||||||
return string(append(owner.Bytes(), root.Bytes()...))
|
|
||||||
}
|
|
||||||
|
|
||||||
// subfetcher is a trie fetcher goroutine responsible for pulling entries for a
|
|
||||||
// single trie. It is spawned when a new root is encountered and lives until the
|
|
||||||
// main prefetcher is paused and either all requested items are processed or if
|
|
||||||
// the trie being worked on is retrieved from the prefetcher.
|
|
||||||
type subfetcher struct {
|
|
||||||
db state.Database // Database to load trie nodes through
|
|
||||||
owner common.Hash // Owner of the trie, usually account hash
|
|
||||||
root common.Hash // Root hash of the trie to prefetch
|
|
||||||
trie state.Trie // Trie being populated with nodes
|
|
||||||
|
|
||||||
tasks [][]byte // Items queued up for retrieval
|
|
||||||
lock sync.Mutex // Lock protecting the task queue
|
|
||||||
|
|
||||||
wake chan struct{} // Wake channel if a new task is scheduled
|
|
||||||
stop chan struct{} // Channel to interrupt processing
|
|
||||||
term chan struct{} // Channel to signal interruption
|
|
||||||
copy chan chan state.Trie // Channel to request a copy of the current trie
|
|
||||||
|
|
||||||
seen map[string]struct{} // Tracks the entries already loaded
|
|
||||||
dups int // Number of duplicate preload tasks
|
|
||||||
used [][]byte // Tracks the entries used in the end
|
|
||||||
}
|
|
||||||
|
|
||||||
// newSubfetcher creates a goroutine to prefetch state items belonging to a
|
|
||||||
// particular root hash.
|
|
||||||
func newSubfetcher(db state.Database, owner common.Hash, root common.Hash) *subfetcher {
|
|
||||||
sf := &subfetcher{
|
|
||||||
db: db,
|
|
||||||
owner: owner,
|
|
||||||
root: root,
|
|
||||||
wake: make(chan struct{}, 1),
|
|
||||||
stop: make(chan struct{}),
|
|
||||||
term: make(chan struct{}),
|
|
||||||
copy: make(chan chan state.Trie),
|
|
||||||
seen: make(map[string]struct{}),
|
|
||||||
}
|
|
||||||
go sf.loop()
|
|
||||||
return sf
|
|
||||||
}
|
|
||||||
|
|
||||||
// schedule adds a batch of trie keys to the queue to prefetch.
|
|
||||||
func (sf *subfetcher) schedule(keys [][]byte) {
|
|
||||||
// Append the tasks to the current queue
|
|
||||||
sf.lock.Lock()
|
|
||||||
sf.tasks = append(sf.tasks, keys...)
|
|
||||||
sf.lock.Unlock()
|
|
||||||
|
|
||||||
// Notify the prefetcher, it's fine if it's already terminated
|
|
||||||
select {
|
|
||||||
case sf.wake <- struct{}{}:
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// peek tries to retrieve a deep copy of the fetcher's trie in whatever form it
|
|
||||||
// is currently.
|
|
||||||
func (sf *subfetcher) peek() state.Trie {
|
|
||||||
ch := make(chan state.Trie)
|
|
||||||
select {
|
|
||||||
case sf.copy <- ch:
|
|
||||||
// Subfetcher still alive, return copy from it
|
|
||||||
return <-ch
|
|
||||||
|
|
||||||
case <-sf.term:
|
|
||||||
// Subfetcher already terminated, return a copy directly
|
|
||||||
if sf.trie == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return sf.db.CopyTrie(sf.trie)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// abort interrupts the subfetcher immediately. It is safe to call abort multiple
|
|
||||||
// times but it is not thread safe.
|
|
||||||
func (sf *subfetcher) abort() {
|
|
||||||
select {
|
|
||||||
case <-sf.stop:
|
|
||||||
default:
|
|
||||||
close(sf.stop)
|
|
||||||
}
|
|
||||||
<-sf.term
|
|
||||||
}
|
|
||||||
|
|
||||||
// loop waits for new tasks to be scheduled and keeps loading them until it runs
|
|
||||||
// out of tasks or its underlying trie is retrieved for committing.
|
|
||||||
func (sf *subfetcher) loop() {
|
|
||||||
// No matter how the loop stops, signal anyone waiting that it's terminated
|
|
||||||
defer close(sf.term)
|
|
||||||
|
|
||||||
// Start by opening the trie and stop processing if it fails
|
|
||||||
if sf.owner == (common.Hash{}) {
|
|
||||||
trie, err := sf.db.OpenTrie(sf.root)
|
|
||||||
if err != nil {
|
|
||||||
log.Warn("Trie prefetcher failed opening trie", "root", sf.root, "err", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
sf.trie = trie
|
|
||||||
} else {
|
|
||||||
trie, err := sf.db.OpenStorageTrie(sf.owner, sf.root)
|
|
||||||
if err != nil {
|
|
||||||
log.Warn("Trie prefetcher failed opening trie", "root", sf.root, "err", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
sf.trie = trie
|
|
||||||
}
|
|
||||||
// Trie opened successfully, keep prefetching items
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-sf.wake:
|
|
||||||
// Subfetcher was woken up, retrieve any tasks to avoid spinning the lock
|
|
||||||
sf.lock.Lock()
|
|
||||||
tasks := sf.tasks
|
|
||||||
sf.tasks = nil
|
|
||||||
sf.lock.Unlock()
|
|
||||||
|
|
||||||
// Prefetch any tasks until the loop is interrupted
|
|
||||||
for i, task := range tasks {
|
|
||||||
select {
|
|
||||||
case <-sf.stop:
|
|
||||||
// If termination is requested, add any leftover back and return
|
|
||||||
sf.lock.Lock()
|
|
||||||
sf.tasks = append(sf.tasks, tasks[i:]...)
|
|
||||||
sf.lock.Unlock()
|
|
||||||
return
|
|
||||||
|
|
||||||
case ch := <-sf.copy:
|
|
||||||
// Somebody wants a copy of the current trie, grant them
|
|
||||||
ch <- sf.db.CopyTrie(sf.trie)
|
|
||||||
|
|
||||||
default:
|
|
||||||
// No termination request yet, prefetch the next entry
|
|
||||||
if _, ok := sf.seen[string(task)]; ok {
|
|
||||||
sf.dups++
|
|
||||||
} else {
|
|
||||||
if len(task) == len(common.Address{}) {
|
|
||||||
sf.trie.TryGetAccount(task)
|
|
||||||
} else {
|
|
||||||
sf.trie.TryGet(task)
|
|
||||||
}
|
|
||||||
sf.seen[string(task)] = struct{}{}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
case ch := <-sf.copy:
|
|
||||||
// Somebody wants a copy of the current trie, grant them
|
|
||||||
ch <- sf.db.CopyTrie(sf.trie)
|
|
||||||
|
|
||||||
case <-sf.stop:
|
|
||||||
// Termination is requested, abort and leave remaining tasks
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue
Block a user