Merge pull request #1 from cerc-io/ian/v5

[WIP] ipld-eth-statedb
This commit is contained in:
Ian Norden 2023-02-27 08:15:53 -06:00 committed by GitHub
commit b7b1896699
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 1973 additions and 0 deletions

120
access_list.go Normal file
View File

@ -0,0 +1,120 @@
package ipld_eth_statedb
import (
"github.com/ethereum/go-ethereum/common"
)
// accessList records the addresses and storage slots that have been added to
// the access list.
type accessList struct {
// addresses maps every listed address to its index in slots, or to -1 when
// the address is listed without any slots.
addresses map[common.Address]int
// slots holds one slot set per address that has at least one listed slot.
slots []map[common.Hash]struct{}
}
// ContainsAddress reports whether address has been added to the access list.
func (al *accessList) ContainsAddress(address common.Address) bool {
	_, listed := al.addresses[address]
	return listed
}
// Contains looks up an (address, slot) pair. The first result reports whether
// the address is listed, the second whether the slot is listed under it.
func (al *accessList) Contains(address common.Address, slot common.Hash) (addressPresent bool, slotPresent bool) {
	slotIdx, known := al.addresses[address]
	switch {
	case !known:
		// Unknown address, so it cannot have any slots either.
		return false, false
	case slotIdx == -1:
		// Address is listed, but no slot set was ever created for it.
		return true, false
	}
	_, found := al.slots[slotIdx][slot]
	return true, found
}
// newAccessList returns an empty accessList with its address map allocated.
func newAccessList() *accessList {
	al := new(accessList)
	al.addresses = make(map[common.Address]int)
	return al
}
// Copy creates an independent copy of an accessList: the address index map
// and every per-address slot set are duplicated, so later mutations of either
// list do not affect the other.
func (al *accessList) Copy() *accessList {
	cp := &accessList{
		addresses: make(map[common.Address]int, len(al.addresses)),
		slots:     make([]map[common.Hash]struct{}, len(al.slots)),
	}
	for addr, idx := range al.addresses {
		cp.addresses[addr] = idx
	}
	for i, slotMap := range al.slots {
		dup := make(map[common.Hash]struct{}, len(slotMap))
		for slot := range slotMap {
			dup[slot] = struct{}{}
		}
		cp.slots[i] = dup
	}
	return cp
}
// AddAddress puts address on the access list (without any slots) and reports
// whether the list changed, i.e. whether the address was not listed before.
func (al *accessList) AddAddress(address common.Address) bool {
	_, known := al.addresses[address]
	if !known {
		al.addresses[address] = -1
	}
	return !known
}
// AddSlot puts the (address, slot) pair on the access list.
// It returns two flags:
//   - addrChange: the address was newly added
//   - slotChange: the slot was newly added
//
// Every flag that comes back 'true' must be matched by a journal entry.
func (al *accessList) AddSlot(address common.Address, slot common.Hash) (addrChange bool, slotChange bool) {
	slotIdx, known := al.addresses[address]
	if known && slotIdx != -1 {
		// The address already owns a slot set; extend it if needed.
		existing := al.slots[slotIdx]
		if _, listed := existing[slot]; listed {
			return false, false
		}
		existing[slot] = struct{}{}
		return false, true
	}
	// Unknown address, or a known address without slots: start a new slot set
	// at the end of the slots slice and point the address at it.
	al.addresses[address] = len(al.slots)
	al.slots = append(al.slots, map[common.Hash]struct{}{slot: {}})
	return !known, true
}
// DeleteSlot removes an (address, slot)-tuple from the access list.
// This operation needs to be performed in the same order as the addition happened.
// This method is meant to be used by the journal, which maintains ordering of
// operations.
//
// It panics if the address is not in the list, or if the address is listed
// without a slot set (index -1 or an index outside the slots slice).
func (al *accessList) DeleteSlot(address common.Address, slot common.Hash) {
	idx, addrOk := al.addresses[address]
	// There are two ways this can fail: the address is unknown, or it is
	// known but has no slot set to delete from.
	if !addrOk {
		panic("reverting slot change, address not present in list")
	}
	if idx < 0 || idx >= len(al.slots) {
		panic("reverting slot change, address has no slots in list")
	}
	slotmap := al.slots[idx]
	delete(slotmap, slot)
	// If that was the last (first) slot, remove the slot set.
	// Since additions and rollbacks are always performed in order,
	// we can truncate the slice without worrying about screwing up later indices.
	if len(slotmap) == 0 {
		al.slots = al.slots[:idx]
		al.addresses[address] = -1
	}
}
// DeleteAddress removes an address from the access list. This operation
// needs to be performed in the same order as the addition happened.
// This method is meant to be used by the journal, which maintains ordering of
// operations.
// Only the address entry is deleted; the slots slice is left untouched.
func (al *accessList) DeleteAddress(address common.Address) {
delete(al.addresses, address)
}

64
config.go Normal file
View File

@ -0,0 +1,64 @@
package ipld_eth_statedb
import (
"context"
"time"
"github.com/jackc/pgx/pgxpool"
)
// Config holds the settings used to build a pgx connection pool.
// Hostname, Port, DatabaseName, Username and Password are always applied by
// makePGXConfig; the remaining fields are only applied when non-zero, so a
// zero value keeps whatever pgxpool.ParseConfig produced.
type Config struct {
Hostname string
Port int
DatabaseName string
Username string
Password string
ConnTimeout time.Duration
MaxConns int
MinConns int
MaxConnLifetime time.Duration
MaxConnIdleTime time.Duration
}
// NewPGXPool translates config into a pgxpool configuration and connects a
// new pool with it. The receiver c is not consulted; only the config
// argument is used.
func (c Config) NewPGXPool(ctx context.Context, config Config) (*pgxpool.Pool, error) {
	poolConf, err := makePGXConfig(config)
	if err == nil {
		return pgxpool.ConnectConfig(ctx, poolConf)
	}
	return nil, err
}
// portRangeError is returned by makePGXConfig when Config.Port cannot be
// represented as a uint16. Its integer value is the rejected port.
type portRangeError int

// Error implements the error interface.
func (e portRangeError) Error() string {
	return "postgres port out of range [0, 65535]"
}

// makePGXConfig creates a pgxpool.Config from the provided Config.
//
// It starts from pgxpool.ParseConfig("") and then overrides host, port,
// database, user and password unconditionally. The timeout, pool size and
// lifetime fields are only overridden when they are non-zero.
//
// A port outside [0, 65535] is rejected with a portRangeError instead of
// being silently wrapped by the uint16 conversion.
func makePGXConfig(config Config) (*pgxpool.Config, error) {
	const maxPort = 1<<16 - 1
	if config.Port < 0 || config.Port > maxPort {
		return nil, portRangeError(config.Port)
	}
	conf, err := pgxpool.ParseConfig("")
	if err != nil {
		return nil, err
	}
	//conf.ConnConfig.BuildStatementCache = nil
	conf.ConnConfig.Config.Host = config.Hostname
	conf.ConnConfig.Config.Port = uint16(config.Port)
	conf.ConnConfig.Config.Database = config.DatabaseName
	conf.ConnConfig.Config.User = config.Username
	conf.ConnConfig.Config.Password = config.Password
	if config.ConnTimeout != 0 {
		conf.ConnConfig.Config.ConnectTimeout = config.ConnTimeout
	}
	if config.MaxConns != 0 {
		conf.MaxConns = int32(config.MaxConns)
	}
	if config.MinConns != 0 {
		conf.MinConns = int32(config.MinConns)
	}
	if config.MaxConnLifetime != 0 {
		conf.MaxConnLifetime = config.MaxConnLifetime
	}
	if config.MaxConnIdleTime != 0 {
		conf.MaxConnIdleTime = config.MaxConnIdleTime
	}
	return conf, nil
}

39
database.go Normal file
View File

@ -0,0 +1,39 @@
package ipld_eth_statedb
import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/jackc/pgx/pgxpool"
)
// Database is the backing store used by StateDB and stateObject: it serves
// contract code (and its size) by address hash and code hash, and opens or
// copies account and storage tries.
type Database interface {
ContractCode(addrHash common.Hash, codeHash common.Hash) ([]byte, error)
ContractCodeSize(addrHash common.Hash, codeHash common.Hash) (int, error)
OpenTrie(root common.Hash) (state.Trie, error)
OpenStorageTrie(addrHash common.Hash, root common.Hash) (state.Trie, error)
CopyTrie(trie state.Trie) state.Trie
}
// StateDatabase is a Database implementation that embeds a pgx connection
// pool. Every method is currently a placeholder that panics.
// NOTE(review): the pool is embedded by value rather than by pointer — confirm
// that copying a pgxpool.Pool is intended.
type StateDatabase struct {
pgxpool.Pool
}
// ContractCode is not implemented yet and always panics.
func (sd *StateDatabase) ContractCode(addrHash common.Hash, codeHash common.Hash) ([]byte, error) {
panic("implement me")
}
// ContractCodeSize is not implemented yet and always panics.
func (sd *StateDatabase) ContractCodeSize(addrHash common.Hash, codeHash common.Hash) (int, error) {
panic("implement me")
}
// OpenTrie always panics; the panic message asks for its callers to be replaced.
func (sd *StateDatabase) OpenTrie(root common.Hash) (state.Trie, error) {
panic("replace my usage")
}
// OpenStorageTrie always panics; the panic message asks for its callers to be replaced.
func (sd *StateDatabase) OpenStorageTrie(addrHash common.Hash, root common.Hash) (state.Trie, error) {
panic("replace my usage")
}
// CopyTrie always panics; the panic message asks for its callers to be replaced.
func (sd *StateDatabase) CopyTrie(trie state.Trie) state.Trie {
panic("replace my usage")
}

253
journal.go Normal file
View File

@ -0,0 +1,253 @@
package ipld_eth_statedb
import (
"math/big"
"github.com/ethereum/go-ethereum/common"
)
// journalEntry is a modification entry in the state change journal that can be
// reverted on demand.
type journalEntry interface {
// revert undoes the changes introduced by this journal entry.
revert(*StateDB)
// dirtied returns the Ethereum address modified by this journal entry, or
// nil when the entry does not mark any account as dirty.
dirtied() *common.Address
}
// journal contains the list of state modifications applied since the last state
// commit. These are tracked to be able to be reverted in the case of an execution
// exception or request for reversal.
type journal struct {
entries []journalEntry // Current changes tracked by the journal
dirties map[common.Address]int // Dirty accounts and the number of changes
}
// newJournal returns an empty journal whose dirty-account map is allocated.
func newJournal() *journal {
	j := new(journal)
	j.dirties = make(map[common.Address]int)
	return j
}
// append adds entry to the end of the journal and, when the entry names a
// dirtied account, bumps that account's change counter.
func (j *journal) append(entry journalEntry) {
	j.entries = append(j.entries, entry)
	addr := entry.dirtied()
	if addr == nil {
		return
	}
	j.dirties[*addr]++
}
// revert rolls back every journal entry at index snapshot or later, newest
// first, and drops the dirty tracking those entries had introduced. The
// journal is then truncated to snapshot entries.
func (j *journal) revert(statedb *StateDB, snapshot int) {
	for idx := len(j.entries); idx > snapshot; idx-- {
		entry := j.entries[idx-1]
		// Undo the state change itself.
		entry.revert(statedb)
		// Then release the dirty count the entry was holding, if any.
		addr := entry.dirtied()
		if addr == nil {
			continue
		}
		j.dirties[*addr]--
		if j.dirties[*addr] == 0 {
			delete(j.dirties, *addr)
		}
	}
	j.entries = j.entries[:snapshot]
}
// dirty explicitly sets an address to dirty, even if the change entries would
// otherwise suggest it as clean. This method is an ugly hack to handle the RIPEMD
// precompile consensus exception.
// It increments the address's dirty counter without appending a journal entry.
func (j *journal) dirty(addr common.Address) {
j.dirties[addr]++
}
// length returns the current number of entries in the journal.
func (j *journal) length() int {
return len(j.entries)
}
// The types below are the concrete journal entries. Each one stores what its
// revert method needs to restore the previous state.
type (
// Changes to the account trie.
// createObjectChange records the creation of a state object for account.
createObjectChange struct {
account *common.Address
}
// resetObjectChange records the state object that was replaced and whether
// it was already marked as destructed.
resetObjectChange struct {
prev *stateObject
prevdestruct bool
}
// suicideChange records the suicided flag and balance an account had before.
suicideChange struct {
account *common.Address
prev bool // whether account had already suicided
prevbalance *big.Int
}
// Changes to individual accounts.
// balanceChange records the previous balance of account.
balanceChange struct {
account *common.Address
prev *big.Int
}
// nonceChange records the previous nonce of account.
nonceChange struct {
account *common.Address
prev uint64
}
// storageChange records the previous value of one storage key of account.
storageChange struct {
account *common.Address
key, prevalue common.Hash
}
// codeChange records the previous code and code hash of account.
codeChange struct {
account *common.Address
prevcode, prevhash []byte
}
// Changes to other state values.
// refundChange records the previous refund counter.
refundChange struct {
prev uint64
}
// addLogChange records the transaction hash a log was added under.
addLogChange struct {
txhash common.Hash
}
// addPreimageChange records the hash whose preimage was added.
addPreimageChange struct {
hash common.Hash
}
// touchChange records that account was touched.
touchChange struct {
account *common.Address
}
// Changes to the access list
// accessListAddAccountChange records an address added to the access list.
accessListAddAccountChange struct {
address *common.Address
}
// accessListAddSlotChange records an (address, slot) pair added to the access list.
accessListAddSlotChange struct {
address *common.Address
slot *common.Hash
}
)
// revert removes the created account from both the live state object map and
// the dirty set.
func (ch createObjectChange) revert(s *StateDB) {
delete(s.stateObjects, *ch.account)
delete(s.stateObjectsDirty, *ch.account)
}
// dirtied returns the created account.
func (ch createObjectChange) dirtied() *common.Address {
return ch.account
}
// revert reinstates the previous state object. When snapshots are in use and
// the previous object was not already destructed, its destruct marker is
// removed as well.
func (ch resetObjectChange) revert(s *StateDB) {
	s.setStateObject(ch.prev)
	if ch.prevdestruct || s.snap == nil {
		return
	}
	delete(s.snapDestructs, ch.prev.addrHash)
}

// dirtied returns nil: a reset does not mark any account dirty.
func (ch resetObjectChange) dirtied() *common.Address { return nil }
// revert restores the suicided flag and the balance of the account, provided
// its state object can still be found.
func (ch suicideChange) revert(s *StateDB) {
	obj := s.getStateObject(*ch.account)
	if obj == nil {
		return
	}
	obj.suicided = ch.prev
	obj.setBalance(ch.prevbalance)
}

// dirtied returns the account the suicide was applied to.
func (ch suicideChange) dirtied() *common.Address { return ch.account }
// ripemd is the address ending in 0x03. stateObject.touch compares against it
// to force that account into the journal's dirty set.
var ripemd = common.HexToAddress("0000000000000000000000000000000000000003")
// revert is a no-op: a touch changes nothing that needs to be restored.
func (ch touchChange) revert(s *StateDB) {
}
// dirtied returns the touched account.
func (ch touchChange) dirtied() *common.Address {
return ch.account
}
// revert restores the account's previous balance. The result of
// getStateObject is used without a nil check.
func (ch balanceChange) revert(s *StateDB) {
s.getStateObject(*ch.account).setBalance(ch.prev)
}
// dirtied returns the account whose balance changed.
func (ch balanceChange) dirtied() *common.Address {
return ch.account
}
// revert restores the account's previous nonce. The result of
// getStateObject is used without a nil check.
func (ch nonceChange) revert(s *StateDB) {
s.getStateObject(*ch.account).setNonce(ch.prev)
}
// dirtied returns the account whose nonce changed.
func (ch nonceChange) dirtied() *common.Address {
return ch.account
}
// revert restores the account's previous code and code hash. The result of
// getStateObject is used without a nil check.
func (ch codeChange) revert(s *StateDB) {
s.getStateObject(*ch.account).setCode(common.BytesToHash(ch.prevhash), ch.prevcode)
}
// dirtied returns the account whose code changed.
func (ch codeChange) dirtied() *common.Address {
return ch.account
}
// revert writes the previous value back for the recorded storage key. The
// result of getStateObject is used without a nil check.
func (ch storageChange) revert(s *StateDB) {
s.getStateObject(*ch.account).setState(ch.key, ch.prevalue)
}
// dirtied returns the account whose storage changed.
func (ch storageChange) dirtied() *common.Address {
return ch.account
}
// revert restores the previous refund counter.
func (ch refundChange) revert(s *StateDB) {
s.refund = ch.prev
}
// dirtied returns nil: the refund counter belongs to no account.
func (ch refundChange) dirtied() *common.Address {
return nil
}
// revert drops the most recent log recorded under the transaction hash,
// deleting the map entry when it was the only one, and decrements the global
// log counter.
func (ch addLogChange) revert(s *StateDB) {
	txLogs := s.logs[ch.txhash]
	if remaining := len(txLogs) - 1; remaining == 0 {
		delete(s.logs, ch.txhash)
	} else {
		s.logs[ch.txhash] = txLogs[:remaining]
	}
	s.logSize--
}

// dirtied returns nil: adding a log marks no account dirty.
func (ch addLogChange) dirtied() *common.Address { return nil }
// revert removes the recorded preimage.
func (ch addPreimageChange) revert(s *StateDB) {
delete(s.preimages, ch.hash)
}
// dirtied returns nil: adding a preimage marks no account dirty.
func (ch addPreimageChange) dirtied() *common.Address {
return nil
}
// revert removes the address from the access list.
func (ch accessListAddAccountChange) revert(s *StateDB) {
/*
One important invariant here is that whenever an (addr, slot) is added, if the
addr is not already present, the add causes two journal entries:
- one for the address,
- one for the (address,slot)
Therefore, when unrolling the change, we can always blindly delete the
(addr) at this point, since no storage adds can remain when we come upon
a single (addr) change.
*/
s.accessList.DeleteAddress(*ch.address)
}
// dirtied returns nil: access list changes mark no account dirty.
func (ch accessListAddAccountChange) dirtied() *common.Address {
return nil
}
// revert removes the (address, slot) pair from the access list.
func (ch accessListAddSlotChange) revert(s *StateDB) {
s.accessList.DeleteSlot(*ch.address, *ch.slot)
}
// dirtied returns nil: access list changes mark no account dirty.
func (ch accessListAddSlotChange) dirtied() *common.Address {
return nil
}

521
state_object.go Normal file
View File

@ -0,0 +1,521 @@
package ipld_eth_statedb
import (
"bytes"
"fmt"
"io"
"math/big"
"time"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
)
// emptyCodeHash is the Keccak-256 hash of nil input. Account code hashes are
// compared against it to detect accounts without code.
var emptyCodeHash = crypto.Keccak256(nil)
// Code is raw contract bytecode.
type Code []byte

// String returns the bytecode reinterpreted as a string, byte for byte.
func (c Code) String() string {
	raw := []byte(c)
	return string(raw) //strings.Join(Disassemble(c), " ")
}
// Storage maps storage slot keys to their values.
type Storage map[common.Hash]common.Hash

// String renders every entry as "KEY : VALUE" in upper-case hex, one entry
// per line. Entries appear in map iteration order, which is not stable.
// The output is assembled in a buffer to avoid re-copying the whole string
// on every entry.
func (s Storage) String() (str string) {
	var buf bytes.Buffer
	for key, value := range s {
		fmt.Fprintf(&buf, "%X : %X\n", key, value)
	}
	return buf.String()
}
// Copy returns a new Storage holding the same key/value pairs as s.
func (s Storage) Copy() Storage {
	dup := make(Storage, len(s))
	for slot, val := range s {
		dup[slot] = val
	}
	return dup
}
// stateObject represents an Ethereum account which is being modified.
//
// The usage pattern is as follows:
// First you need to obtain a state object.
// Account values can be accessed and modified through the object.
// Finally, call CommitTrie to write the modified storage trie into a database.
type stateObject struct {
address common.Address
addrHash common.Hash // hash of ethereum address of the account
data types.StateAccount
db *StateDB
// DB error.
// State objects are used by the consensus core and VM which are
// unable to deal with database-level errors. Any error that occurs
// during a database read is memoized here and will eventually be returned
// by StateDB.Commit.
dbErr error
// Write caches.
trie state.Trie // storage trie, which becomes non-nil on first access
code Code // contract bytecode, which gets set when code is loaded
originStorage Storage // Storage cache of original entries to dedup rewrites, reset for every transaction
pendingStorage Storage // Storage entries that need to be flushed to disk, at the end of an entire block
dirtyStorage Storage // Storage entries that have been modified in the current transaction execution
fakeStorage Storage // Fake storage which is constructed by the caller for debugging purposes.
// Cache flags.
// When an object is marked suicided it will be deleted from the trie
// during the "update" phase of the state transition.
dirtyCode bool // true if the code was updated
suicided bool
deleted bool
}
// empty reports whether the account has a zero nonce, a zero balance and the
// empty code hash.
func (s *stateObject) empty() bool {
	if s.data.Nonce != 0 {
		return false
	}
	if s.data.Balance.Sign() != 0 {
		return false
	}
	return bytes.Equal(s.data.CodeHash, emptyCodeHash)
}
// newObject builds a state object for address on top of db. Missing account
// fields are defaulted first: a nil balance becomes zero, a nil code hash
// becomes the empty code hash, and a zero storage root becomes the empty root.
func newObject(db *StateDB, address common.Address, data types.StateAccount) *stateObject {
	var zeroRoot common.Hash
	if data.Balance == nil {
		data.Balance = new(big.Int)
	}
	if data.CodeHash == nil {
		data.CodeHash = emptyCodeHash
	}
	if data.Root == zeroRoot {
		data.Root = emptyRoot
	}
	obj := new(stateObject)
	obj.db = db
	obj.address = address
	obj.addrHash = crypto.Keccak256Hash(address[:])
	obj.data = data
	obj.originStorage = make(Storage)
	obj.pendingStorage = make(Storage)
	obj.dirtyStorage = make(Storage)
	return obj
}
// EncodeRLP implements rlp.Encoder by encoding only the account data
// (s.data) to w.
func (s *stateObject) EncodeRLP(w io.Writer) error {
return rlp.Encode(w, &s.data)
}
// setError stores err unless an earlier error is already recorded, so only
// the first non-nil error is kept.
func (s *stateObject) setError(err error) {
	if s.dbErr != nil {
		return
	}
	s.dbErr = err
}
// markSuicided sets the suicided flag. No journal entry is written here.
func (s *stateObject) markSuicided() {
s.suicided = true
}
// touch journals a touchChange for the account. For the ripemd address the
// account is additionally forced into the journal's dirty set, which is
// otherwise derived from the journal entries alone.
func (s *stateObject) touch() {
	entry := touchChange{account: &s.address}
	s.db.journal.append(entry)
	if s.address != ripemd {
		return
	}
	s.db.journal.dirty(s.address)
}
// getTrie returns the account's storage trie, opening it on first use.
//
// For a non-empty storage root the prefetcher (when present) is asked first.
// If that yields nothing the trie is opened through db. When opening fails, a
// trie for the zero root is opened as a stand-in and the original failure is
// recorded through setError, wrapped so callers can still inspect the cause.
func (s *stateObject) getTrie(db Database) state.Trie {
	if s.trie != nil {
		return s.trie
	}
	// Try fetching from the prefetcher first; empty tries are never prefetched.
	// When the miner is creating the pending state, there is no prefetcher.
	if s.data.Root != emptyRoot && s.db.prefetcher != nil {
		s.trie = s.db.prefetcher.trie(s.addrHash, s.data.Root)
	}
	if s.trie == nil {
		var err error
		s.trie, err = db.OpenStorageTrie(s.addrHash, s.data.Root)
		if err != nil {
			// Best-effort fallback: the error from this second open is
			// deliberately dropped, the first failure is the one reported.
			s.trie, _ = db.OpenStorageTrie(s.addrHash, common.Hash{})
			s.setError(fmt.Errorf("can't create storage trie: %w", err))
		}
	}
	return s.trie
}
// GetState returns the current value of a storage slot. Fake storage, when
// set, is the only source consulted. Otherwise a value modified in the
// current transaction wins, and the committed value is the fallback.
func (s *stateObject) GetState(db Database, key common.Hash) common.Hash {
	if s.fakeStorage != nil {
		return s.fakeStorage[key]
	}
	if current, modified := s.dirtyStorage[key]; modified {
		return current
	}
	return s.GetCommittedState(db, key)
}
// GetCommittedState retrieves a value from the committed account storage trie.
//
// Lookup order: fake storage (if set), pending writes, the origin cache, the
// snapshot (if one is active), and finally the storage trie. A value loaded
// from the snapshot or trie is stored in originStorage before it is returned.
// Read and decode errors are recorded through setError.
func (s *stateObject) GetCommittedState(db Database, key common.Hash) common.Hash {
// If the fake storage is set, only lookup the state here(in the debugging mode)
if s.fakeStorage != nil {
return s.fakeStorage[key]
}
// If we have a pending write or clean cached, return that
if value, pending := s.pendingStorage[key]; pending {
return value
}
if value, cached := s.originStorage[key]; cached {
return value
}
// If no live objects are available, attempt to use snapshots
var (
enc []byte
err error
)
if s.db.snap != nil {
// If the object was destructed in *this* block (and potentially resurrected),
// the storage has been cleared out, and we should *not* consult the previous
// snapshot about any storage values. The only possible alternatives are:
// 1) resurrect happened, and new slot values were set -- those should
// have been handled via pendingStorage above.
// 2) we don't have new values, and can deliver empty response back
if _, destructed := s.db.snapDestructs[s.addrHash]; destructed {
return common.Hash{}
}
start := time.Now()
enc, err = s.db.snap.Storage(s.addrHash, crypto.Keccak256Hash(key.Bytes()))
if metrics.EnabledExpensive {
s.db.SnapshotStorageReads += time.Since(start)
}
}
// If the snapshot is unavailable or reading from it fails, load from the database.
if s.db.snap == nil || err != nil {
start := time.Now()
enc, err = s.getTrie(db).TryGet(key.Bytes())
if metrics.EnabledExpensive {
s.db.StorageReads += time.Since(start)
}
if err != nil {
s.setError(err)
return common.Hash{}
}
}
var value common.Hash
if len(enc) > 0 {
// NOTE(review): a Split error is recorded, but content is still written
// into value and cached below — confirm this is intended.
_, content, _, err := rlp.Split(enc)
if err != nil {
s.setError(err)
}
value.SetBytes(content)
}
s.originStorage[key] = value
return value
}
// SetState writes value to a storage slot. With fake storage set, the write
// goes there and nothing is journaled. Otherwise a write that would not
// change the current value is skipped, and a real change is journaled before
// it is applied.
func (s *stateObject) SetState(db Database, key, value common.Hash) {
	if s.fakeStorage != nil {
		s.fakeStorage[key] = value
		return
	}
	current := s.GetState(db, key)
	if current != value {
		s.db.journal.append(storageChange{account: &s.address, key: key, prevalue: current})
		s.setState(key, value)
	}
}
// SetStorage merges the given entries into the fake storage, allocating it
// first if necessary.
//
// Once fake storage exists, state lookups consult only the fake storage and
// ignore all original state.
//
// This is meant for debugging only. Nothing is journaled, because the fake
// storage is never committed to the database.
func (s *stateObject) SetStorage(storage map[common.Hash]common.Hash) {
	fake := s.fakeStorage
	if fake == nil {
		fake = make(Storage)
		s.fakeStorage = fake
	}
	for slot, val := range storage {
		fake[slot] = val
	}
}
// setState records value for key in the dirty storage without journaling.
func (s *stateObject) setState(key, value common.Hash) {
s.dirtyStorage[key] = value
}
// finalise copies every dirty slot into the pending area, where it waits to
// be hashed or committed, and then resets the dirty set. Slots whose value
// differs from the cached original are handed to the prefetcher when
// prefetch is true, a prefetcher exists and the storage root is not empty.
func (s *stateObject) finalise(prefetch bool) {
	dirtyCount := len(s.dirtyStorage)
	changed := make([][]byte, 0, dirtyCount)
	for slot, val := range s.dirtyStorage {
		s.pendingStorage[slot] = val
		if s.originStorage[slot] == val {
			continue
		}
		changed = append(changed, common.CopyBytes(slot[:])) // Copy needed for closure
	}
	wantPrefetch := s.db.prefetcher != nil && prefetch && len(changed) > 0 && s.data.Root != emptyRoot
	if wantPrefetch {
		s.db.prefetcher.prefetch(s.addrHash, s.data.Root, changed)
	}
	if dirtyCount > 0 {
		s.dirtyStorage = make(Storage)
	}
}
// updateTrie writes cached storage modifications into the object's storage trie.
// It will return nil if the trie has not been loaded and no changes have been made.
// Trie update and delete errors are recorded through setError rather than returned.
func (s *stateObject) updateTrie(db Database) state.Trie {
// Make sure all dirty slots are finalized into the pending storage area
s.finalise(false) // Don't prefetch anymore, pull directly if need be
if len(s.pendingStorage) == 0 {
return s.trie
}
// Track the amount of time wasted on updating the storage trie
if metrics.EnabledExpensive {
defer func(start time.Time) { s.db.StorageUpdates += time.Since(start) }(time.Now())
}
// The snapshot storage map for the object
var storage map[common.Hash][]byte
// Insert all the pending updates into the trie
tr := s.getTrie(db)
hasher := s.db.hasher
usedStorage := make([][]byte, 0, len(s.pendingStorage))
for key, value := range s.pendingStorage {
// Skip noop changes, persist actual changes
if value == s.originStorage[key] {
continue
}
s.originStorage[key] = value
var v []byte
// A zero value deletes the slot from the trie; anything else is stored
// RLP-encoded with leading zero bytes trimmed.
if (value == common.Hash{}) {
s.setError(tr.TryDelete(key[:]))
s.db.StorageDeleted += 1
} else {
// Encoding []byte cannot fail, ok to ignore the error.
v, _ = rlp.EncodeToBytes(common.TrimLeftZeroes(value[:]))
s.setError(tr.TryUpdate(key[:], v))
s.db.StorageUpdated += 1
}
// If state snapshotting is active, cache the data til commit
if s.db.snap != nil {
if storage == nil {
// Retrieve the old storage map, if available, create a new one otherwise
if storage = s.db.snapStorage[s.addrHash]; storage == nil {
storage = make(map[common.Hash][]byte)
s.db.snapStorage[s.addrHash] = storage
}
}
storage[crypto.HashData(hasher, key[:])] = v // v will be nil if it's deleted
}
usedStorage = append(usedStorage, common.CopyBytes(key[:])) // Copy needed for closure
}
if s.db.prefetcher != nil {
s.db.prefetcher.used(s.addrHash, s.data.Root, usedStorage)
}
// Everything pending has been written to the trie; start a fresh pending set.
if len(s.pendingStorage) > 0 {
s.pendingStorage = make(Storage)
}
return tr
}
// updateRoot flushes pending storage changes into the storage trie and then
// sets the account's storage root to the trie's current hash. It does
// nothing when updateTrie reports that there is no trie to hash.
func (s *stateObject) updateRoot(db Database) {
	if tr := s.updateTrie(db); tr == nil {
		return
	}
	// Track the amount of time spent on hashing the storage trie.
	if metrics.EnabledExpensive {
		began := time.Now()
		defer func() { s.db.StorageHashes += time.Since(began) }()
	}
	s.data.Root = s.trie.Hash()
}
// CommitTrie flushes pending storage changes into the storage trie and
// commits it. On success the account's storage root is set to the committed
// root. It returns (nil, nil) when there is no trie to commit, and the
// memoized database error, if any, before committing.
func (s *stateObject) CommitTrie(db Database) (*trie.NodeSet, error) {
	if tr := s.updateTrie(db); tr == nil {
		return nil, nil
	}
	if err := s.dbErr; err != nil {
		return nil, err
	}
	// Track the amount of time spent on committing the storage trie.
	if metrics.EnabledExpensive {
		began := time.Now()
		defer func() { s.db.StorageCommits += time.Since(began) }()
	}
	root, nodes, err := s.trie.Commit(false)
	if err != nil {
		return nodes, err
	}
	s.data.Root = root
	return nodes, nil
}
// AddBalance credits amount to the account, e.g. for the receiving side of a
// transfer. A zero amount leaves the balance alone but still touches an
// empty account, so that EIP161 clearing of (0,0,0) objects can take effect.
func (s *stateObject) AddBalance(amount *big.Int) {
	if amount.Sign() != 0 {
		s.SetBalance(new(big.Int).Add(s.Balance(), amount))
		return
	}
	if s.empty() {
		s.touch()
	}
}
// SubBalance debits amount from the account, e.g. for the sending side of a
// transfer. A zero amount is a no-op.
func (s *stateObject) SubBalance(amount *big.Int) {
	if amount.Sign() != 0 {
		s.SetBalance(new(big.Int).Sub(s.Balance(), amount))
	}
}
// SetBalance journals a copy of the current balance and then replaces the
// balance with amount.
func (s *stateObject) SetBalance(amount *big.Int) {
	before := new(big.Int).Set(s.data.Balance)
	s.db.journal.append(balanceChange{account: &s.address, prev: before})
	s.setBalance(amount)
}
// setBalance replaces the balance pointer with amount without journaling.
func (s *stateObject) setBalance(amount *big.Int) {
s.data.Balance = amount
}
// deepCopy returns a copy of the state object bound to db. The storage trie
// (if loaded) is copied through the database, the three storage caches are
// duplicated, and the code slice and status flags are carried over.
func (s *stateObject) deepCopy(db *StateDB) *stateObject {
	cpy := newObject(db, s.address, s.data)
	if s.trie != nil {
		cpy.trie = db.db.CopyTrie(s.trie)
	}
	cpy.code = s.code
	cpy.dirtyStorage = s.dirtyStorage.Copy()
	cpy.originStorage = s.originStorage.Copy()
	cpy.pendingStorage = s.pendingStorage.Copy()
	cpy.suicided, cpy.dirtyCode, cpy.deleted = s.suicided, s.dirtyCode, s.deleted
	return cpy
}
//
// Attribute accessors
//
// Address returns the address of the contract/account.
func (s *stateObject) Address() common.Address {
return s.address
}
// Code returns the contract code associated with this object, if any.
//
// Already loaded code is returned directly, and an account with the empty
// code hash yields nil. Otherwise the code is fetched from db and cached on
// the object. A fetch failure is recorded through setError, wrapped so the
// underlying cause stays inspectable, and whatever db returned is still
// cached and returned.
func (s *stateObject) Code(db Database) []byte {
	if s.code != nil {
		return s.code
	}
	if bytes.Equal(s.CodeHash(), emptyCodeHash) {
		return nil
	}
	code, err := db.ContractCode(s.addrHash, common.BytesToHash(s.CodeHash()))
	if err != nil {
		s.setError(fmt.Errorf("can't load code hash %x: %w", s.CodeHash(), err))
	}
	s.code = code
	return code
}
// CodeSize returns the size of the contract code associated with this object,
// or zero if none. It mirrors Code but asks db for the size only, and does
// not cache the result on the object. A lookup failure is recorded through
// setError, wrapped so the underlying cause stays inspectable.
func (s *stateObject) CodeSize(db Database) int {
	if s.code != nil {
		return len(s.code)
	}
	if bytes.Equal(s.CodeHash(), emptyCodeHash) {
		return 0
	}
	size, err := db.ContractCodeSize(s.addrHash, common.BytesToHash(s.CodeHash()))
	if err != nil {
		s.setError(fmt.Errorf("can't load code size %x: %w", s.CodeHash(), err))
	}
	return size
}
// SetCode journals the current code and code hash, then installs the new
// ones. The current code is loaded through the database if it is not cached.
func (s *stateObject) SetCode(codeHash common.Hash, code []byte) {
	before := s.Code(s.db.db)
	entry := codeChange{
		account:  &s.address,
		prevhash: s.CodeHash(),
		prevcode: before,
	}
	s.db.journal.append(entry)
	s.setCode(codeHash, code)
}
// setCode installs code and its hash on the object and marks the code as
// dirty, without journaling.
func (s *stateObject) setCode(codeHash common.Hash, code []byte) {
s.code = code
s.data.CodeHash = codeHash[:]
s.dirtyCode = true
}
// SetNonce journals the current nonce and then replaces it with nonce.
func (s *stateObject) SetNonce(nonce uint64) {
	entry := nonceChange{account: &s.address, prev: s.data.Nonce}
	s.db.journal.append(entry)
	s.setNonce(nonce)
}
// setNonce replaces the account nonce without journaling.
func (s *stateObject) setNonce(nonce uint64) {
s.data.Nonce = nonce
}
// CodeHash returns the account's code hash. The returned slice is the one
// stored on the account, not a copy.
func (s *stateObject) CodeHash() []byte {
return s.data.CodeHash
}
// Balance returns the account's balance. The returned pointer is the one
// stored on the account, not a copy.
func (s *stateObject) Balance() *big.Int {
return s.data.Balance
}
// Nonce returns the account's nonce.
func (s *stateObject) Nonce() uint64 {
return s.data.Nonce
}
// Value is never called, but must be present to allow stateObject to be used
// as a vm.Account interface that also satisfies the vm.ContractRef
// interface. It always panics.
func (s *stateObject) Value() *big.Int {
panic("Value on stateObject should never be called")
}

637
statedb.go Normal file
View File

@ -0,0 +1,637 @@
package ipld_eth_statedb
import (
"fmt"
"math/big"
"sort"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/state/snapshot"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
)
/*
The portions of the EVM we want to leverage only use the following methods:
GetBalance
Snapshot
Exist
CreateAccount
SubBalance
AddBalance
GetCode
GetCodeHash
RevertToSnapshot
GetNonce
SetNonce
AddAddressToAccessList
SetCode
The rest can be left with panics for now
*/
// Compile-time check that *StateDB implements vm.StateDB.
var _ vm.StateDB = (*StateDB)(nil)
// revision pairs a revision id with a journal index.
// NOTE(review): presumably journalIndex is the journal length when the
// snapshot was taken — the code using it is outside this view.
type revision struct {
id int
journalIndex int
}
var (
// emptyRoot is the known root hash of an empty trie. newObject substitutes
// it for a zero storage root.
emptyRoot = common.HexToHash("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421")
)
// StateDB structs within the ethereum protocol are used to store anything
// within the merkle trie. StateDBs take care of caching and storing
// nested states. It's the general query interface to retrieve:
// * Contracts
// * Accounts
type StateDB struct {
db Database
prefetcher *triePrefetcher
trie state.Trie
hasher crypto.KeccakState
// originalRoot is the pre-state root, before any changes were made.
// It will be updated when the Commit is called.
originalRoot common.Hash
// Snapshot state. New allocates the three snap* maps only when a snapshot
// exists for the root.
snaps *snapshot.Tree
snap snapshot.Snapshot
snapDestructs map[common.Hash]struct{}
snapAccounts map[common.Hash][]byte
snapStorage map[common.Hash]map[common.Hash][]byte
// This map holds 'live' objects, which will get modified while processing a state transition.
stateObjects map[common.Address]*stateObject
stateObjectsPending map[common.Address]struct{} // State objects finalized but not yet written to the trie
stateObjectsDirty map[common.Address]struct{} // State objects modified in the current execution
// DB error.
// State objects are used by the consensus core and VM which are
// unable to deal with database-level errors. Any error that occurs
// during a database read is memoized here and will eventually be returned
// by StateDB.Commit.
dbErr error
// The refund counter, also used by state transitioning.
refund uint64
// Transaction context and the logs recorded per transaction hash.
thash common.Hash
txIndex int
logs map[common.Hash][]*types.Log
logSize uint
preimages map[common.Hash][]byte
// Per-transaction access list
accessList *accessList
// Journal of state modifications. This is the backbone of
// Snapshot and RevertToSnapshot.
journal *journal
validRevisions []revision
nextRevisionId int
// Measurements gathered during execution for debugging purposes
AccountReads time.Duration
AccountHashes time.Duration
AccountUpdates time.Duration
AccountCommits time.Duration
StorageReads time.Duration
StorageHashes time.Duration
StorageUpdates time.Duration
StorageCommits time.Duration
SnapshotAccountReads time.Duration
SnapshotStorageReads time.Duration
SnapshotCommits time.Duration
AccountUpdated int
StorageUpdated int
AccountDeleted int
StorageDeleted int
}
// New opens the trie at root through db and returns a StateDB on top of it,
// with empty object caches, logs, preimages, journal and access list. When
// snaps is non-nil and holds a snapshot for root, the snapshot bookkeeping
// maps are allocated as well. It fails only if the trie cannot be opened.
func New(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error) {
	accountTrie, err := db.OpenTrie(root)
	if err != nil {
		return nil, err
	}
	sdb := new(StateDB)
	sdb.db = db
	sdb.trie = accountTrie
	sdb.originalRoot = root
	sdb.snaps = snaps
	sdb.stateObjects = make(map[common.Address]*stateObject)
	sdb.stateObjectsPending = make(map[common.Address]struct{})
	sdb.stateObjectsDirty = make(map[common.Address]struct{})
	sdb.logs = make(map[common.Hash][]*types.Log)
	sdb.preimages = make(map[common.Hash][]byte)
	sdb.journal = newJournal()
	sdb.accessList = newAccessList()
	sdb.hasher = crypto.NewKeccakState()
	if sdb.snaps == nil {
		return sdb, nil
	}
	sdb.snap = sdb.snaps.Snapshot(root)
	if sdb.snap != nil {
		sdb.snapDestructs = make(map[common.Hash]struct{})
		sdb.snapAccounts = make(map[common.Hash][]byte)
		sdb.snapStorage = make(map[common.Hash]map[common.Hash][]byte)
	}
	return sdb, nil
}
// setError stores err unless an earlier error is already recorded, so only
// the first non-nil error is kept.
func (s *StateDB) setError(err error) {
	if s.dbErr != nil {
		return
	}
	s.dbErr = err
}
// AddLog journals the addition, stamps log with the current transaction
// hash, transaction index and running log index, and stores it under the
// current transaction hash.
func (s *StateDB) AddLog(log *types.Log) {
	txHash := s.thash
	s.journal.append(addLogChange{txhash: txHash})

	log.TxHash = txHash
	log.TxIndex = uint(s.txIndex)
	log.Index = s.logSize
	s.logSize++

	s.logs[txHash] = append(s.logs[txHash], log)
}
// AddPreimage stores a private copy of preimage under hash and journals the
// addition. A hash that already has a preimage is left untouched.
func (s *StateDB) AddPreimage(hash common.Hash, preimage []byte) {
	if _, seen := s.preimages[hash]; seen {
		return
	}
	s.journal.append(addPreimageChange{hash: hash})
	stored := make([]byte, len(preimage))
	copy(stored, preimage)
	s.preimages[hash] = stored
}
// AddRefund adds gas to the refund counter, journaling the previous value first.
func (s *StateDB) AddRefund(gas uint64) {
s.journal.append(refundChange{prev: s.refund})
s.refund += gas
}
// SubRefund decreases the refund counter by gas, journalling the previous
// counter value first.
// It panics if gas exceeds the current refund counter.
func (s *StateDB) SubRefund(gas uint64) {
	s.journal.append(refundChange{prev: s.refund})

	current := s.refund
	if current < gas {
		panic(fmt.Sprintf("Refund counter below zero (gas: %d > refund: %d)", gas, current))
	}
	s.refund = current - gas
}
// Exist reports whether a state object can be found for addr.
// Notably this also returns true for suicided accounts.
func (s *StateDB) Exist(addr common.Address) bool {
	obj := s.getStateObject(addr)
	return obj != nil
}
// Empty reports whether addr has no state object, or has one that is empty
// according to the EIP161 specification (balance = nonce = code = 0).
func (s *StateDB) Empty(addr common.Address) bool {
	obj := s.getStateObject(addr)
	if obj == nil {
		return true
	}
	return obj.empty()
}
// GetBalance returns the balance of addr, or common.Big0 when no state object
// is found for it.
func (s *StateDB) GetBalance(addr common.Address) *big.Int {
	if obj := s.getStateObject(addr); obj != nil {
		return obj.Balance()
	}
	return common.Big0
}
// GetNonce returns the nonce of addr, or 0 when no state object is found for it.
func (s *StateDB) GetNonce(addr common.Address) uint64 {
	if obj := s.getStateObject(addr); obj != nil {
		return obj.Nonce()
	}
	return 0
}
// GetCode returns the code of addr, or nil when no state object is found
// for it.
func (s *StateDB) GetCode(addr common.Address) []byte {
	if obj := s.getStateObject(addr); obj != nil {
		return obj.Code(s.db)
	}
	return nil
}
// GetCodeSize returns the code size reported by the state object of addr, or 0
// when no state object is found for it.
func (s *StateDB) GetCodeSize(addr common.Address) int {
	if obj := s.getStateObject(addr); obj != nil {
		return obj.CodeSize(s.db)
	}
	return 0
}
// GetCodeHash returns the code hash of addr, or the zero hash when no state
// object is found for it.
func (s *StateDB) GetCodeHash(addr common.Address) common.Hash {
	if obj := s.getStateObject(addr); obj != nil {
		return common.BytesToHash(obj.CodeHash())
	}
	return common.Hash{}
}
// GetState returns the value stored under hash in the storage of addr, or the
// zero hash when no state object is found for the account.
func (s *StateDB) GetState(addr common.Address, hash common.Hash) common.Hash {
	if obj := s.getStateObject(addr); obj != nil {
		return obj.GetState(s.db, hash)
	}
	return common.Hash{}
}
// GetCommittedState returns the committed value stored under hash in the
// storage of addr, or the zero hash when no state object is found for the
// account.
func (s *StateDB) GetCommittedState(addr common.Address, hash common.Hash) common.Hash {
	if obj := s.getStateObject(addr); obj != nil {
		return obj.GetCommittedState(s.db, hash)
	}
	return common.Hash{}
}
// HasSuicided reports whether the state object of addr is flagged as suicided.
// It returns false when no state object is found for the address.
func (s *StateDB) HasSuicided(addr common.Address) bool {
	if obj := s.getStateObject(addr); obj != nil {
		return obj.suicided
	}
	return false
}
/*
* SETTERS
*/
// AddBalance credits amount to the account at addr, creating the state object
// first if none exists.
func (s *StateDB) AddBalance(addr common.Address, amount *big.Int) {
	if obj := s.getOrNewStateObject(addr); obj != nil {
		obj.AddBalance(amount)
	}
}
// SubBalance debits amount from the account at addr, creating the state object
// first if none exists.
func (s *StateDB) SubBalance(addr common.Address, amount *big.Int) {
	if obj := s.getOrNewStateObject(addr); obj != nil {
		obj.SubBalance(amount)
	}
}
// SetBalance sets the balance of the account at addr to amount, creating the
// state object first if none exists.
func (s *StateDB) SetBalance(addr common.Address, amount *big.Int) {
	if obj := s.getOrNewStateObject(addr); obj != nil {
		obj.SetBalance(amount)
	}
}
// SetNonce sets the nonce of the account at addr, creating the state object
// first if none exists.
func (s *StateDB) SetNonce(addr common.Address, nonce uint64) {
	if obj := s.getOrNewStateObject(addr); obj != nil {
		obj.SetNonce(nonce)
	}
}
// SetCode installs code on the account at addr together with its Keccak256
// hash, creating the state object first if none exists.
func (s *StateDB) SetCode(addr common.Address, code []byte) {
	obj := s.getOrNewStateObject(addr)
	if obj == nil {
		return
	}
	codeHash := crypto.Keccak256Hash(code)
	obj.SetCode(codeHash, code)
}
// SetState writes value under key in the storage of the account at addr,
// creating the state object first if none exists.
func (s *StateDB) SetState(addr common.Address, key, value common.Hash) {
	if obj := s.getOrNewStateObject(addr); obj != nil {
		obj.SetState(s.db, key, value)
	}
}
// Suicide flags the account at addr as suicided and zeroes its balance.
// It returns false, changing nothing, when no state object is found for addr.
//
// The previous suicided flag and a copy of the previous balance are journalled
// before the object is modified. The state object itself stays available until
// the state is committed, so getStateObject keeps returning it after Suicide.
func (s *StateDB) Suicide(addr common.Address) bool {
	obj := s.getStateObject(addr)
	if obj == nil {
		return false
	}
	change := suicideChange{
		account:     &addr,
		prev:        obj.suicided,
		prevbalance: new(big.Int).Set(obj.Balance()),
	}
	s.journal.append(change)

	obj.markSuicided()
	obj.data.Balance = new(big.Int)
	return true
}
//
// Setting, updating & deleting state object methods.
//
// updateStateObject writes the given object's account data to the account trie.
//
// A trie failure is not returned to the caller; it is recorded through
// setError with the underlying error wrapped (%w), so the stored error can be
// inspected with errors.Is / errors.As. When a snapshot layer is active, the
// slim RLP encoding of the account is also cached in snapAccounts under the
// object's address hash.
// TODO:
func (s *StateDB) updateStateObject(obj *stateObject) {
	// Track the amount of time wasted on updating the account from the trie
	if metrics.EnabledExpensive {
		defer func(start time.Time) { s.AccountUpdates += time.Since(start) }(time.Now())
	}
	// Encode the account and update the account trie
	addr := obj.Address()
	if err := s.trie.TryUpdateAccount(addr[:], &obj.data); err != nil {
		s.setError(fmt.Errorf("updateStateObject (%x) error: %w", addr[:], err))
	}
	// If state snapshotting is active, cache the data til commit. Note, this
	// update mechanism is not symmetric to the deletion, because whereas it is
	// enough to track account updates at commit time, deletions need tracking
	// at transaction boundary level to ensure we capture state clearing.
	if s.snap != nil {
		s.snapAccounts[obj.addrHash] = snapshot.SlimAccountRLP(obj.data.Nonce, obj.data.Balance, obj.data.Root, obj.data.CodeHash)
	}
}
// deleteStateObject removes the given object's account from the account trie.
//
// A trie failure is not returned to the caller; it is recorded through
// setError with the underlying error wrapped (%w), so the stored error can be
// inspected with errors.Is / errors.As. The time spent is accounted to
// AccountUpdates when expensive metrics are enabled.
// TODO:
func (s *StateDB) deleteStateObject(obj *stateObject) {
	// Track the amount of time wasted on deleting the account from the trie
	if metrics.EnabledExpensive {
		defer func(start time.Time) { s.AccountUpdates += time.Since(start) }(time.Now())
	}
	// Delete the account from the trie
	addr := obj.Address()
	if err := s.trie.TryDeleteAccount(addr[:]); err != nil {
		s.setError(fmt.Errorf("deleteStateObject (%x) error: %w", addr[:], err))
	}
}
// getStateObject returns the state object for addr, or nil when the object is
// not found or carries the deleted flag. Callers that must tell "never
// existed" apart from "just deleted" should use getDeletedStateObject instead.
func (s *StateDB) getStateObject(addr common.Address) *stateObject {
	obj := s.getDeletedStateObject(addr)
	if obj == nil || obj.deleted {
		return nil
	}
	return obj
}
// getDeletedStateObject is similar to getStateObject, but instead of returning
// nil for a deleted state object, it returns the actual object with the deleted
// flag set. This is needed by the state journal to revert to the correct s-
// destructed object instead of wiping all knowledge about the state object.
//
// Lookup order: the live object set, then the snapshot layer (if any), then the
// account trie. A trie read error is recorded through setError and nil is
// returned. An object loaded from the snapshot or the trie is inserted into the
// live set before being returned.
// TODO:
func (s *StateDB) getDeletedStateObject(addr common.Address) *stateObject {
	// Prefer live objects if any is available
	if obj := s.stateObjects[addr]; obj != nil {
		return obj
	}
	// If no live objects are available, attempt to use snapshots
	var data *types.StateAccount
	if s.snap != nil {
		start := time.Now()
		acc, err := s.snap.Account(crypto.HashData(s.hasher, addr.Bytes()))
		if metrics.EnabledExpensive {
			s.SnapshotAccountReads += time.Since(start)
		}
		// A snapshot read error is not fatal: data stays nil and the trie is
		// consulted below.
		if err == nil {
			if acc == nil {
				return nil
			}
			data = &types.StateAccount{
				Nonce:    acc.Nonce,
				Balance:  acc.Balance,
				CodeHash: acc.CodeHash,
				Root:     common.BytesToHash(acc.Root),
			}
			// Fill in the defaults for fields left empty in the snapshot account.
			if len(data.CodeHash) == 0 {
				data.CodeHash = emptyCodeHash
			}
			if data.Root == (common.Hash{}) {
				data.Root = emptyRoot
			}
		}
	}
	// If snapshot unavailable or reading from it failed, load from the database
	if data == nil {
		start := time.Now()
		var err error
		data, err = s.trie.TryGetAccount(addr.Bytes())
		if metrics.EnabledExpensive {
			s.AccountReads += time.Since(start)
		}
		if err != nil {
			s.setError(fmt.Errorf("getDeletedStateObject (%x) error: %w", addr.Bytes(), err))
			return nil
		}
		if data == nil {
			return nil
		}
	}
	// Insert into the live set
	obj := newObject(s, addr, *data)
	s.setStateObject(obj)
	return obj
}
// setStateObject registers object in the live object set under its own
// address, replacing any object previously stored for that address.
func (s *StateDB) setStateObject(object *stateObject) {
	addr := object.Address()
	s.stateObjects[addr] = object
}
// getOrNewStateObject returns the state object for addr, creating a fresh one
// when getStateObject finds none.
func (s *StateDB) getOrNewStateObject(addr common.Address) *stateObject {
	if existing := s.getStateObject(addr); existing != nil {
		return existing
	}
	// The second result is the overwritten object, not an error; it is not needed here.
	created, _ := s.createObject(addr)
	return created
}
// createObject installs a brand new, empty state object for addr.
//
// If an object already exists for the address it is overwritten. That previous
// object is returned as prev only when it is not flagged deleted; otherwise
// prev is nil. The change is journalled either as a creation (nothing existed
// before) or as a reset of the previous object.
func (s *StateDB) createObject(addr common.Address) (newobj, prev *stateObject) {
	// Deliberately fetch deleted objects too: the journal needs them.
	prev = s.getDeletedStateObject(addr)

	// With an active snapshot, mark the overwritten account as destructed and
	// remember whether it already was, so a revert can restore that state.
	prevdestruct := false
	if prev != nil && s.snap != nil {
		if _, prevdestruct = s.snapDestructs[prev.addrHash]; !prevdestruct {
			s.snapDestructs[prev.addrHash] = struct{}{}
		}
	}
	newobj = newObject(s, addr, types.StateAccount{})
	switch {
	case prev == nil:
		s.journal.append(createObjectChange{account: &addr})
	default:
		s.journal.append(resetObjectChange{prev: prev, prevdestruct: prevdestruct})
	}
	s.setStateObject(newobj)

	if prev == nil || prev.deleted {
		return newobj, nil
	}
	return newobj, prev
}
// CreateAccount explicitly creates a state object for addr. When a state
// object already exists at that address, its balance is carried over to the
// new one.
//
// CreateAccount is called during the EVM CREATE operation. The situation might
// arise that a contract does the following:
//
//  1. sends funds to sha(account ++ (nonce + 1))
//  2. tx_create(sha(account ++ nonce)) (note that this gets the address of 1)
//
// Carrying over the balance ensures that Ether doesn't disappear.
func (s *StateDB) CreateAccount(addr common.Address) {
	created, replaced := s.createObject(addr)
	if replaced == nil {
		return
	}
	created.setBalance(replaced.data.Balance)
}
// ForEachStorage walks the storage trie of the account at addr and invokes cb
// for every entry it yields.
//
// For each trie entry, a pending value in the object's dirtyStorage takes
// precedence over the value stored in the trie. Trie entries with an empty
// value are skipped. Only keys present in the trie are visited, so dirty slots
// that are not yet in the trie are not reported. Iteration stops as soon as cb
// returns false.
//
// It returns nil when no state object is found for addr, and an error when a
// stored value cannot be RLP-split.
func (s *StateDB) ForEachStorage(addr common.Address, cb func(key, value common.Hash) bool) error {
	so := s.getStateObject(addr)
	if so == nil {
		return nil
	}
	it := trie.NewIterator(so.getTrie(s.db).NodeIterator(nil))
	for it.Next() {
		// NOTE(review): GetKey presumably resolves the iterator key back to the
		// original storage key via stored preimages — confirm against the trie API.
		key := common.BytesToHash(s.trie.GetKey(it.Key))
		if value, dirty := so.dirtyStorage[key]; dirty {
			if !cb(key, value) {
				return nil
			}
			continue
		}
		if len(it.Value) > 0 {
			_, content, _, err := rlp.Split(it.Value)
			if err != nil {
				return err
			}
			if !cb(key, common.BytesToHash(content)) {
				return nil
			}
		}
	}
	return nil
}
// Snapshot registers a new revision at the current journal length and returns
// its identifier, which can later be handed to RevertToSnapshot.
func (s *StateDB) Snapshot() int {
	revisionID := s.nextRevisionId
	s.nextRevisionId = revisionID + 1

	marker := revision{revisionID, s.journal.length()}
	s.validRevisions = append(s.validRevisions, marker)
	return revisionID
}
// RevertToSnapshot reverts all state changes made since the given revision.
//
// The revision and every revision taken after it are discarded. It panics if
// revid is not among the currently valid revisions.
func (s *StateDB) RevertToSnapshot(revid int) {
	// Find the snapshot in the stack of valid snapshots.
	idx := sort.Search(len(s.validRevisions), func(i int) bool {
		return s.validRevisions[i].id >= revid
	})
	if idx == len(s.validRevisions) || s.validRevisions[idx].id != revid {
		panic(fmt.Errorf("revision id %v cannot be reverted", revid))
	}
	// Named journalIndex rather than snapshot so the local does not shadow the
	// snapshot package used elsewhere in this file.
	journalIndex := s.validRevisions[idx].journalIndex
	// Replay the journal to undo changes and remove invalidated snapshots
	s.journal.revert(s, journalIndex)
	s.validRevisions = s.validRevisions[:idx]
}
// GetRefund reports the refund counter as it currently stands.
func (s *StateDB) GetRefund() uint64 {
	refund := s.refund
	return refund
}
// clearJournalAndRefund drops all valid revisions and, if the journal holds any
// entries, replaces it with a fresh journal and zeroes the refund counter.
func (s *StateDB) clearJournalAndRefund() {
	if len(s.journal.entries) != 0 {
		s.refund = 0
		s.journal = newJournal()
	}
	// Revisions are reset unconditionally: snapshots can be created without
	// journal entries.
	s.validRevisions = s.validRevisions[:0]
}
// PrepareAccessList resets the access list and seeds it for a state transition,
// covering both EIP-2929 and EIP-2930:
//
//   - the sender is added (2929)
//   - the destination is added, when there is one (2929)
//   - every precompile is added (2929)
//   - every address and storage key of the optional tx access list is added (2930)
//
// This method should only be called if Berlin/2929+2930 is applicable at the
// current number.
func (s *StateDB) PrepareAccessList(sender common.Address, dst *common.Address, precompiles []common.Address, list types.AccessList) {
	// Start from an empty list so nothing leaks in from previous executions.
	s.accessList = newAccessList()

	s.AddAddressToAccessList(sender)
	// A nil dst is a create-tx; its destination will be added inside evm.create.
	if dst != nil {
		s.AddAddressToAccessList(*dst)
	}
	for i := range precompiles {
		s.AddAddressToAccessList(precompiles[i])
	}
	for _, entry := range list {
		s.AddAddressToAccessList(entry.Address)
		for _, storageKey := range entry.StorageKeys {
			s.AddSlotToAccessList(entry.Address, storageKey)
		}
	}
}
// AddAddressToAccessList puts addr on the access list. A journal entry is
// written only when the access list reports that the address was newly added.
func (s *StateDB) AddAddressToAccessList(addr common.Address) {
	if !s.accessList.AddAddress(addr) {
		return
	}
	s.journal.append(accessListAddAccountChange{&addr})
}
// AddSlotToAccessList puts the (addr, slot) pair on the access list, journalling
// whichever parts the access list reports as newly added.
func (s *StateDB) AddSlotToAccessList(addr common.Address, slot common.Hash) {
	addressAdded, slotAdded := s.accessList.AddSlot(addr, slot)

	// In practice the address should already be on the list: there is no way
	// to enter the scope of an address without it having been added first (via
	// call-variant, create, etc). Journal it anyway to stay on the safe side.
	if addressAdded {
		s.journal.append(accessListAddAccountChange{&addr})
	}
	if !slotAdded {
		return
	}
	s.journal.append(accessListAddSlotChange{
		address: &addr,
		slot:    &slot,
	})
}
// AddressInAccessList reports whether addr is currently on the access list.
func (s *StateDB) AddressInAccessList(addr common.Address) bool {
	present := s.accessList.ContainsAddress(addr)
	return present
}
// SlotInAccessList looks up the (addr, slot) pair on the access list and
// reports separately whether the address and the slot are present.
func (s *StateDB) SlotInAccessList(addr common.Address, slot common.Hash) (addressPresent bool, slotPresent bool) {
	addressPresent, slotPresent = s.accessList.Contains(addr, slot)
	return addressPresent, slotPresent
}

339
trie_prefetcher.go Normal file
View File

@ -0,0 +1,339 @@
package ipld_eth_statedb
import (
"sync"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
)
// triePrefetchMetricsPrefix is the common prefix under which every trie
// prefetcher metric is published.
var triePrefetchMetricsPrefix = "trie/prefetch/"
// triePrefetcher is an active prefetcher, which receives accounts or storage
// items and does trie-loading of them. The goal is to get as much useful content
// into the caches as possible.
//
// A prefetcher is either active (fetchers is set, fetches is nil) or an
// inactive copy (fetches is set); the methods branch on fetches != nil to tell
// the two apart.
//
// Note, the prefetcher's API is not thread safe.
type triePrefetcher struct {
	db       state.Database         // Database to fetch trie nodes through
	root     common.Hash            // Root hash of the account trie for metrics
	fetches  map[string]state.Trie  // Partially or fully fetched tries (inactive copies only)
	fetchers map[string]*subfetcher // Subfetchers for each trie (active prefetchers only)

	// Marked whenever trie() has no trie to hand out.
	deliveryMissMeter metrics.Meter

	// Marked in close() for the subfetcher whose root equals the prefetcher root.
	accountLoadMeter  metrics.Meter
	accountDupMeter   metrics.Meter
	accountSkipMeter  metrics.Meter
	accountWasteMeter metrics.Meter

	// Marked in close() for every other subfetcher.
	storageLoadMeter  metrics.Meter
	storageDupMeter   metrics.Meter
	storageSkipMeter  metrics.Meter
	storageWasteMeter metrics.Meter
}
// newTriePrefetcher creates an active prefetcher for the account trie at root,
// loading nodes through db. All of its meters are registered under the metrics
// prefix extended with namespace.
func newTriePrefetcher(db state.Database, root common.Hash, namespace string) *triePrefetcher {
	prefix := triePrefetchMetricsPrefix + namespace
	// meter fetches (or registers) the meter published under prefix+suffix.
	meter := func(suffix string) metrics.Meter {
		return metrics.GetOrRegisterMeter(prefix+suffix, nil)
	}
	return &triePrefetcher{
		db:       db,
		root:     root,
		fetchers: make(map[string]*subfetcher), // Active prefetchers use the fetchers map

		deliveryMissMeter: meter("/deliverymiss"),
		accountLoadMeter:  meter("/account/load"),
		accountDupMeter:   meter("/account/dup"),
		accountSkipMeter:  meter("/account/skip"),
		accountWasteMeter: meter("/account/waste"),
		storageLoadMeter:  meter("/storage/load"),
		storageDupMeter:   meter("/storage/dup"),
		storageSkipMeter:  meter("/storage/skip"),
		storageWasteMeter: meter("/storage/waste"),
	}
}
// close aborts every subfetcher that may still be running and, when metrics
// are enabled, publishes each one's statistics: entries loaded, duplicates,
// tasks left unprocessed and entries loaded but never marked as used.
func (p *triePrefetcher) close() {
	for _, fetcher := range p.fetchers {
		fetcher.abort() // safe to do multiple times
		if !metrics.Enabled {
			continue
		}
		// The subfetcher working on the prefetcher's own root reports to the
		// account meters, every other one to the storage meters.
		load, dup, skip, waste := p.storageLoadMeter, p.storageDupMeter, p.storageSkipMeter, p.storageWasteMeter
		if fetcher.root == p.root {
			load, dup, skip, waste = p.accountLoadMeter, p.accountDupMeter, p.accountSkipMeter, p.accountWasteMeter
		}
		load.Mark(int64(len(fetcher.seen)))
		dup.Mark(int64(fetcher.dups))
		skip.Mark(int64(len(fetcher.tasks)))

		// Whatever remains in seen after removing the used keys was wasted work.
		for _, key := range fetcher.used {
			delete(fetcher.seen, string(key))
		}
		waste.Mark(int64(len(fetcher.seen)))
	}
	// Clear out all fetchers (will crash on a second call, deliberate)
	p.fetchers = nil
}
// copy creates a deep-but-inactive copy of the trie prefetcher. Any trie data
// already loaded will be copied over, but no goroutines will be started. This
// is mostly used in the miner which creates a copy of its actively mutated
// state to be sealed while it may further mutate the state.
//
// The returned prefetcher shares db, root and all meters with the receiver.
// When copying an active prefetcher, a subfetcher that terminated without a
// trie contributes a nil entry (see subfetcher.peek).
func (p *triePrefetcher) copy() *triePrefetcher {
	// Named cpy rather than copy so the builtin copy function is not shadowed.
	cpy := &triePrefetcher{
		db:      p.db,
		root:    p.root,
		fetches: make(map[string]state.Trie), // Inactive prefetchers use the fetches map

		deliveryMissMeter: p.deliveryMissMeter,
		accountLoadMeter:  p.accountLoadMeter,
		accountDupMeter:   p.accountDupMeter,
		accountSkipMeter:  p.accountSkipMeter,
		accountWasteMeter: p.accountWasteMeter,
		storageLoadMeter:  p.storageLoadMeter,
		storageDupMeter:   p.storageDupMeter,
		storageSkipMeter:  p.storageSkipMeter,
		storageWasteMeter: p.storageWasteMeter,
	}
	// If the prefetcher is already a copy, duplicate the data
	if p.fetches != nil {
		for id, fetch := range p.fetches {
			cpy.fetches[id] = p.db.CopyTrie(fetch)
		}
		return cpy
	}
	// Otherwise we're copying an active fetcher, retrieve the current states
	for id, fetcher := range p.fetchers {
		cpy.fetches[id] = fetcher.peek()
	}
	return cpy
}
// prefetch queues keys for loading from the trie identified by owner and root.
// The first request for a given trie spawns its subfetcher. Inactive
// prefetchers (copies) ignore the call.
func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, keys [][]byte) {
	if p.fetches != nil {
		// Inactive copy: nothing is ever fetched
		return
	}
	id := p.trieID(owner, root)
	if p.fetchers[id] == nil {
		p.fetchers[id] = newSubfetcher(p.db, owner, root)
	}
	p.fetchers[id].schedule(keys)
}
// trie hands out the prefetched trie for the given owner and root, or nil when
// the prefetcher has none. Every nil result is counted on the delivery-miss
// meter.
//
// An inactive prefetcher returns a copy of the trie it stored. An active one
// aborts the matching subfetcher and returns whatever that subfetcher's peek
// yields.
func (p *triePrefetcher) trie(owner common.Hash, root common.Hash) state.Trie {
	id := p.trieID(owner, root)

	// Inactive prefetcher: serve from the existing deep copies
	if p.fetches != nil {
		if cached := p.fetches[id]; cached != nil {
			return p.db.CopyTrie(cached)
		}
		p.deliveryMissMeter.Mark(1)
		return nil
	}
	// Active prefetcher: bail if nothing was ever prefetched for this trie
	fetcher := p.fetchers[id]
	if fetcher == nil {
		p.deliveryMissMeter.Mark(1)
		return nil
	}
	// Stop the subfetcher in case it is still running, then take a copy of
	// whatever it managed to load.
	fetcher.abort() // safe to do multiple times
	if loaded := fetcher.peek(); loaded != nil {
		return loaded
	}
	p.deliveryMissMeter.Mark(1)
	return nil
}
// used records which state items of the given trie were actually used, so that
// close can report how useful or wasteful the prefetching was. The call is a
// no-op when no subfetcher exists for the trie.
func (p *triePrefetcher) used(owner common.Hash, root common.Hash, used [][]byte) {
	fetcher := p.fetchers[p.trieID(owner, root)]
	if fetcher == nil {
		return
	}
	fetcher.used = used
}
// trieID builds the identifier of a trie: the bytes of the owner hash followed
// by the bytes of the root hash, as a string.
func (p *triePrefetcher) trieID(owner common.Hash, root common.Hash) string {
	ownerPart := string(owner.Bytes())
	rootPart := string(root.Bytes())
	return ownerPart + rootPart
}
// subfetcher is a trie fetcher goroutine responsible for pulling entries for a
// single trie. It is spawned when a new root is encountered and lives until the
// main prefetcher is paused and either all requested items are processed or if
// the trie being worked on is retrieved from the prefetcher.
//
// The goroutine (loop) owns trie, seen and dups while it runs; tasks is shared
// with schedule and guarded by lock.
type subfetcher struct {
	db    state.Database // Database to load trie nodes through
	owner common.Hash    // Owner of the trie, usually account hash
	root  common.Hash    // Root hash of the trie to prefetch
	trie  state.Trie     // Trie being populated with nodes

	tasks [][]byte   // Items queued up for retrieval
	lock  sync.Mutex // Lock protecting the task queue

	wake chan struct{}        // Wake channel if a new task is scheduled
	stop chan struct{}        // Channel to interrupt processing (closed by abort)
	term chan struct{}        // Channel to signal interruption (closed when loop exits)
	copy chan chan state.Trie // Channel to request a copy of the current trie

	seen map[string]struct{} // Tracks the entries already loaded
	dups int                 // Number of duplicate preload tasks
	used [][]byte            // Tracks the entries used in the end
}
// newSubfetcher builds a subfetcher for the trie identified by owner and root
// and starts its background goroutine before returning it.
func newSubfetcher(db state.Database, owner common.Hash, root common.Hash) *subfetcher {
	fetcher := &subfetcher{
		db:    db,
		owner: owner,
		root:  root,
		// wake is buffered by one so schedule can leave a pending wake-up
		// without blocking.
		wake: make(chan struct{}, 1),
		stop: make(chan struct{}),
		term: make(chan struct{}),
		copy: make(chan chan state.Trie),
		seen: make(map[string]struct{}),
	}
	go fetcher.loop()
	return fetcher
}
// schedule adds a batch of trie keys to the queue to prefetch.
//
// The keys are appended to the task queue under the lock, after which the
// loop goroutine is nudged through the wake channel. The nudge never blocks:
// if a wake-up is already pending, it is simply skipped.
func (sf *subfetcher) schedule(keys [][]byte) {
	// Append the tasks to the current queue
	sf.lock.Lock()
	sf.tasks = append(sf.tasks, keys...)
	sf.lock.Unlock()
	// Notify the prefetcher, it's fine if it's already terminated
	select {
	case sf.wake <- struct{}{}:
	default:
		// A wake-up is already queued (or nobody is listening); nothing to do
	}
}
// peek tries to retrieve a deep copy of the fetcher's trie in whatever form it
// is currently.
//
// It blocks until either the loop goroutine accepts the copy request or the
// term channel is closed. In the latter case the copy is made directly, and nil
// is returned if no trie was ever set on the subfetcher.
func (sf *subfetcher) peek() state.Trie {
	// Reply channel on which the loop goroutine delivers the copy
	ch := make(chan state.Trie)
	select {
	case sf.copy <- ch:
		// Subfetcher still alive, return copy from it
		return <-ch
	case <-sf.term:
		// Subfetcher already terminated, return a copy directly
		if sf.trie == nil {
			return nil
		}
		return sf.db.CopyTrie(sf.trie)
	}
}
// abort interrupts the subfetcher immediately. It is safe to call abort multiple
// times but it is not thread safe.
//
// The stop channel is closed unless it already is, and the call then waits for
// the term channel to be closed, which loop does on exit.
func (sf *subfetcher) abort() {
	select {
	case <-sf.stop:
		// Already closed by an earlier abort; closing again would panic
	default:
		close(sf.stop)
	}
	// Wait for the loop goroutine to finish
	<-sf.term
}
// loop waits for new tasks to be scheduled and keeps loading them until it runs
// out of tasks or its underlying trie is retrieved for committing.
//
// It opens the account trie when owner is the zero hash and the storage trie
// of owner otherwise. If opening fails, a warning is logged and the goroutine
// exits with sf.trie left unset. The term channel is closed on every exit path.
func (sf *subfetcher) loop() {
	// No matter how the loop stops, signal anyone waiting that it's terminated
	defer close(sf.term)
	// Start by opening the trie and stop processing if it fails
	if sf.owner == (common.Hash{}) {
		trie, err := sf.db.OpenTrie(sf.root)
		if err != nil {
			log.Warn("Trie prefetcher failed opening trie", "root", sf.root, "err", err)
			return
		}
		sf.trie = trie
	} else {
		trie, err := sf.db.OpenStorageTrie(sf.owner, sf.root)
		if err != nil {
			log.Warn("Trie prefetcher failed opening trie", "root", sf.root, "err", err)
			return
		}
		sf.trie = trie
	}
	// Trie opened successfully, keep prefetching items
	for {
		select {
		case <-sf.wake:
			// Subfetcher was woken up, retrieve any tasks to avoid spinning the lock
			sf.lock.Lock()
			tasks := sf.tasks
			sf.tasks = nil
			sf.lock.Unlock()
			// Prefetch any tasks until the loop is interrupted
			for i, task := range tasks {
				select {
				case <-sf.stop:
					// If termination is requested, add any leftover back and return
					sf.lock.Lock()
					sf.tasks = append(sf.tasks, tasks[i:]...)
					sf.lock.Unlock()
					return
				case ch := <-sf.copy:
					// Somebody wants a copy of the current trie, grant them
					ch <- sf.db.CopyTrie(sf.trie)
				default:
					// No termination request yet, prefetch the next entry
					if _, ok := sf.seen[string(task)]; ok {
						sf.dups++
					} else {
						// Address-length keys are loaded as accounts, anything
						// else as a plain trie key. The results and errors are
						// discarded; the calls are made only for their loading
						// side effect.
						if len(task) == len(common.Address{}) {
							sf.trie.TryGetAccount(task)
						} else {
							sf.trie.TryGet(task)
						}
						sf.seen[string(task)] = struct{}{}
					}
				}
			}
		case ch := <-sf.copy:
			// Somebody wants a copy of the current trie, grant them
			ch <- sf.db.CopyTrie(sf.trie)
		case <-sf.stop:
			// Termination is requested, abort and leave remaining tasks
			return
		}
	}
}