diff --git a/access_list.go b/access_list.go new file mode 100644 index 0000000..7aa2579 --- /dev/null +++ b/access_list.go @@ -0,0 +1,120 @@ +package ipld_eth_statedb + +import ( + "github.com/ethereum/go-ethereum/common" +) + +type accessList struct { + addresses map[common.Address]int + slots []map[common.Hash]struct{} +} + +// ContainsAddress returns true if the address is in the access list. +func (al *accessList) ContainsAddress(address common.Address) bool { + _, ok := al.addresses[address] + return ok +} + +// Contains checks if a slot within an account is present in the access list, returning +// separate flags for the presence of the account and the slot respectively. +func (al *accessList) Contains(address common.Address, slot common.Hash) (addressPresent bool, slotPresent bool) { + idx, ok := al.addresses[address] + if !ok { + // no such address (and hence zero slots) + return false, false + } + if idx == -1 { + // address yes, but no slots + return true, false + } + _, slotPresent = al.slots[idx][slot] + return true, slotPresent +} + +// newAccessList creates a new accessList. +func newAccessList() *accessList { + return &accessList{ + addresses: make(map[common.Address]int), + } +} + +// Copy creates an independent copy of an accessList. +func (a *accessList) Copy() *accessList { + cp := newAccessList() + for k, v := range a.addresses { + cp.addresses[k] = v + } + cp.slots = make([]map[common.Hash]struct{}, len(a.slots)) + for i, slotMap := range a.slots { + newSlotmap := make(map[common.Hash]struct{}, len(slotMap)) + for k := range slotMap { + newSlotmap[k] = struct{}{} + } + cp.slots[i] = newSlotmap + } + return cp +} + +// AddAddress adds an address to the access list, and returns 'true' if the operation +// caused a change (addr was not previously in the list). +func (al *accessList) AddAddress(address common.Address) bool { + if _, present := al.addresses[address]; present { + return false + } + al.addresses[address] = -1 + return true +} + +// AddSlot adds the specified (addr, slot) combo to the access list. +// Return values are: +// - address added +// - slot added +// For any 'true' value returned, a corresponding journal entry must be made. +func (al *accessList) AddSlot(address common.Address, slot common.Hash) (addrChange bool, slotChange bool) { + idx, addrPresent := al.addresses[address] + if !addrPresent || idx == -1 { + // Address not present, or addr present but no slots there + al.addresses[address] = len(al.slots) + slotmap := map[common.Hash]struct{}{slot: {}} + al.slots = append(al.slots, slotmap) + return !addrPresent, true + } + // There is already an (address,slot) mapping + slotmap := al.slots[idx] + if _, ok := slotmap[slot]; !ok { + slotmap[slot] = struct{}{} + // Journal add slot change + return false, true + } + // No changes required + return false, false +} + +// DeleteSlot removes an (address, slot)-tuple from the access list. +// This operation needs to be performed in the same order as the addition happened. +// This method is meant to be used by the journal, which maintains ordering of +// operations. 
+func (al *accessList) DeleteSlot(address common.Address, slot common.Hash) {
+	idx, addrOk := al.addresses[address]
+	// There are two ways this can fail
+	if !addrOk {
+		panic("reverting slot change, address not present in list")
+	}
+	slotmap := al.slots[idx]
+	delete(slotmap, slot)
+	// If that was the last (first) slot, remove it
+	// Since additions and rollbacks are always performed in order,
+	// we can delete the item without worrying about screwing up later indices
+	if len(slotmap) == 0 {
+		al.slots = al.slots[:idx]
+		al.addresses[address] = -1
+	}
+}
+
+// DeleteAddress removes an address from the access list. This operation
+// needs to be performed in the same order as the addition happened.
+// This method is meant to be used by the journal, which maintains ordering of
+// operations.
+func (al *accessList) DeleteAddress(address common.Address) {
+	delete(al.addresses, address)
+}
diff --git a/config.go b/config.go
new file mode 100644
index 0000000..77f9730
--- /dev/null
+++ b/config.go
@@ -0,0 +1,64 @@
+package ipld_eth_statedb
+
+import (
+	"context"
+	"time"
+
+	"github.com/jackc/pgx/v4/pgxpool"
+)
+
+// Config holds the Postgres connection parameters used to build the pgx pool.
+type Config struct {
+	Hostname     string
+	Port         int
+	DatabaseName string
+	Username     string
+	Password     string
+
+	ConnTimeout     time.Duration
+	MaxConns        int
+	MinConns        int
+	MaxConnLifetime time.Duration
+	MaxConnIdleTime time.Duration
+}
+
+// NewPGXPool returns a new pgx connection pool configured from c.
+func (c Config) NewPGXPool(ctx context.Context) (*pgxpool.Pool, error) {
+	pgConf, err := makePGXConfig(c)
+	if err != nil {
+		return nil, err
+	}
+	return pgxpool.ConnectConfig(ctx, pgConf)
+}
+
+// makePGXConfig creates a pgxpool.Config from the provided Config
+func makePGXConfig(config Config) (*pgxpool.Config, error) {
+	conf, err := pgxpool.ParseConfig("")
+	if err != nil {
+		return nil, err
+	}
+
+	// TODO: decide whether to disable the statement cache (conf.ConnConfig.BuildStatementCache)
+	conf.ConnConfig.Config.Host = config.Hostname
+	conf.ConnConfig.Config.Port = uint16(config.Port)
+	conf.ConnConfig.Config.Database = config.DatabaseName
+	conf.ConnConfig.Config.User = config.Username
+	conf.ConnConfig.Config.Password = config.Password
+
+	if config.ConnTimeout != 0 {
+		conf.ConnConfig.Config.ConnectTimeout = config.ConnTimeout
+	}
+	if config.MaxConns != 0 {
+		conf.MaxConns = int32(config.MaxConns)
+	}
+	if config.MinConns != 0 {
+		conf.MinConns = int32(config.MinConns)
+	}
+	if config.MaxConnLifetime != 0 {
+		conf.MaxConnLifetime = config.MaxConnLifetime
+	}
+	if config.MaxConnIdleTime != 0 {
+		conf.MaxConnIdleTime = config.MaxConnIdleTime
+	}
+
+	return conf, nil
+}
diff --git a/database.go b/database.go
new file mode 100644
index 0000000..b3d4fa8
--- /dev/null
+++ b/database.go
@@ -0,0 +1,39 @@
+package ipld_eth_statedb
+
+import (
+	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/core/state"
+	"github.com/jackc/pgx/v4/pgxpool"
+)
+
+// Database mirrors the subset of state.Database used by this package.
+type Database interface {
+	ContractCode(addrHash common.Hash, codeHash common.Hash) ([]byte, error)
+	ContractCodeSize(addrHash common.Hash, codeHash common.Hash) (int, error)
+	OpenTrie(root common.Hash) (state.Trie, error)
+	OpenStorageTrie(addrHash common.Hash, root common.Hash) (state.Trie, error)
+	CopyTrie(trie state.Trie) state.Trie
+}
+
+// StateDatabase implements Database on top of a Postgres connection pool;
+// its methods are currently stubs.
+type StateDatabase struct {
+	*pgxpool.Pool
+}
+
+func (sd *StateDatabase) ContractCode(addrHash common.Hash, codeHash common.Hash) ([]byte, error) {
+	panic("implement me")
+}
+
+func (sd *StateDatabase) ContractCodeSize(addrHash common.Hash, codeHash common.Hash) (int, error) {
+	panic("implement me")
+}
+
+func (sd
*StateDatabase) OpenTrie(root common.Hash) (state.Trie, error) { + panic("replace my usage") +} + +func (sd *StateDatabase) OpenStorageTrie(addrHash common.Hash, root common.Hash) (state.Trie, error) { + panic("replace my usage") +} + +func (sd *StateDatabase) CopyTrie(trie state.Trie) state.Trie { + panic("replace my usage") +} diff --git a/journal.go b/journal.go new file mode 100644 index 0000000..1206865 --- /dev/null +++ b/journal.go @@ -0,0 +1,253 @@ +package ipld_eth_statedb + +import ( + "math/big" + + "github.com/ethereum/go-ethereum/common" +) + +// journalEntry is a modification entry in the state change journal that can be +// reverted on demand. +type journalEntry interface { + // revert undoes the changes introduced by this journal entry. + revert(*StateDB) + + // dirtied returns the Ethereum address modified by this journal entry. + dirtied() *common.Address +} + +// journal contains the list of state modifications applied since the last state +// commit. These are tracked to be able to be reverted in the case of an execution +// exception or request for reversal. +type journal struct { + entries []journalEntry // Current changes tracked by the journal + dirties map[common.Address]int // Dirty accounts and the number of changes +} + +// newJournal creates a new initialized journal. +func newJournal() *journal { + return &journal{ + dirties: make(map[common.Address]int), + } +} + +// append inserts a new modification entry to the end of the change journal. +func (j *journal) append(entry journalEntry) { + j.entries = append(j.entries, entry) + if addr := entry.dirtied(); addr != nil { + j.dirties[*addr]++ + } +} + +// revert undoes a batch of journalled modifications along with any reverted +// dirty handling too. +func (j *journal) revert(statedb *StateDB, snapshot int) { + for i := len(j.entries) - 1; i >= snapshot; i-- { + // Undo the changes made by the operation + j.entries[i].revert(statedb) + + // Drop any dirty tracking induced by the change + if addr := j.entries[i].dirtied(); addr != nil { + if j.dirties[*addr]--; j.dirties[*addr] == 0 { + delete(j.dirties, *addr) + } + } + } + j.entries = j.entries[:snapshot] +} + +// dirty explicitly sets an address to dirty, even if the change entries would +// otherwise suggest it as clean. This method is an ugly hack to handle the RIPEMD +// precompile consensus exception. +func (j *journal) dirty(addr common.Address) { + j.dirties[addr]++ +} + +// length returns the current number of entries in the journal. +func (j *journal) length() int { + return len(j.entries) +} + +type ( + // Changes to the account trie. + createObjectChange struct { + account *common.Address + } + resetObjectChange struct { + prev *stateObject + prevdestruct bool + } + suicideChange struct { + account *common.Address + prev bool // whether account had already suicided + prevbalance *big.Int + } + + // Changes to individual accounts. + balanceChange struct { + account *common.Address + prev *big.Int + } + nonceChange struct { + account *common.Address + prev uint64 + } + storageChange struct { + account *common.Address + key, prevalue common.Hash + } + codeChange struct { + account *common.Address + prevcode, prevhash []byte + } + + // Changes to other state values. 
+ refundChange struct { + prev uint64 + } + addLogChange struct { + txhash common.Hash + } + addPreimageChange struct { + hash common.Hash + } + touchChange struct { + account *common.Address + } + // Changes to the access list + accessListAddAccountChange struct { + address *common.Address + } + accessListAddSlotChange struct { + address *common.Address + slot *common.Hash + } +) + +func (ch createObjectChange) revert(s *StateDB) { + delete(s.stateObjects, *ch.account) + delete(s.stateObjectsDirty, *ch.account) +} + +func (ch createObjectChange) dirtied() *common.Address { + return ch.account +} + +func (ch resetObjectChange) revert(s *StateDB) { + s.setStateObject(ch.prev) + if !ch.prevdestruct && s.snap != nil { + delete(s.snapDestructs, ch.prev.addrHash) + } +} + +func (ch resetObjectChange) dirtied() *common.Address { + return nil +} + +func (ch suicideChange) revert(s *StateDB) { + obj := s.getStateObject(*ch.account) + if obj != nil { + obj.suicided = ch.prev + obj.setBalance(ch.prevbalance) + } +} + +func (ch suicideChange) dirtied() *common.Address { + return ch.account +} + +var ripemd = common.HexToAddress("0000000000000000000000000000000000000003") + +func (ch touchChange) revert(s *StateDB) { +} + +func (ch touchChange) dirtied() *common.Address { + return ch.account +} + +func (ch balanceChange) revert(s *StateDB) { + s.getStateObject(*ch.account).setBalance(ch.prev) +} + +func (ch balanceChange) dirtied() *common.Address { + return ch.account +} + +func (ch nonceChange) revert(s *StateDB) { + s.getStateObject(*ch.account).setNonce(ch.prev) +} + +func (ch nonceChange) dirtied() *common.Address { + return ch.account +} + +func (ch codeChange) revert(s *StateDB) { + s.getStateObject(*ch.account).setCode(common.BytesToHash(ch.prevhash), ch.prevcode) +} + +func (ch codeChange) dirtied() *common.Address { + return ch.account +} + +func (ch storageChange) revert(s *StateDB) { + s.getStateObject(*ch.account).setState(ch.key, ch.prevalue) +} + +func (ch storageChange) dirtied() *common.Address { + return ch.account +} + +func (ch refundChange) revert(s *StateDB) { + s.refund = ch.prev +} + +func (ch refundChange) dirtied() *common.Address { + return nil +} + +func (ch addLogChange) revert(s *StateDB) { + logs := s.logs[ch.txhash] + if len(logs) == 1 { + delete(s.logs, ch.txhash) + } else { + s.logs[ch.txhash] = logs[:len(logs)-1] + } + s.logSize-- +} + +func (ch addLogChange) dirtied() *common.Address { + return nil +} + +func (ch addPreimageChange) revert(s *StateDB) { + delete(s.preimages, ch.hash) +} + +func (ch addPreimageChange) dirtied() *common.Address { + return nil +} + +func (ch accessListAddAccountChange) revert(s *StateDB) { + /* + One important invariant here, is that whenever a (addr, slot) is added, if the + addr is not already present, the add causes two journal entries: + - one for the address, + - one for the (address,slot) + Therefore, when unrolling the change, we can always blindly delete the + (addr) at this point, since no storage adds can remain when come upon + a single (addr) change. 
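+
+		For example (an illustrative sketch using only names from this changeset):
+		calling AddSlotToAccessList(A, S) for an address A not yet in the list appends
+
+			accessListAddAccountChange{address: &A}
+			accessListAddSlotChange{address: &A, slot: &S}
+
+		and journal.revert replays entries backwards, so the slot entry (removing the
+		last slot for A) is reverted before this entry deletes A itself.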
+ */ + s.accessList.DeleteAddress(*ch.address) +} + +func (ch accessListAddAccountChange) dirtied() *common.Address { + return nil +} + +func (ch accessListAddSlotChange) revert(s *StateDB) { + s.accessList.DeleteSlot(*ch.address, *ch.slot) +} + +func (ch accessListAddSlotChange) dirtied() *common.Address { + return nil +} diff --git a/state_object.go b/state_object.go new file mode 100644 index 0000000..7e58b2d --- /dev/null +++ b/state_object.go @@ -0,0 +1,521 @@ +package ipld_eth_statedb + +import ( + "bytes" + "fmt" + "io" + "math/big" + "time" + + "github.com/ethereum/go-ethereum/core/state" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/trie" +) + +var emptyCodeHash = crypto.Keccak256(nil) + +type Code []byte + +func (c Code) String() string { + return string(c) //strings.Join(Disassemble(c), " ") +} + +type Storage map[common.Hash]common.Hash + +func (s Storage) String() (str string) { + for key, value := range s { + str += fmt.Sprintf("%X : %X\n", key, value) + } + + return +} + +func (s Storage) Copy() Storage { + cpy := make(Storage, len(s)) + for key, value := range s { + cpy[key] = value + } + + return cpy +} + +// stateObject represents an Ethereum account which is being modified. +// +// The usage pattern is as follows: +// First you need to obtain a state object. +// Account values can be accessed and modified through the object. +// Finally, call CommitTrie to write the modified storage trie into a database. +type stateObject struct { + address common.Address + addrHash common.Hash // hash of ethereum address of the account + data types.StateAccount + db *StateDB + + // DB error. + // State objects are used by the consensus core and VM which are + // unable to deal with database-level errors. Any error that occurs + // during a database read is memoized here and will eventually be returned + // by StateDB.Commit. + dbErr error + + // Write caches. + trie state.Trie // storage trie, which becomes non-nil on first access + code Code // contract bytecode, which gets set when code is loaded + + originStorage Storage // Storage cache of original entries to dedup rewrites, reset for every transaction + pendingStorage Storage // Storage entries that need to be flushed to disk, at the end of an entire block + dirtyStorage Storage // Storage entries that have been modified in the current transaction execution + fakeStorage Storage // Fake storage which constructed by caller for debugging purpose. + + // Cache flags. + // When an object is marked suicided it will be delete from the trie + // during the "update" phase of the state transition. + dirtyCode bool // true if the code was updated + suicided bool + deleted bool +} + +// empty returns whether the account is considered empty. +func (s *stateObject) empty() bool { + return s.data.Nonce == 0 && s.data.Balance.Sign() == 0 && bytes.Equal(s.data.CodeHash, emptyCodeHash) +} + +// newObject creates a state object. 
+func newObject(db *StateDB, address common.Address, data types.StateAccount) *stateObject { + if data.Balance == nil { + data.Balance = new(big.Int) + } + if data.CodeHash == nil { + data.CodeHash = emptyCodeHash + } + if data.Root == (common.Hash{}) { + data.Root = emptyRoot + } + return &stateObject{ + db: db, + address: address, + addrHash: crypto.Keccak256Hash(address[:]), + data: data, + originStorage: make(Storage), + pendingStorage: make(Storage), + dirtyStorage: make(Storage), + } +} + +// EncodeRLP implements rlp.Encoder. +func (s *stateObject) EncodeRLP(w io.Writer) error { + return rlp.Encode(w, &s.data) +} + +// setError remembers the first non-nil error it is called with. +func (s *stateObject) setError(err error) { + if s.dbErr == nil { + s.dbErr = err + } +} + +func (s *stateObject) markSuicided() { + s.suicided = true +} + +func (s *stateObject) touch() { + s.db.journal.append(touchChange{ + account: &s.address, + }) + if s.address == ripemd { + // Explicitly put it in the dirty-cache, which is otherwise generated from + // flattened journals. + s.db.journal.dirty(s.address) + } +} + +func (s *stateObject) getTrie(db Database) state.Trie { + if s.trie == nil { + // Try fetching from prefetcher first + // We don't prefetch empty tries + if s.data.Root != emptyRoot && s.db.prefetcher != nil { + // When the miner is creating the pending state, there is no + // prefetcher + s.trie = s.db.prefetcher.trie(s.addrHash, s.data.Root) + } + if s.trie == nil { + var err error + s.trie, err = db.OpenStorageTrie(s.addrHash, s.data.Root) + if err != nil { + s.trie, _ = db.OpenStorageTrie(s.addrHash, common.Hash{}) + s.setError(fmt.Errorf("can't create storage trie: %v", err)) + } + } + } + return s.trie +} + +// GetState retrieves a value from the account storage trie. +func (s *stateObject) GetState(db Database, key common.Hash) common.Hash { + // If the fake storage is set, only lookup the state here(in the debugging mode) + if s.fakeStorage != nil { + return s.fakeStorage[key] + } + // If we have a dirty value for this state entry, return it + value, dirty := s.dirtyStorage[key] + if dirty { + return value + } + // Otherwise return the entry's original value + return s.GetCommittedState(db, key) +} + +// GetCommittedState retrieves a value from the committed account storage trie. +func (s *stateObject) GetCommittedState(db Database, key common.Hash) common.Hash { + // If the fake storage is set, only lookup the state here(in the debugging mode) + if s.fakeStorage != nil { + return s.fakeStorage[key] + } + // If we have a pending write or clean cached, return that + if value, pending := s.pendingStorage[key]; pending { + return value + } + if value, cached := s.originStorage[key]; cached { + return value + } + // If no live objects are available, attempt to use snapshots + var ( + enc []byte + err error + ) + if s.db.snap != nil { + // If the object was destructed in *this* block (and potentially resurrected), + // the storage has been cleared out, and we should *not* consult the previous + // snapshot about any storage values. The only possible alternatives are: + // 1) resurrect happened, and new slot values were set -- those should + // have been handles via pendingStorage above. 
+ // 2) we don't have new values, and can deliver empty response back + if _, destructed := s.db.snapDestructs[s.addrHash]; destructed { + return common.Hash{} + } + start := time.Now() + enc, err = s.db.snap.Storage(s.addrHash, crypto.Keccak256Hash(key.Bytes())) + if metrics.EnabledExpensive { + s.db.SnapshotStorageReads += time.Since(start) + } + } + // If the snapshot is unavailable or reading from it fails, load from the database. + if s.db.snap == nil || err != nil { + start := time.Now() + enc, err = s.getTrie(db).TryGet(key.Bytes()) + if metrics.EnabledExpensive { + s.db.StorageReads += time.Since(start) + } + if err != nil { + s.setError(err) + return common.Hash{} + } + } + var value common.Hash + if len(enc) > 0 { + _, content, _, err := rlp.Split(enc) + if err != nil { + s.setError(err) + } + value.SetBytes(content) + } + s.originStorage[key] = value + return value +} + +// SetState updates a value in account storage. +func (s *stateObject) SetState(db Database, key, value common.Hash) { + // If the fake storage is set, put the temporary state update here. + if s.fakeStorage != nil { + s.fakeStorage[key] = value + return + } + // If the new value is the same as old, don't set + prev := s.GetState(db, key) + if prev == value { + return + } + // New value is different, update and journal the change + s.db.journal.append(storageChange{ + account: &s.address, + key: key, + prevalue: prev, + }) + s.setState(key, value) +} + +// SetStorage replaces the entire state storage with the given one. +// +// After this function is called, all original state will be ignored and state +// lookup only happens in the fake state storage. +// +// Note this function should only be used for debugging purpose. +func (s *stateObject) SetStorage(storage map[common.Hash]common.Hash) { + // Allocate fake storage if it's nil. + if s.fakeStorage == nil { + s.fakeStorage = make(Storage) + } + for key, value := range storage { + s.fakeStorage[key] = value + } + // Don't bother journal since this function should only be used for + // debugging and the `fake` storage won't be committed to database. +} + +func (s *stateObject) setState(key, value common.Hash) { + s.dirtyStorage[key] = value +} + +// finalise moves all dirty storage slots into the pending area to be hashed or +// committed later. It is invoked at the end of every transaction. +func (s *stateObject) finalise(prefetch bool) { + slotsToPrefetch := make([][]byte, 0, len(s.dirtyStorage)) + for key, value := range s.dirtyStorage { + s.pendingStorage[key] = value + if value != s.originStorage[key] { + slotsToPrefetch = append(slotsToPrefetch, common.CopyBytes(key[:])) // Copy needed for closure + } + } + if s.db.prefetcher != nil && prefetch && len(slotsToPrefetch) > 0 && s.data.Root != emptyRoot { + s.db.prefetcher.prefetch(s.addrHash, s.data.Root, slotsToPrefetch) + } + if len(s.dirtyStorage) > 0 { + s.dirtyStorage = make(Storage) + } +} + +// updateTrie writes cached storage modifications into the object's storage trie. 
+// It will return nil if the trie has not been loaded and no changes have been made +func (s *stateObject) updateTrie(db Database) state.Trie { + // Make sure all dirty slots are finalized into the pending storage area + s.finalise(false) // Don't prefetch anymore, pull directly if need be + if len(s.pendingStorage) == 0 { + return s.trie + } + // Track the amount of time wasted on updating the storage trie + if metrics.EnabledExpensive { + defer func(start time.Time) { s.db.StorageUpdates += time.Since(start) }(time.Now()) + } + // The snapshot storage map for the object + var storage map[common.Hash][]byte + // Insert all the pending updates into the trie + tr := s.getTrie(db) + hasher := s.db.hasher + + usedStorage := make([][]byte, 0, len(s.pendingStorage)) + for key, value := range s.pendingStorage { + // Skip noop changes, persist actual changes + if value == s.originStorage[key] { + continue + } + s.originStorage[key] = value + + var v []byte + if (value == common.Hash{}) { + s.setError(tr.TryDelete(key[:])) + s.db.StorageDeleted += 1 + } else { + // Encoding []byte cannot fail, ok to ignore the error. + v, _ = rlp.EncodeToBytes(common.TrimLeftZeroes(value[:])) + s.setError(tr.TryUpdate(key[:], v)) + s.db.StorageUpdated += 1 + } + // If state snapshotting is active, cache the data til commit + if s.db.snap != nil { + if storage == nil { + // Retrieve the old storage map, if available, create a new one otherwise + if storage = s.db.snapStorage[s.addrHash]; storage == nil { + storage = make(map[common.Hash][]byte) + s.db.snapStorage[s.addrHash] = storage + } + } + storage[crypto.HashData(hasher, key[:])] = v // v will be nil if it's deleted + } + usedStorage = append(usedStorage, common.CopyBytes(key[:])) // Copy needed for closure + } + if s.db.prefetcher != nil { + s.db.prefetcher.used(s.addrHash, s.data.Root, usedStorage) + } + if len(s.pendingStorage) > 0 { + s.pendingStorage = make(Storage) + } + return tr +} + +// UpdateRoot sets the trie root to the current root hash of +func (s *stateObject) updateRoot(db Database) { + // If nothing changed, don't bother with hashing anything + if s.updateTrie(db) == nil { + return + } + // Track the amount of time wasted on hashing the storage trie + if metrics.EnabledExpensive { + defer func(start time.Time) { s.db.StorageHashes += time.Since(start) }(time.Now()) + } + s.data.Root = s.trie.Hash() +} + +// CommitTrie the storage trie of the object to db. +// This updates the trie root. +func (s *stateObject) CommitTrie(db Database) (*trie.NodeSet, error) { + // If nothing changed, don't bother with hashing anything + if s.updateTrie(db) == nil { + return nil, nil + } + if s.dbErr != nil { + return nil, s.dbErr + } + // Track the amount of time wasted on committing the storage trie + if metrics.EnabledExpensive { + defer func(start time.Time) { s.db.StorageCommits += time.Since(start) }(time.Now()) + } + root, nodes, err := s.trie.Commit(false) + if err == nil { + s.data.Root = root + } + return nodes, err +} + +// AddBalance adds amount to s's balance. +// It is used to add funds to the destination account of a transfer. +func (s *stateObject) AddBalance(amount *big.Int) { + // EIP161: We must check emptiness for the objects such that the account + // clearing (0,0,0 objects) can take effect. + if amount.Sign() == 0 { + if s.empty() { + s.touch() + } + return + } + s.SetBalance(new(big.Int).Add(s.Balance(), amount)) +} + +// SubBalance removes amount from s's balance. +// It is used to remove funds from the origin account of a transfer. 
+func (s *stateObject) SubBalance(amount *big.Int) { + if amount.Sign() == 0 { + return + } + s.SetBalance(new(big.Int).Sub(s.Balance(), amount)) +} + +func (s *stateObject) SetBalance(amount *big.Int) { + s.db.journal.append(balanceChange{ + account: &s.address, + prev: new(big.Int).Set(s.data.Balance), + }) + s.setBalance(amount) +} + +func (s *stateObject) setBalance(amount *big.Int) { + s.data.Balance = amount +} + +func (s *stateObject) deepCopy(db *StateDB) *stateObject { + stateObject := newObject(db, s.address, s.data) + if s.trie != nil { + stateObject.trie = db.db.CopyTrie(s.trie) + } + stateObject.code = s.code + stateObject.dirtyStorage = s.dirtyStorage.Copy() + stateObject.originStorage = s.originStorage.Copy() + stateObject.pendingStorage = s.pendingStorage.Copy() + stateObject.suicided = s.suicided + stateObject.dirtyCode = s.dirtyCode + stateObject.deleted = s.deleted + return stateObject +} + +// +// Attribute accessors +// + +// Returns the address of the contract/account +func (s *stateObject) Address() common.Address { + return s.address +} + +// Code returns the contract code associated with this object, if any. +func (s *stateObject) Code(db Database) []byte { + if s.code != nil { + return s.code + } + if bytes.Equal(s.CodeHash(), emptyCodeHash) { + return nil + } + code, err := db.ContractCode(s.addrHash, common.BytesToHash(s.CodeHash())) + if err != nil { + s.setError(fmt.Errorf("can't load code hash %x: %v", s.CodeHash(), err)) + } + s.code = code + return code +} + +// CodeSize returns the size of the contract code associated with this object, +// or zero if none. This method is an almost mirror of Code, but uses a cache +// inside the database to avoid loading codes seen recently. +func (s *stateObject) CodeSize(db Database) int { + if s.code != nil { + return len(s.code) + } + if bytes.Equal(s.CodeHash(), emptyCodeHash) { + return 0 + } + size, err := db.ContractCodeSize(s.addrHash, common.BytesToHash(s.CodeHash())) + if err != nil { + s.setError(fmt.Errorf("can't load code size %x: %v", s.CodeHash(), err)) + } + return size +} + +func (s *stateObject) SetCode(codeHash common.Hash, code []byte) { + prevcode := s.Code(s.db.db) + s.db.journal.append(codeChange{ + account: &s.address, + prevhash: s.CodeHash(), + prevcode: prevcode, + }) + s.setCode(codeHash, code) +} + +func (s *stateObject) setCode(codeHash common.Hash, code []byte) { + s.code = code + s.data.CodeHash = codeHash[:] + s.dirtyCode = true +} + +func (s *stateObject) SetNonce(nonce uint64) { + s.db.journal.append(nonceChange{ + account: &s.address, + prev: s.data.Nonce, + }) + s.setNonce(nonce) +} + +func (s *stateObject) setNonce(nonce uint64) { + s.data.Nonce = nonce +} + +func (s *stateObject) CodeHash() []byte { + return s.data.CodeHash +} + +func (s *stateObject) Balance() *big.Int { + return s.data.Balance +} + +func (s *stateObject) Nonce() uint64 { + return s.data.Nonce +} + +// Never called, but must be present to allow stateObject to be used +// as a vm.Account interface that also satisfies the vm.ContractRef +// interface. Interfaces are awesome. 
+func (s *stateObject) Value() *big.Int { + panic("Value on stateObject should never be called") +} diff --git a/statedb.go b/statedb.go new file mode 100644 index 0000000..4698f51 --- /dev/null +++ b/statedb.go @@ -0,0 +1,637 @@ +package ipld_eth_statedb + +import ( + "fmt" + "math/big" + "sort" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/core/state/snapshot" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/core/vm" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/trie" +) + +/* +The portions of the EVM we want to leverage only use the following methods: + +GetBalance +Snapshot +Exist +CreateAccount +SubBalance +AddBalance +GetCode +GetCodeHash +RevertToSnapshot +GetNonce +SetNonce +AddAddressToAccessList +SetCode + +The rest can be left with panics for now +*/ + +var _ vm.StateDB = &StateDB{} + +type revision struct { + id int + journalIndex int +} + +var ( + // emptyRoot is the known root hash of an empty trie. + emptyRoot = common.HexToHash("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421") +) + +// StateDB structs within the ethereum protocol are used to store anything +// within the merkle trie. StateDBs take care of caching and storing +// nested states. It's the general query interface to retrieve: +// * Contracts +// * Accounts +type StateDB struct { + db Database + prefetcher *triePrefetcher + trie state.Trie + hasher crypto.KeccakState + + // originalRoot is the pre-state root, before any changes were made. + // It will be updated when the Commit is called. + originalRoot common.Hash + + snaps *snapshot.Tree + snap snapshot.Snapshot + snapDestructs map[common.Hash]struct{} + snapAccounts map[common.Hash][]byte + snapStorage map[common.Hash]map[common.Hash][]byte + + // This map holds 'live' objects, which will get modified while processing a state transition. + stateObjects map[common.Address]*stateObject + stateObjectsPending map[common.Address]struct{} // State objects finalized but not yet written to the trie + stateObjectsDirty map[common.Address]struct{} // State objects modified in the current execution + + // DB error. + // State objects are used by the consensus core and VM which are + // unable to deal with database-level errors. Any error that occurs + // during a database read is memoized here and will eventually be returned + // by StateDB.Commit. + dbErr error + + // The refund counter, also used by state transitioning. + refund uint64 + + thash common.Hash + txIndex int + logs map[common.Hash][]*types.Log + logSize uint + + preimages map[common.Hash][]byte + + // Per-transaction access list + accessList *accessList + + // Journal of state modifications. This is the backbone of + // Snapshot and RevertToSnapshot. 
+ journal *journal + validRevisions []revision + nextRevisionId int + + // Measurements gathered during execution for debugging purposes + AccountReads time.Duration + AccountHashes time.Duration + AccountUpdates time.Duration + AccountCommits time.Duration + StorageReads time.Duration + StorageHashes time.Duration + StorageUpdates time.Duration + StorageCommits time.Duration + SnapshotAccountReads time.Duration + SnapshotStorageReads time.Duration + SnapshotCommits time.Duration + + AccountUpdated int + StorageUpdated int + AccountDeleted int + StorageDeleted int +} + +// New creates a new state from a given trie. +func New(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error) { + tr, err := db.OpenTrie(root) + if err != nil { + return nil, err + } + sdb := &StateDB{ + db: db, + trie: tr, + originalRoot: root, + snaps: snaps, + stateObjects: make(map[common.Address]*stateObject), + stateObjectsPending: make(map[common.Address]struct{}), + stateObjectsDirty: make(map[common.Address]struct{}), + logs: make(map[common.Hash][]*types.Log), + preimages: make(map[common.Hash][]byte), + journal: newJournal(), + accessList: newAccessList(), + hasher: crypto.NewKeccakState(), + } + if sdb.snaps != nil { + if sdb.snap = sdb.snaps.Snapshot(root); sdb.snap != nil { + sdb.snapDestructs = make(map[common.Hash]struct{}) + sdb.snapAccounts = make(map[common.Hash][]byte) + sdb.snapStorage = make(map[common.Hash]map[common.Hash][]byte) + } + } + return sdb, nil +} + +// setError remembers the first non-nil error it is called with. +func (s *StateDB) setError(err error) { + if s.dbErr == nil { + s.dbErr = err + } +} + +func (s *StateDB) AddLog(log *types.Log) { + s.journal.append(addLogChange{txhash: s.thash}) + + log.TxHash = s.thash + log.TxIndex = uint(s.txIndex) + log.Index = s.logSize + s.logs[s.thash] = append(s.logs[s.thash], log) + s.logSize++ +} + +// AddPreimage records a SHA3 preimage seen by the VM. +func (s *StateDB) AddPreimage(hash common.Hash, preimage []byte) { + if _, ok := s.preimages[hash]; !ok { + s.journal.append(addPreimageChange{hash: hash}) + pi := make([]byte, len(preimage)) + copy(pi, preimage) + s.preimages[hash] = pi + } +} + +// AddRefund adds gas to the refund counter +func (s *StateDB) AddRefund(gas uint64) { + s.journal.append(refundChange{prev: s.refund}) + s.refund += gas +} + +// SubRefund removes gas from the refund counter. +// This method will panic if the refund counter goes below zero +func (s *StateDB) SubRefund(gas uint64) { + s.journal.append(refundChange{prev: s.refund}) + if gas > s.refund { + panic(fmt.Sprintf("Refund counter below zero (gas: %d > refund: %d)", gas, s.refund)) + } + s.refund -= gas +} + +// Exist reports whether the given account address exists in the state. +// Notably this also returns true for suicided accounts. 
+func (s *StateDB) Exist(addr common.Address) bool { + return s.getStateObject(addr) != nil +} + +// Empty returns whether the state object is either non-existent +// or empty according to the EIP161 specification (balance = nonce = code = 0) +func (s *StateDB) Empty(addr common.Address) bool { + so := s.getStateObject(addr) + return so == nil || so.empty() +} + +// GetBalance retrieves the balance from the given address or 0 if object not found +func (s *StateDB) GetBalance(addr common.Address) *big.Int { + stateObject := s.getStateObject(addr) + if stateObject != nil { + return stateObject.Balance() + } + return common.Big0 +} + +func (s *StateDB) GetNonce(addr common.Address) uint64 { + stateObject := s.getStateObject(addr) + if stateObject != nil { + return stateObject.Nonce() + } + + return 0 +} + +func (s *StateDB) GetCode(addr common.Address) []byte { + stateObject := s.getStateObject(addr) + if stateObject != nil { + return stateObject.Code(s.db) + } + return nil +} + +func (s *StateDB) GetCodeSize(addr common.Address) int { + stateObject := s.getStateObject(addr) + if stateObject != nil { + return stateObject.CodeSize(s.db) + } + return 0 +} + +func (s *StateDB) GetCodeHash(addr common.Address) common.Hash { + stateObject := s.getStateObject(addr) + if stateObject == nil { + return common.Hash{} + } + return common.BytesToHash(stateObject.CodeHash()) +} + +// GetState retrieves a value from the given account's storage trie. +func (s *StateDB) GetState(addr common.Address, hash common.Hash) common.Hash { + stateObject := s.getStateObject(addr) + if stateObject != nil { + return stateObject.GetState(s.db, hash) + } + return common.Hash{} +} + +// GetCommittedState retrieves a value from the given account's committed storage trie. +func (s *StateDB) GetCommittedState(addr common.Address, hash common.Hash) common.Hash { + stateObject := s.getStateObject(addr) + if stateObject != nil { + return stateObject.GetCommittedState(s.db, hash) + } + return common.Hash{} +} + +func (s *StateDB) HasSuicided(addr common.Address) bool { + stateObject := s.getStateObject(addr) + if stateObject != nil { + return stateObject.suicided + } + return false +} + +/* + * SETTERS + */ + +// AddBalance adds amount to the account associated with addr. +func (s *StateDB) AddBalance(addr common.Address, amount *big.Int) { + stateObject := s.getOrNewStateObject(addr) + if stateObject != nil { + stateObject.AddBalance(amount) + } +} + +// SubBalance subtracts amount from the account associated with addr. +func (s *StateDB) SubBalance(addr common.Address, amount *big.Int) { + stateObject := s.getOrNewStateObject(addr) + if stateObject != nil { + stateObject.SubBalance(amount) + } +} + +func (s *StateDB) SetBalance(addr common.Address, amount *big.Int) { + stateObject := s.getOrNewStateObject(addr) + if stateObject != nil { + stateObject.SetBalance(amount) + } +} + +func (s *StateDB) SetNonce(addr common.Address, nonce uint64) { + stateObject := s.getOrNewStateObject(addr) + if stateObject != nil { + stateObject.SetNonce(nonce) + } +} + +func (s *StateDB) SetCode(addr common.Address, code []byte) { + stateObject := s.getOrNewStateObject(addr) + if stateObject != nil { + stateObject.SetCode(crypto.Keccak256Hash(code), code) + } +} + +func (s *StateDB) SetState(addr common.Address, key, value common.Hash) { + stateObject := s.getOrNewStateObject(addr) + if stateObject != nil { + stateObject.SetState(s.db, key, value) + } +} + +// Suicide marks the given account as suicided. +// This clears the account balance. 
+// +// The account's state object is still available until the state is committed, +// getStateObject will return a non-nil account after Suicide. +func (s *StateDB) Suicide(addr common.Address) bool { + stateObject := s.getStateObject(addr) + if stateObject == nil { + return false + } + s.journal.append(suicideChange{ + account: &addr, + prev: stateObject.suicided, + prevbalance: new(big.Int).Set(stateObject.Balance()), + }) + stateObject.markSuicided() + stateObject.data.Balance = new(big.Int) + + return true +} + +// +// Setting, updating & deleting state object methods. +// + +// updateStateObject writes the given object to the trie. +// TODO: +func (s *StateDB) updateStateObject(obj *stateObject) { + // Track the amount of time wasted on updating the account from the trie + if metrics.EnabledExpensive { + defer func(start time.Time) { s.AccountUpdates += time.Since(start) }(time.Now()) + } + // Encode the account and update the account trie + addr := obj.Address() + if err := s.trie.TryUpdateAccount(addr[:], &obj.data); err != nil { + s.setError(fmt.Errorf("updateStateObject (%x) error: %v", addr[:], err)) + } + + // If state snapshotting is active, cache the data til commit. Note, this + // update mechanism is not symmetric to the deletion, because whereas it is + // enough to track account updates at commit time, deletions need tracking + // at transaction boundary level to ensure we capture state clearing. + if s.snap != nil { + s.snapAccounts[obj.addrHash] = snapshot.SlimAccountRLP(obj.data.Nonce, obj.data.Balance, obj.data.Root, obj.data.CodeHash) + } +} + +// deleteStateObject removes the given object from the state trie. +// TODO: +func (s *StateDB) deleteStateObject(obj *stateObject) { + // Track the amount of time wasted on deleting the account from the trie + if metrics.EnabledExpensive { + defer func(start time.Time) { s.AccountUpdates += time.Since(start) }(time.Now()) + } + // Delete the account from the trie + addr := obj.Address() + if err := s.trie.TryDeleteAccount(addr[:]); err != nil { + s.setError(fmt.Errorf("deleteStateObject (%x) error: %v", addr[:], err)) + } +} + +// getStateObject retrieves a state object given by the address, returning nil if +// the object is not found or was deleted in this execution context. If you need +// to differentiate between non-existent/just-deleted, use getDeletedStateObject. +func (s *StateDB) getStateObject(addr common.Address) *stateObject { + if obj := s.getDeletedStateObject(addr); obj != nil && !obj.deleted { + return obj + } + return nil +} + +// getDeletedStateObject is similar to getStateObject, but instead of returning +// nil for a deleted state object, it returns the actual object with the deleted +// flag set. This is needed by the state journal to revert to the correct s- +// destructed object instead of wiping all knowledge about the state object. 
+// TODO: +func (s *StateDB) getDeletedStateObject(addr common.Address) *stateObject { + // Prefer live objects if any is available + if obj := s.stateObjects[addr]; obj != nil { + return obj + } + // If no live objects are available, attempt to use snapshots + var data *types.StateAccount + if s.snap != nil { + start := time.Now() + acc, err := s.snap.Account(crypto.HashData(s.hasher, addr.Bytes())) + if metrics.EnabledExpensive { + s.SnapshotAccountReads += time.Since(start) + } + if err == nil { + if acc == nil { + return nil + } + data = &types.StateAccount{ + Nonce: acc.Nonce, + Balance: acc.Balance, + CodeHash: acc.CodeHash, + Root: common.BytesToHash(acc.Root), + } + if len(data.CodeHash) == 0 { + data.CodeHash = emptyCodeHash + } + if data.Root == (common.Hash{}) { + data.Root = emptyRoot + } + } + } + // If snapshot unavailable or reading from it failed, load from the database + if data == nil { + start := time.Now() + var err error + data, err = s.trie.TryGetAccount(addr.Bytes()) + if metrics.EnabledExpensive { + s.AccountReads += time.Since(start) + } + if err != nil { + s.setError(fmt.Errorf("getDeleteStateObject (%x) error: %w", addr.Bytes(), err)) + return nil + } + if data == nil { + return nil + } + } + // Insert into the live set + obj := newObject(s, addr, *data) + s.setStateObject(obj) + return obj +} + +func (s *StateDB) setStateObject(object *stateObject) { + s.stateObjects[object.Address()] = object +} + +// getOrNewStateObject retrieves a state object or create a new state object if nil. +func (s *StateDB) getOrNewStateObject(addr common.Address) *stateObject { + stateObject := s.getStateObject(addr) + if stateObject == nil { + stateObject, _ = s.createObject(addr) + } + return stateObject +} + +// createObject creates a new state object. If there is an existing account with +// the given address, it is overwritten and returned as the second return value. +func (s *StateDB) createObject(addr common.Address) (newobj, prev *stateObject) { + prev = s.getDeletedStateObject(addr) // Note, prev might have been deleted, we need that! + + var prevdestruct bool + if s.snap != nil && prev != nil { + _, prevdestruct = s.snapDestructs[prev.addrHash] + if !prevdestruct { + s.snapDestructs[prev.addrHash] = struct{}{} + } + } + newobj = newObject(s, addr, types.StateAccount{}) + if prev == nil { + s.journal.append(createObjectChange{account: &addr}) + } else { + s.journal.append(resetObjectChange{prev: prev, prevdestruct: prevdestruct}) + } + s.setStateObject(newobj) + if prev != nil && !prev.deleted { + return newobj, prev + } + return newobj, nil +} + +// CreateAccount explicitly creates a state object. If a state object with the address +// already exists the balance is carried over to the new account. +// +// CreateAccount is called during the EVM CREATE operation. The situation might arise that +// a contract does the following: +// +// 1. sends funds to sha(account ++ (nonce + 1)) +// 2. tx_create(sha(account ++ nonce)) (note that this gets the address of 1) +// +// Carrying over the balance ensures that Ether doesn't disappear. 
+func (s *StateDB) CreateAccount(addr common.Address) { + newObj, prev := s.createObject(addr) + if prev != nil { + newObj.setBalance(prev.data.Balance) + } +} + +func (db *StateDB) ForEachStorage(addr common.Address, cb func(key, value common.Hash) bool) error { + so := db.getStateObject(addr) + if so == nil { + return nil + } + it := trie.NewIterator(so.getTrie(db.db).NodeIterator(nil)) + + for it.Next() { + key := common.BytesToHash(db.trie.GetKey(it.Key)) + if value, dirty := so.dirtyStorage[key]; dirty { + if !cb(key, value) { + return nil + } + continue + } + + if len(it.Value) > 0 { + _, content, _, err := rlp.Split(it.Value) + if err != nil { + return err + } + if !cb(key, common.BytesToHash(content)) { + return nil + } + } + } + return nil +} + +// Snapshot returns an identifier for the current revision of the state. +func (s *StateDB) Snapshot() int { + id := s.nextRevisionId + s.nextRevisionId++ + s.validRevisions = append(s.validRevisions, revision{id, s.journal.length()}) + return id +} + +// RevertToSnapshot reverts all state changes made since the given revision. +func (s *StateDB) RevertToSnapshot(revid int) { + // Find the snapshot in the stack of valid snapshots. + idx := sort.Search(len(s.validRevisions), func(i int) bool { + return s.validRevisions[i].id >= revid + }) + if idx == len(s.validRevisions) || s.validRevisions[idx].id != revid { + panic(fmt.Errorf("revision id %v cannot be reverted", revid)) + } + snapshot := s.validRevisions[idx].journalIndex + + // Replay the journal to undo changes and remove invalidated snapshots + s.journal.revert(s, snapshot) + s.validRevisions = s.validRevisions[:idx] +} + +// GetRefund returns the current value of the refund counter. +func (s *StateDB) GetRefund() uint64 { + return s.refund +} + +func (s *StateDB) clearJournalAndRefund() { + if len(s.journal.entries) > 0 { + s.journal = newJournal() + s.refund = 0 + } + s.validRevisions = s.validRevisions[:0] // Snapshots can be created without journal entries +} + +// PrepareAccessList handles the preparatory steps for executing a state transition with +// regards to both EIP-2929 and EIP-2930: +// +// - Add sender to access list (2929) +// - Add destination to access list (2929) +// - Add precompiles to access list (2929) +// - Add the contents of the optional tx access list (2930) +// +// This method should only be called if Berlin/2929+2930 is applicable at the current number. 
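+//
+// An illustrative call site (a sketch only; msg, rules and the vm import come from
+// go-ethereum's core/state_transition.go wiring and are not defined in this package):
+//
+//	statedb.PrepareAccessList(msg.From(), msg.To(), vm.ActivePrecompiles(rules), msg.AccessList())
+//
+// after which the transaction executes; RevertToSnapshot unwinds any additions made
+// during execution via the journal entries above.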
+func (s *StateDB) PrepareAccessList(sender common.Address, dst *common.Address, precompiles []common.Address, list types.AccessList) { + // Clear out any leftover from previous executions + s.accessList = newAccessList() + + s.AddAddressToAccessList(sender) + if dst != nil { + s.AddAddressToAccessList(*dst) + // If it's a create-tx, the destination will be added inside evm.create + } + for _, addr := range precompiles { + s.AddAddressToAccessList(addr) + } + for _, el := range list { + s.AddAddressToAccessList(el.Address) + for _, key := range el.StorageKeys { + s.AddSlotToAccessList(el.Address, key) + } + } +} + +// AddAddressToAccessList adds the given address to the access list +func (s *StateDB) AddAddressToAccessList(addr common.Address) { + if s.accessList.AddAddress(addr) { + s.journal.append(accessListAddAccountChange{&addr}) + } +} + +// AddSlotToAccessList adds the given (address, slot)-tuple to the access list +func (s *StateDB) AddSlotToAccessList(addr common.Address, slot common.Hash) { + addrMod, slotMod := s.accessList.AddSlot(addr, slot) + if addrMod { + // In practice, this should not happen, since there is no way to enter the + // scope of 'address' without having the 'address' become already added + // to the access list (via call-variant, create, etc). + // Better safe than sorry, though + s.journal.append(accessListAddAccountChange{&addr}) + } + if slotMod { + s.journal.append(accessListAddSlotChange{ + address: &addr, + slot: &slot, + }) + } +} + +// AddressInAccessList returns true if the given address is in the access list. +func (s *StateDB) AddressInAccessList(addr common.Address) bool { + return s.accessList.ContainsAddress(addr) +} + +// SlotInAccessList returns true if the given (address, slot)-tuple is in the access list. +func (s *StateDB) SlotInAccessList(addr common.Address, slot common.Hash) (addressPresent bool, slotPresent bool) { + return s.accessList.Contains(addr, slot) +} diff --git a/trie_prefetcher.go b/trie_prefetcher.go new file mode 100644 index 0000000..52700ae --- /dev/null +++ b/trie_prefetcher.go @@ -0,0 +1,339 @@ +package ipld_eth_statedb + +import ( + "sync" + + "github.com/ethereum/go-ethereum/core/state" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" +) + +var ( + // triePrefetchMetricsPrefix is the prefix under which to publish the metrics. + triePrefetchMetricsPrefix = "trie/prefetch/" +) + +// triePrefetcher is an active prefetcher, which receives accounts or storage +// items and does trie-loading of them. The goal is to get as much useful content +// into the caches as possible. +// +// Note, the prefetcher's API is not thread safe. 
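+//
+// Typical lifecycle (a sketch; variable names are illustrative only):
+//
+//	p := newTriePrefetcher(db, root, "example")
+//	p.prefetch(owner, storageRoot, keys)    // schedule key loads in the background
+//	tr := p.trie(owner, storageRoot)        // later, take whatever has been loaded so far
+//	p.used(owner, storageRoot, usedKeys)    // report the keys that were actually read
+//	p.close()                               // abort remaining subfetchers, publish metrics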
+type triePrefetcher struct {
+	db       state.Database         // Database to fetch trie nodes through
+	root     common.Hash            // Root hash of the account trie for metrics
+	fetches  map[string]state.Trie  // Partially or fully fetched tries
+	fetchers map[string]*subfetcher // Subfetchers for each trie
+
+	deliveryMissMeter metrics.Meter
+	accountLoadMeter  metrics.Meter
+	accountDupMeter   metrics.Meter
+	accountSkipMeter  metrics.Meter
+	accountWasteMeter metrics.Meter
+	storageLoadMeter  metrics.Meter
+	storageDupMeter   metrics.Meter
+	storageSkipMeter  metrics.Meter
+	storageWasteMeter metrics.Meter
+}
+
+func newTriePrefetcher(db state.Database, root common.Hash, namespace string) *triePrefetcher {
+	prefix := triePrefetchMetricsPrefix + namespace
+	p := &triePrefetcher{
+		db:       db,
+		root:     root,
+		fetchers: make(map[string]*subfetcher), // Active prefetchers use the fetchers map
+
+		deliveryMissMeter: metrics.GetOrRegisterMeter(prefix+"/deliverymiss", nil),
+		accountLoadMeter:  metrics.GetOrRegisterMeter(prefix+"/account/load", nil),
+		accountDupMeter:   metrics.GetOrRegisterMeter(prefix+"/account/dup", nil),
+		accountSkipMeter:  metrics.GetOrRegisterMeter(prefix+"/account/skip", nil),
+		accountWasteMeter: metrics.GetOrRegisterMeter(prefix+"/account/waste", nil),
+		storageLoadMeter:  metrics.GetOrRegisterMeter(prefix+"/storage/load", nil),
+		storageDupMeter:   metrics.GetOrRegisterMeter(prefix+"/storage/dup", nil),
+		storageSkipMeter:  metrics.GetOrRegisterMeter(prefix+"/storage/skip", nil),
+		storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil),
+	}
+	return p
+}
+
+// close iterates over all the subfetchers, aborts any that were left spinning
+// and reports the stats to the metrics subsystem.
+func (p *triePrefetcher) close() {
+	for _, fetcher := range p.fetchers {
+		fetcher.abort() // safe to do multiple times
+
+		if metrics.Enabled {
+			if fetcher.root == p.root {
+				p.accountLoadMeter.Mark(int64(len(fetcher.seen)))
+				p.accountDupMeter.Mark(int64(fetcher.dups))
+				p.accountSkipMeter.Mark(int64(len(fetcher.tasks)))
+
+				for _, key := range fetcher.used {
+					delete(fetcher.seen, string(key))
+				}
+				p.accountWasteMeter.Mark(int64(len(fetcher.seen)))
+			} else {
+				p.storageLoadMeter.Mark(int64(len(fetcher.seen)))
+				p.storageDupMeter.Mark(int64(fetcher.dups))
+				p.storageSkipMeter.Mark(int64(len(fetcher.tasks)))
+
+				for _, key := range fetcher.used {
+					delete(fetcher.seen, string(key))
+				}
+				p.storageWasteMeter.Mark(int64(len(fetcher.seen)))
+			}
+		}
+	}
+	// Clear out all fetchers (will crash on a second call, deliberate)
+	p.fetchers = nil
+}
+
+// copy creates a deep-but-inactive copy of the trie prefetcher. Any trie data
+// already loaded will be copied over, but no goroutines will be started. This
+// is mostly used in the miner which creates a copy of its actively mutated
+// state to be sealed while it may further mutate the state.
+func (p *triePrefetcher) copy() *triePrefetcher { + copy := &triePrefetcher{ + db: p.db, + root: p.root, + fetches: make(map[string]state.Trie), // Active prefetchers use the fetches map + + deliveryMissMeter: p.deliveryMissMeter, + accountLoadMeter: p.accountLoadMeter, + accountDupMeter: p.accountDupMeter, + accountSkipMeter: p.accountSkipMeter, + accountWasteMeter: p.accountWasteMeter, + storageLoadMeter: p.storageLoadMeter, + storageDupMeter: p.storageDupMeter, + storageSkipMeter: p.storageSkipMeter, + storageWasteMeter: p.storageWasteMeter, + } + // If the prefetcher is already a copy, duplicate the data + if p.fetches != nil { + for root, fetch := range p.fetches { + copy.fetches[root] = p.db.CopyTrie(fetch) + } + return copy + } + // Otherwise we're copying an active fetcher, retrieve the current states + for id, fetcher := range p.fetchers { + copy.fetches[id] = fetcher.peek() + } + return copy +} + +// prefetch schedules a batch of trie items to prefetch. +func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, keys [][]byte) { + // If the prefetcher is an inactive one, bail out + if p.fetches != nil { + return + } + // Active fetcher, schedule the retrievals + id := p.trieID(owner, root) + fetcher := p.fetchers[id] + if fetcher == nil { + fetcher = newSubfetcher(p.db, owner, root) + p.fetchers[id] = fetcher + } + fetcher.schedule(keys) +} + +// trie returns the trie matching the root hash, or nil if the prefetcher doesn't +// have it. +func (p *triePrefetcher) trie(owner common.Hash, root common.Hash) state.Trie { + // If the prefetcher is inactive, return from existing deep copies + id := p.trieID(owner, root) + if p.fetches != nil { + trie := p.fetches[id] + if trie == nil { + p.deliveryMissMeter.Mark(1) + return nil + } + return p.db.CopyTrie(trie) + } + // Otherwise the prefetcher is active, bail if no trie was prefetched for this root + fetcher := p.fetchers[id] + if fetcher == nil { + p.deliveryMissMeter.Mark(1) + return nil + } + // Interrupt the prefetcher if it's by any chance still running and return + // a copy of any pre-loaded trie. + fetcher.abort() // safe to do multiple times + + trie := fetcher.peek() + if trie == nil { + p.deliveryMissMeter.Mark(1) + return nil + } + return trie +} + +// used marks a batch of state items used to allow creating statistics as to +// how useful or wasteful the prefetcher is. +func (p *triePrefetcher) used(owner common.Hash, root common.Hash, used [][]byte) { + if fetcher := p.fetchers[p.trieID(owner, root)]; fetcher != nil { + fetcher.used = used + } +} + +// trieID returns an unique trie identifier consists the trie owner and root hash. +func (p *triePrefetcher) trieID(owner common.Hash, root common.Hash) string { + return string(append(owner.Bytes(), root.Bytes()...)) +} + +// subfetcher is a trie fetcher goroutine responsible for pulling entries for a +// single trie. It is spawned when a new root is encountered and lives until the +// main prefetcher is paused and either all requested items are processed or if +// the trie being worked on is retrieved from the prefetcher. 
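+//
+// Internal flow (a sketch; driven by the triePrefetcher above, names illustrative):
+//
+//	sf := newSubfetcher(db, owner, root)
+//	sf.schedule(keys) // queue keys; the loop goroutine drains them
+//	tr := sf.peek()   // copy of whatever the loop has loaded so far
+//	sf.abort()        // stop the loop; safe to call more than once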
+type subfetcher struct { + db state.Database // Database to load trie nodes through + owner common.Hash // Owner of the trie, usually account hash + root common.Hash // Root hash of the trie to prefetch + trie state.Trie // Trie being populated with nodes + + tasks [][]byte // Items queued up for retrieval + lock sync.Mutex // Lock protecting the task queue + + wake chan struct{} // Wake channel if a new task is scheduled + stop chan struct{} // Channel to interrupt processing + term chan struct{} // Channel to signal interruption + copy chan chan state.Trie // Channel to request a copy of the current trie + + seen map[string]struct{} // Tracks the entries already loaded + dups int // Number of duplicate preload tasks + used [][]byte // Tracks the entries used in the end +} + +// newSubfetcher creates a goroutine to prefetch state items belonging to a +// particular root hash. +func newSubfetcher(db state.Database, owner common.Hash, root common.Hash) *subfetcher { + sf := &subfetcher{ + db: db, + owner: owner, + root: root, + wake: make(chan struct{}, 1), + stop: make(chan struct{}), + term: make(chan struct{}), + copy: make(chan chan state.Trie), + seen: make(map[string]struct{}), + } + go sf.loop() + return sf +} + +// schedule adds a batch of trie keys to the queue to prefetch. +func (sf *subfetcher) schedule(keys [][]byte) { + // Append the tasks to the current queue + sf.lock.Lock() + sf.tasks = append(sf.tasks, keys...) + sf.lock.Unlock() + + // Notify the prefetcher, it's fine if it's already terminated + select { + case sf.wake <- struct{}{}: + default: + } +} + +// peek tries to retrieve a deep copy of the fetcher's trie in whatever form it +// is currently. +func (sf *subfetcher) peek() state.Trie { + ch := make(chan state.Trie) + select { + case sf.copy <- ch: + // Subfetcher still alive, return copy from it + return <-ch + + case <-sf.term: + // Subfetcher already terminated, return a copy directly + if sf.trie == nil { + return nil + } + return sf.db.CopyTrie(sf.trie) + } +} + +// abort interrupts the subfetcher immediately. It is safe to call abort multiple +// times but it is not thread safe. +func (sf *subfetcher) abort() { + select { + case <-sf.stop: + default: + close(sf.stop) + } + <-sf.term +} + +// loop waits for new tasks to be scheduled and keeps loading them until it runs +// out of tasks or its underlying trie is retrieved for committing. +func (sf *subfetcher) loop() { + // No matter how the loop stops, signal anyone waiting that it's terminated + defer close(sf.term) + + // Start by opening the trie and stop processing if it fails + if sf.owner == (common.Hash{}) { + trie, err := sf.db.OpenTrie(sf.root) + if err != nil { + log.Warn("Trie prefetcher failed opening trie", "root", sf.root, "err", err) + return + } + sf.trie = trie + } else { + trie, err := sf.db.OpenStorageTrie(sf.owner, sf.root) + if err != nil { + log.Warn("Trie prefetcher failed opening trie", "root", sf.root, "err", err) + return + } + sf.trie = trie + } + // Trie opened successfully, keep prefetching items + for { + select { + case <-sf.wake: + // Subfetcher was woken up, retrieve any tasks to avoid spinning the lock + sf.lock.Lock() + tasks := sf.tasks + sf.tasks = nil + sf.lock.Unlock() + + // Prefetch any tasks until the loop is interrupted + for i, task := range tasks { + select { + case <-sf.stop: + // If termination is requested, add any leftover back and return + sf.lock.Lock() + sf.tasks = append(sf.tasks, tasks[i:]...) 
+ sf.lock.Unlock() + return + + case ch := <-sf.copy: + // Somebody wants a copy of the current trie, grant them + ch <- sf.db.CopyTrie(sf.trie) + + default: + // No termination request yet, prefetch the next entry + if _, ok := sf.seen[string(task)]; ok { + sf.dups++ + } else { + if len(task) == len(common.Address{}) { + sf.trie.TryGetAccount(task) + } else { + sf.trie.TryGet(task) + } + sf.seen[string(task)] = struct{}{} + } + } + } + + case ch := <-sf.copy: + // Somebody wants a copy of the current trie, grant them + ch <- sf.db.CopyTrie(sf.trie) + + case <-sf.stop: + // Termination is requested, abort and leave remaining tasks + return + } + } +}
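Example wiring for reviewers — a minimal sketch (not part of this changeset) showing how the pieces above are intended to compose: Config builds the pgx pool, StateDatabase wraps it as a Database, and New layers a StateDB on top. The connection values are placeholders, and the call will only succeed once the trie-oriented stubs in database.go stop panicking (New calls OpenTrie).

	package ipld_eth_statedb

	import (
		"context"

		"github.com/ethereum/go-ethereum/common"
	)

	// newExampleStateDB shows the intended construction path; it is written inside
	// this package so that no module path needs to be assumed.
	func newExampleStateDB(ctx context.Context, stateRoot common.Hash) (*StateDB, error) {
		cfg := Config{
			Hostname:     "localhost", // placeholder connection details
			Port:         5432,
			DatabaseName: "ipld_eth",
			Username:     "postgres",
			Password:     "",
			MaxConns:     4,
		}
		pool, err := cfg.NewPGXPool(ctx)
		if err != nil {
			return nil, err
		}
		db := &StateDatabase{Pool: pool}
		// No snapshot tree is wired up here, so pass nil for snaps.
		return New(stateRoot, db, nil)
	}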