Merge pull request #36 from vulcanize/use-async-iter

Use concurrent node iterator
This commit is contained in:
Ian Norden 2022-08-16 09:54:07 -05:00 committed by GitHub
commit aecb98129c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 345 additions and 66 deletions

View File

@ -59,7 +59,8 @@ It can operate at three levels:
}
func validateTrie() {
v, err := newValidator()
numWorkers := viper.GetUint("validator.workers")
v, err := newValidator(numWorkers)
if err != nil {
logWithCommand.Fatal(err)
}
@ -104,20 +105,20 @@ func validateTrie() {
logWithCommand.Debugf("groupcache stats %+v", stats)
}
func newValidator() (*validator.Validator, error) {
func newValidator(workers uint) (*validator.Validator, error) {
ipfsPath := viper.GetString("ipfs.path")
if ipfsPath == "" {
db, err := validator.NewDB()
if err != nil {
logWithCommand.Fatal(err)
}
return validator.NewPGIPFSValidator(db), nil
return validator.NewPGIPFSValidator(db, workers), nil
}
bs, err := validator.InitIPFSBlockService(ipfsPath)
if err != nil {
return nil, err
}
return validator.NewIPFSValidator(bs), nil
return validator.NewIPFSValidator(bs, workers), nil
}
func init() {
@ -128,10 +129,12 @@ func init() {
validateTrieCmd.PersistentFlags().String("storage-root", "", "Root of the storage trie we wish to validate; for storage validation")
validateTrieCmd.PersistentFlags().String("address", "", "Contract address for the storage trie we wish to validate; for storage validation")
validateTrieCmd.PersistentFlags().String("ipfs-path", "", "Path to IPFS repository; if provided operations move through the IPFS repo otherwise Postgres connection params are expected in the provided config")
validateTrieCmd.PersistentFlags().Int("workers", 4, "number of concurrent workers to use")
viper.BindPFlag("validator.stateRoot", validateTrieCmd.PersistentFlags().Lookup("state-root"))
viper.BindPFlag("validator.type", validateTrieCmd.PersistentFlags().Lookup("type"))
viper.BindPFlag("validator.storageRoot", validateTrieCmd.PersistentFlags().Lookup("storage-root"))
viper.BindPFlag("validator.address", validateTrieCmd.PersistentFlags().Lookup("address"))
viper.BindPFlag("validator.workers", validateTrieCmd.PersistentFlags().Lookup("workers"))
viper.BindPFlag("ipfs.path", validateTrieCmd.PersistentFlags().Lookup("ipfs-path"))
}

5
go.mod
View File

@ -118,6 +118,8 @@ require (
github.com/klauspost/compress v1.15.1 // indirect
github.com/klauspost/cpuid/v2 v2.0.12 // indirect
github.com/koron/go-ssdp v0.0.2 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/libp2p/go-addr-util v0.1.0 // indirect
github.com/libp2p/go-buffer-pool v0.0.2 // indirect
github.com/libp2p/go-cidranger v1.1.0 // indirect
github.com/libp2p/go-doh-resolver v0.4.0 // indirect
@ -203,6 +205,9 @@ require (
github.com/tklauser/go-sysconf v0.3.5 // indirect
github.com/tklauser/numcpus v0.2.2 // indirect
github.com/wI2L/jsondiff v0.2.0 // indirect
github.com/tklauser/go-sysconf v0.3.5 // indirect
github.com/tklauser/numcpus v0.2.2 // indirect
github.com/vulcanize/go-eth-state-node-iterator v1.1.4
github.com/whyrusleeping/base32 v0.0.0-20170828182744-c30ac30633cc // indirect
github.com/whyrusleeping/cbor-gen v0.0.0-20210219115102-f37d292932f2 // indirect
github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f // indirect

274
go.sum

File diff suppressed because it is too large Load Diff

View File

@ -17,17 +17,23 @@
package validator
import (
"bytes"
"fmt"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
"github.com/ipfs/go-blockservice"
"github.com/jmoiron/sqlx"
"github.com/mailgun/groupcache/v2"
nodeiter "github.com/vulcanize/go-eth-state-node-iterator"
ipfsethdb "github.com/vulcanize/ipfs-ethdb/v4"
pgipfsethdb "github.com/vulcanize/ipfs-ethdb/v4/postgres"
)
@ -38,10 +44,14 @@ type Validator struct {
trieDB *trie.Database
stateDatabase state.Database
db *pgipfsethdb.Database
iterWorkers uint
}
var emptyCodeHash = crypto.Keccak256(nil)
// NewPGIPFSValidator returns a new trie validator ontop of a connection pool for an IPFS backing Postgres database
func NewPGIPFSValidator(db *sqlx.DB) *Validator {
func NewPGIPFSValidator(db *sqlx.DB, workers uint) *Validator {
kvs := pgipfsethdb.NewKeyValueStore(db, pgipfsethdb.CacheConfig{
Name: "kv",
Size: 16 * 1000 * 1000, // 16MB
@ -54,11 +64,15 @@ func NewPGIPFSValidator(db *sqlx.DB) *Validator {
ExpiryDuration: time.Hour * 8, // 8 hours
})
if workers == 0 {
workers = 1
}
return &Validator{
kvs: kvs,
trieDB: trie.NewDatabase(kvs),
stateDatabase: state.NewDatabase(database),
db: database.(*pgipfsethdb.Database),
iterWorkers: workers,
}
}
@ -67,13 +81,17 @@ func (v *Validator) GetCacheStats() groupcache.Stats {
}
// NewIPFSValidator returns a new trie validator ontop of an IPFS blockservice
func NewIPFSValidator(bs blockservice.BlockService) *Validator {
func NewIPFSValidator(bs blockservice.BlockService, workers uint) *Validator {
kvs := ipfsethdb.NewKeyValueStore(bs)
database := ipfsethdb.NewDatabase(bs)
if workers == 0 {
workers = 1
}
return &Validator{
kvs: kvs,
trieDB: trie.NewDatabase(kvs),
stateDatabase: state.NewDatabase(database),
iterWorkers: workers,
}
}
@ -88,21 +106,82 @@ func NewValidator(kvs ethdb.KeyValueStore, database ethdb.Database) *Validator {
}
}
// Traverses each iterator in a separate goroutine.
// If storage = true, also traverse storage tries for each leaf.
func (v *Validator) iterateAsync(iters []trie.NodeIterator, storage bool) error {
var wg sync.WaitGroup
errors := make(chan error)
for _, it := range iters {
wg.Add(1)
go func(it trie.NodeIterator) {
defer wg.Done()
for it.Next(true) {
// Iterate through entire state trie. it.Next() will return false when we have
// either completed iteration of the entire trie or run into an error (e.g. a
// missing node). If we are able to iterate through the entire trie without error
// then the trie is complete.
// If storage is not requested, or the state trie node is an internal entry, leave as is
if !storage || !it.Leaf() {
continue
}
// Otherwise we've reached an account node, initiate data iteration
var account types.StateAccount
if err := rlp.Decode(bytes.NewReader(it.LeafBlob()), &account); err != nil {
errors <- err
break
}
dataTrie, err := v.stateDatabase.OpenStorageTrie(common.BytesToHash(it.LeafKey()), account.Root)
if err != nil {
errors <- err
break
}
dataIt := dataTrie.NodeIterator(nil)
if !bytes.Equal(account.CodeHash, emptyCodeHash) {
addrHash := common.BytesToHash(it.LeafKey())
_, err := v.stateDatabase.ContractCode(addrHash, common.BytesToHash(account.CodeHash))
if err != nil {
errors <- fmt.Errorf("code %x: %v", account.CodeHash, err)
break
}
}
for dataIt.Next(true) {
}
if err = dataIt.Error(); err != nil {
errors <- err
break
}
}
if it.Error() != nil {
errors <- it.Error()
}
}(it)
}
done := make(chan struct{})
go func() {
wg.Wait()
done <- struct{}{}
}()
var err error
select {
case err = <-errors:
case <-done:
close(errors)
}
return err
}
// ValidateTrie returns an error if the state and storage tries for the provided state root cannot be confirmed as complete
// This does consider child storage tries
func (v *Validator) ValidateTrie(stateRoot common.Hash) error {
// Generate the state.NodeIterator for this root
stateDB, err := state.New(stateRoot, v.stateDatabase, nil)
t, err := v.stateDatabase.OpenTrie(stateRoot)
if err != nil {
return err
}
it := state.NewNodeIterator(stateDB)
for it.Next() {
// iterate through entire state trie and descendent storage tries
// it.Next() will return false when we have either completed iteration of the entire trie or have ran into an error (e.g. a missing node)
// if we are able to iterate through the entire trie without error then the trie is complete
}
return it.Error
iters := nodeiter.SubtrieIterators(t, v.iterWorkers)
return v.iterateAsync(iters, true)
}
// ValidateStateTrie returns an error if the state trie for the provided state root cannot be confirmed as complete
@ -113,13 +192,8 @@ func (v *Validator) ValidateStateTrie(stateRoot common.Hash) error {
if err != nil {
return err
}
it := t.NodeIterator(nil)
for it.Next(true) {
// iterate through entire state trie
// it.Next() will return false when we have either completed iteration of the entire trie or have ran into an error (e.g. a missing node)
// if we are able to iterate through the entire trie without error then the trie is complete
}
return it.Error()
iters := nodeiter.SubtrieIterators(t, v.iterWorkers)
return v.iterateAsync(iters, false)
}
// ValidateStorageTrie returns an error if the storage trie for the provided storage root and contract address cannot be confirmed as complete
@ -130,13 +204,8 @@ func (v *Validator) ValidateStorageTrie(address common.Address, storageRoot comm
if err != nil {
return err
}
it := t.NodeIterator(nil)
for it.Next(true) {
// iterate through entire storage trie
// it.Next() will return false when we have either completed iteration of the entire trie or have ran into an error (e.g. a missing node)
// if we are able to iterate through the entire trie without error then the trie is complete
}
return it.Error()
iters := nodeiter.SubtrieIterators(t, v.iterWorkers)
return v.iterateAsync(iters, false)
}
// Close implements io.Closer

View File

@ -200,7 +200,7 @@ var _ = Describe("PG-IPFS Validator", func() {
BeforeEach(func() {
db, err = pgipfsethdb.TestDB()
Expect(err).ToNot(HaveOccurred())
v = validator.NewPGIPFSValidator(db)
v = validator.NewPGIPFSValidator(db, 4)
})
AfterEach(func() {
v.Close()