Track and recover iteration state

This commit is contained in:
Roy Crihfield 2022-08-24 14:09:46 -05:00 committed by Roy
parent d1b423830c
commit defc4c8c06
6 changed files with 151 additions and 97 deletions

View File

@ -59,15 +59,19 @@ It can operate at three levels:
} }
func validateTrie() { func validateTrie() {
numWorkers := viper.GetUint("validator.workers") params := validator.Params{
v, err := newValidator(numWorkers) Workers: viper.GetUint("validator.workers"),
RecoveryFormat: viper.GetString("validator.recoveryFormat"),
}
v, err := newValidator(params)
if err != nil { if err != nil {
logWithCommand.Fatal(err) logWithCommand.Fatal(err)
} }
stateRootStr := viper.GetString("validator.stateRoot") stateRootStr := viper.GetString("validator.stateRoot")
storageRootStr := viper.GetString("validator.storageRoot") storageRootStr := viper.GetString("validator.storageRoot")
contractAddrStr := viper.GetString("validator.address") contractAddrStr := viper.GetString("validator.address")
switch strings.ToLower(viper.GetString("validator.type")) { traversal := strings.ToLower(viper.GetString("validator.type"))
switch traversal {
case "f", "full": case "f", "full":
if stateRootStr == "" { if stateRootStr == "" {
logWithCommand.Fatal("must provide a state root for full state validation") logWithCommand.Fatal("must provide a state root for full state validation")
@ -99,26 +103,28 @@ func validateTrie() {
logWithCommand.Fatalf("Storage trie for contract %s and root %s not complete\r\nerr: %v", addr.String(), storageRoot.String(), err) logWithCommand.Fatalf("Storage trie for contract %s and root %s not complete\r\nerr: %v", addr.String(), storageRoot.String(), err)
} }
logWithCommand.Infof("Storage trie for contract %s and root %s is complete", addr.String(), storageRoot.String()) logWithCommand.Infof("Storage trie for contract %s and root %s is complete", addr.String(), storageRoot.String())
default:
logWithCommand.Fatalf("Invalid traversal level: '%s'", traversal)
} }
stats := v.GetCacheStats() stats := v.GetCacheStats()
logWithCommand.Debugf("groupcache stats %+v", stats) logWithCommand.Debugf("groupcache stats %+v", stats)
} }
func newValidator(workers uint) (*validator.Validator, error) { func newValidator(params validator.Params) (*validator.Validator, error) {
ipfsPath := viper.GetString("ipfs.path") ipfsPath := viper.GetString("ipfs.path")
if ipfsPath == "" { if ipfsPath == "" {
db, err := validator.NewDB() db, err := validator.NewDB()
if err != nil { if err != nil {
logWithCommand.Fatal(err) logWithCommand.Fatal(err)
} }
return validator.NewPGIPFSValidator(db, workers), nil return validator.NewPGIPFSValidator(db, params), nil
} }
bs, err := validator.InitIPFSBlockService(ipfsPath) bs, err := validator.InitIPFSBlockService(ipfsPath)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return validator.NewIPFSValidator(bs, workers), nil return validator.NewIPFSValidator(bs, params), nil
} }
func init() { func init() {
@ -130,11 +136,13 @@ func init() {
validateTrieCmd.PersistentFlags().String("address", "", "Contract address for the storage trie we wish to validate; for storage validation") validateTrieCmd.PersistentFlags().String("address", "", "Contract address for the storage trie we wish to validate; for storage validation")
validateTrieCmd.PersistentFlags().String("ipfs-path", "", "Path to IPFS repository; if provided operations move through the IPFS repo otherwise Postgres connection params are expected in the provided config") validateTrieCmd.PersistentFlags().String("ipfs-path", "", "Path to IPFS repository; if provided operations move through the IPFS repo otherwise Postgres connection params are expected in the provided config")
validateTrieCmd.PersistentFlags().Int("workers", 4, "number of concurrent workers to use") validateTrieCmd.PersistentFlags().Int("workers", 4, "number of concurrent workers to use")
validateTrieCmd.PersistentFlags().String("recovery-format", validator.DefaultRecoveryFormat, "format pattern for recovery files")
viper.BindPFlag("validator.stateRoot", validateTrieCmd.PersistentFlags().Lookup("state-root")) viper.BindPFlag("validator.stateRoot", validateTrieCmd.PersistentFlags().Lookup("state-root"))
viper.BindPFlag("validator.type", validateTrieCmd.PersistentFlags().Lookup("type")) viper.BindPFlag("validator.type", validateTrieCmd.PersistentFlags().Lookup("type"))
viper.BindPFlag("validator.storageRoot", validateTrieCmd.PersistentFlags().Lookup("storage-root")) viper.BindPFlag("validator.storageRoot", validateTrieCmd.PersistentFlags().Lookup("storage-root"))
viper.BindPFlag("validator.address", validateTrieCmd.PersistentFlags().Lookup("address")) viper.BindPFlag("validator.address", validateTrieCmd.PersistentFlags().Lookup("address"))
viper.BindPFlag("validator.workers", validateTrieCmd.PersistentFlags().Lookup("workers")) viper.BindPFlag("validator.workers", validateTrieCmd.PersistentFlags().Lookup("workers"))
viper.BindPFlag("validator.recoveryFormat", validateTrieCmd.PersistentFlags().Lookup("recovery-format"))
viper.BindPFlag("ipfs.path", validateTrieCmd.PersistentFlags().Lookup("ipfs-path")) viper.BindPFlag("ipfs.path", validateTrieCmd.PersistentFlags().Lookup("ipfs-path"))
} }

10
go.mod
View File

@ -15,10 +15,12 @@ require (
github.com/multiformats/go-multihash v0.2.0 github.com/multiformats/go-multihash v0.2.0
github.com/onsi/ginkgo v1.16.5 github.com/onsi/ginkgo v1.16.5
github.com/onsi/gomega v1.19.0 github.com/onsi/gomega v1.19.0
github.com/sirupsen/logrus v1.8.1 github.com/sirupsen/logrus v1.9.0
github.com/spf13/cobra v1.4.0 github.com/spf13/cobra v1.4.0
github.com/spf13/viper v1.11.0 github.com/spf13/viper v1.11.0
github.com/vulcanize/ipfs-ethdb/v4 v4.0.7-alpha github.com/vulcanize/go-eth-state-node-iterator v1.1.6
github.com/vulcanize/ipfs-ethdb/v4 v4.0.5-alpha
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
) )
require ( require (
@ -202,7 +204,6 @@ require (
github.com/tidwall/pretty v1.2.0 // indirect github.com/tidwall/pretty v1.2.0 // indirect
github.com/tklauser/go-sysconf v0.3.5 // indirect github.com/tklauser/go-sysconf v0.3.5 // indirect
github.com/tklauser/numcpus v0.2.2 // indirect github.com/tklauser/numcpus v0.2.2 // indirect
github.com/vulcanize/go-eth-state-node-iterator v1.1.4
github.com/wI2L/jsondiff v0.2.0 // indirect github.com/wI2L/jsondiff v0.2.0 // indirect
github.com/whyrusleeping/base32 v0.0.0-20170828182744-c30ac30633cc // indirect github.com/whyrusleeping/base32 v0.0.0-20170828182744-c30ac30633cc // indirect
github.com/whyrusleeping/cbor-gen v0.0.0-20210219115102-f37d292932f2 // indirect github.com/whyrusleeping/cbor-gen v0.0.0-20210219115102-f37d292932f2 // indirect
@ -223,8 +224,7 @@ require (
golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90 // indirect golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90 // indirect
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect
golang.org/x/net v0.0.0-20220607020251-c690dde0001d // indirect golang.org/x/net v0.0.0-20220607020251-c690dde0001d // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
golang.org/x/text v0.3.7 // indirect golang.org/x/text v0.3.7 // indirect
golang.org/x/tools v0.1.10 // indirect golang.org/x/tools v0.1.10 // indirect
golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df // indirect golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df // indirect

8
go.sum
View File

@ -1397,6 +1397,8 @@ github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6L
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/roysc/go-eth-state-node-iterator v0.0.1-alpha.0.20220825012218-c991d5a96e0f h1:DQCNcq5KRb6c0t/tnOZRwk9H6NdkNJFLZ6BXhs506Bo=
github.com/roysc/go-eth-state-node-iterator v0.0.1-alpha.0.20220825012218-c991d5a96e0f/go.mod h1:MXuwoa7yPL6wUgPa6Bh1qQveiYaNcLiqRE6e03dDPeo=
github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
github.com/rs/zerolog v1.21.0/go.mod h1:ZPhntP/xmq1nnND05hhpAh2QMhSsA4UN3MGZ6O2J3hM= github.com/rs/zerolog v1.21.0/go.mod h1:ZPhntP/xmq1nnND05hhpAh2QMhSsA4UN3MGZ6O2J3hM=
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
@ -1438,8 +1440,9 @@ github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPx
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/assertions v1.0.0 h1:UVQPSSmc3qtTi+zPPkCXvZX9VvW/xT/NsRvKfwY81a8= github.com/smartystreets/assertions v1.0.0 h1:UVQPSSmc3qtTi+zPPkCXvZX9VvW/xT/NsRvKfwY81a8=
github.com/smartystreets/assertions v1.0.0/go.mod h1:kHHU4qYBaI3q23Pp3VPrmWhuIUrLW/7eUrw0BU5VaoM= github.com/smartystreets/assertions v1.0.0/go.mod h1:kHHU4qYBaI3q23Pp3VPrmWhuIUrLW/7eUrw0BU5VaoM=
@ -1910,8 +1913,9 @@ golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220517195934-5e4e11fc645e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220517195934-5e4e11fc645e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=

9
pkg/types.go Normal file
View File

@ -0,0 +1,9 @@
package validator
type TraversalType = string
const (
fullTraversal = "full"
stateTraversal = "state"
storageTraversal = "storage"
)

View File

@ -18,8 +18,8 @@ package validator
import ( import (
"bytes" "bytes"
"context"
"fmt" "fmt"
"sync"
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -32,8 +32,11 @@ import (
"github.com/ipfs/go-blockservice" "github.com/ipfs/go-blockservice"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
"github.com/mailgun/groupcache/v2" "github.com/mailgun/groupcache/v2"
log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
nodeiter "github.com/vulcanize/go-eth-state-node-iterator" nodeiter "github.com/vulcanize/go-eth-state-node-iterator"
"github.com/vulcanize/go-eth-state-node-iterator/tracker"
ipfsethdb "github.com/vulcanize/ipfs-ethdb/v4" ipfsethdb "github.com/vulcanize/ipfs-ethdb/v4"
pgipfsethdb "github.com/vulcanize/ipfs-ethdb/v4/postgres" pgipfsethdb "github.com/vulcanize/ipfs-ethdb/v4/postgres"
) )
@ -45,13 +48,21 @@ type Validator struct {
stateDatabase state.Database stateDatabase state.Database
db *pgipfsethdb.Database db *pgipfsethdb.Database
iterWorkers uint params Params
} }
var emptyCodeHash = crypto.Keccak256(nil) type Params struct {
Workers uint
RecoveryFormat string // %s substituted with traversal type
}
var (
DefaultRecoveryFormat = "./recover_validate_%s"
emptyCodeHash = crypto.Keccak256(nil)
)
// NewPGIPFSValidator returns a new trie validator ontop of a connection pool for an IPFS backing Postgres database // NewPGIPFSValidator returns a new trie validator ontop of a connection pool for an IPFS backing Postgres database
func NewPGIPFSValidator(db *sqlx.DB, workers uint) *Validator { func NewPGIPFSValidator(db *sqlx.DB, par Params) *Validator {
kvs := pgipfsethdb.NewKeyValueStore(db, pgipfsethdb.CacheConfig{ kvs := pgipfsethdb.NewKeyValueStore(db, pgipfsethdb.CacheConfig{
Name: "kv", Name: "kv",
Size: 16 * 1000 * 1000, // 16MB Size: 16 * 1000 * 1000, // 16MB
@ -64,15 +75,13 @@ func NewPGIPFSValidator(db *sqlx.DB, workers uint) *Validator {
ExpiryDuration: time.Hour * 8, // 8 hours ExpiryDuration: time.Hour * 8, // 8 hours
}) })
if workers == 0 { normalizeParams(&par)
workers = 1
}
return &Validator{ return &Validator{
kvs: kvs, kvs: kvs,
trieDB: trie.NewDatabase(kvs), trieDB: trie.NewDatabase(kvs),
stateDatabase: state.NewDatabase(database), stateDatabase: state.NewDatabase(database),
db: database.(*pgipfsethdb.Database), db: database.(*pgipfsethdb.Database),
iterWorkers: workers, params: par,
} }
} }
@ -81,17 +90,15 @@ func (v *Validator) GetCacheStats() groupcache.Stats {
} }
// NewIPFSValidator returns a new trie validator ontop of an IPFS blockservice // NewIPFSValidator returns a new trie validator ontop of an IPFS blockservice
func NewIPFSValidator(bs blockservice.BlockService, workers uint) *Validator { func NewIPFSValidator(bs blockservice.BlockService, par Params) *Validator {
kvs := ipfsethdb.NewKeyValueStore(bs) kvs := ipfsethdb.NewKeyValueStore(bs)
database := ipfsethdb.NewDatabase(bs) database := ipfsethdb.NewDatabase(bs)
if workers == 0 { normalizeParams(&par)
workers = 1
}
return &Validator{ return &Validator{
kvs: kvs, kvs: kvs,
trieDB: trie.NewDatabase(kvs), trieDB: trie.NewDatabase(kvs),
stateDatabase: state.NewDatabase(database), stateDatabase: state.NewDatabase(database),
iterWorkers: workers, params: par,
} }
} }
@ -106,71 +113,14 @@ func NewValidator(kvs ethdb.KeyValueStore, database ethdb.Database) *Validator {
} }
} }
// Traverses each iterator in a separate goroutine. // Ensure params are valid
// If storage = true, also traverse storage tries for each leaf. func normalizeParams(p *Params) {
func (v *Validator) iterateAsync(iters []trie.NodeIterator, storage bool) error { if p.Workers == 0 {
var wg sync.WaitGroup p.Workers = 1
errors := make(chan error)
for _, it := range iters {
wg.Add(1)
go func(it trie.NodeIterator) {
defer wg.Done()
for it.Next(true) {
// Iterate through entire state trie. it.Next() will return false when we have
// either completed iteration of the entire trie or run into an error (e.g. a
// missing node). If we are able to iterate through the entire trie without error
// then the trie is complete.
// If storage is not requested, or the state trie node is an internal entry, leave as is
if !storage || !it.Leaf() {
continue
}
// Otherwise we've reached an account node, initiate data iteration
var account types.StateAccount
if err := rlp.Decode(bytes.NewReader(it.LeafBlob()), &account); err != nil {
errors <- err
break
}
dataTrie, err := v.stateDatabase.OpenStorageTrie(common.BytesToHash(it.LeafKey()), account.Root)
if err != nil {
errors <- err
break
}
dataIt := dataTrie.NodeIterator(nil)
if !bytes.Equal(account.CodeHash, emptyCodeHash) {
addrHash := common.BytesToHash(it.LeafKey())
_, err := v.stateDatabase.ContractCode(addrHash, common.BytesToHash(account.CodeHash))
if err != nil {
errors <- fmt.Errorf("code %x: %v", account.CodeHash, err)
break
}
}
for dataIt.Next(true) {
}
if err = dataIt.Error(); err != nil {
errors <- err
break
}
}
if it.Error() != nil {
errors <- it.Error()
}
}(it)
} }
if len(p.RecoveryFormat) == 0 {
done := make(chan struct{}) p.RecoveryFormat = DefaultRecoveryFormat
go func() {
wg.Wait()
done <- struct{}{}
}()
var err error
select {
case err = <-errors:
case <-done:
close(errors)
} }
return err
} }
// ValidateTrie returns an error if the state and storage tries for the provided state root cannot be confirmed as complete // ValidateTrie returns an error if the state and storage tries for the provided state root cannot be confirmed as complete
@ -180,8 +130,8 @@ func (v *Validator) ValidateTrie(stateRoot common.Hash) error {
if err != nil { if err != nil {
return err return err
} }
iters := nodeiter.SubtrieIterators(t, v.iterWorkers) iterate := func(it trie.NodeIterator) error { return v.iterate(it, true) }
return v.iterateAsync(iters, true) return iterateTracked(t, fmt.Sprintf(v.params.RecoveryFormat, fullTraversal), v.params.Workers, iterate)
} }
// ValidateStateTrie returns an error if the state trie for the provided state root cannot be confirmed as complete // ValidateStateTrie returns an error if the state trie for the provided state root cannot be confirmed as complete
@ -192,8 +142,8 @@ func (v *Validator) ValidateStateTrie(stateRoot common.Hash) error {
if err != nil { if err != nil {
return err return err
} }
iters := nodeiter.SubtrieIterators(t, v.iterWorkers) iterate := func(it trie.NodeIterator) error { return v.iterate(it, false) }
return v.iterateAsync(iters, false) return iterateTracked(t, fmt.Sprintf(v.params.RecoveryFormat, stateTraversal), v.params.Workers, iterate)
} }
// ValidateStorageTrie returns an error if the storage trie for the provided storage root and contract address cannot be confirmed as complete // ValidateStorageTrie returns an error if the storage trie for the provided storage root and contract address cannot be confirmed as complete
@ -204,8 +154,8 @@ func (v *Validator) ValidateStorageTrie(address common.Address, storageRoot comm
if err != nil { if err != nil {
return err return err
} }
iters := nodeiter.SubtrieIterators(t, v.iterWorkers) iterate := func(it trie.NodeIterator) error { return v.iterate(it, false) }
return v.iterateAsync(iters, false) return iterateTracked(t, fmt.Sprintf(v.params.RecoveryFormat, storageTraversal), v.params.Workers, iterate)
} }
// Close implements io.Closer // Close implements io.Closer
@ -215,3 +165,79 @@ func (v *Validator) Close() error {
groupcache.DeregisterGroup("db") groupcache.DeregisterGroup("db")
return nil return nil
} }
// Traverses each iterator in a separate goroutine.
// If storage = true, also traverse storage tries for each leaf.
func (v *Validator) iterate(it trie.NodeIterator, storage bool) error {
// Iterate through entire state trie. it.Next() will return false when we have
// either completed iteration of the entire trie or run into an error (e.g. a
// missing node). If we are able to iterate through the entire trie without error
// then the trie is complete.
for it.Next(true) {
// This block adapted from geth - core/state/iterator.go
// If storage is not requested, or the state trie node is an internal entry, skip
if !storage || !it.Leaf() {
continue
}
// Otherwise we've reached an account node, initiate data iteration
var account types.StateAccount
if err := rlp.Decode(bytes.NewReader(it.LeafBlob()), &account); err != nil {
return err
}
dataTrie, err := v.stateDatabase.OpenStorageTrie(common.BytesToHash(it.LeafKey()), account.Root)
if err != nil {
return err
}
dataIt := dataTrie.NodeIterator(nil)
if !bytes.Equal(account.CodeHash, emptyCodeHash) {
addrHash := common.BytesToHash(it.LeafKey())
_, err := v.stateDatabase.ContractCode(addrHash, common.BytesToHash(account.CodeHash))
if err != nil {
return fmt.Errorf("code %x: %v", account.CodeHash, err)
}
}
for dataIt.Next(true) {
}
if dataIt.Error() != nil {
return dataIt.Error()
}
}
return it.Error()
}
func iterateTracked(tree state.Trie, recoveryFile string, iterCount uint, fn func(trie.NodeIterator) error) error {
ctx, cancelCtx := context.WithCancel(context.Background())
tracker := tracker.New(recoveryFile, iterCount)
tracker.CaptureSignal(cancelCtx)
halt := func() {
if err := tracker.HaltAndDump(); err != nil {
log.Errorf("failed to write recovery file: %v", err)
}
}
// attempt to restore from recovery file if it exists
iters, err := tracker.Restore(tree)
if err != nil {
return err
}
if iterCount < uint(len(iters)) {
return fmt.Errorf("recovered too many iterators: got %d, expected %d", len(iters), iterCount)
}
if iters == nil { // nothing restored
iters = nodeiter.SubtrieIterators(tree, iterCount)
for i, it := range iters {
iters[i] = tracker.Tracked(it, nil)
}
}
g, ctx := errgroup.WithContext(ctx)
defer halt()
for _, it := range iters {
func(it trie.NodeIterator) {
g.Go(func() error { return fn(it) })
}(it)
}
return g.Wait()
}

View File

@ -19,6 +19,8 @@ package validator_test
import ( import (
"fmt" "fmt"
"math/big" "math/big"
"os"
"path/filepath"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
@ -194,15 +196,20 @@ var (
v *validator.Validator v *validator.Validator
db *sqlx.DB db *sqlx.DB
err error err error
tmp string
) )
var _ = Describe("PG-IPFS Validator", func() { var _ = Describe("PG-IPFS Validator", func() {
BeforeEach(func() { BeforeEach(func() {
db, err = pgipfsethdb.TestDB() db, err = pgipfsethdb.TestDB()
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
v = validator.NewPGIPFSValidator(db, 4) tmp, err = os.MkdirTemp("", "test_validator")
Expect(err).ToNot(HaveOccurred())
params := validator.Params{Workers: 4, RecoveryFormat: filepath.Join(tmp, "recover_%s")}
v = validator.NewPGIPFSValidator(db, params)
}) })
AfterEach(func() { AfterEach(func() {
os.RemoveAll(tmp)
v.Close() v.Close()
}) })
Describe("ValidateTrie", func() { Describe("ValidateTrie", func() {
@ -210,7 +217,7 @@ var _ = Describe("PG-IPFS Validator", func() {
err = validator.ResetTestDB(db) err = validator.ResetTestDB(db)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
}) })
It("Returns an error the state root node is missing", func() { It("Returns an error if the state root node is missing", func() {
// we write code to ethdb, there should probably be an EthCode IPLD codec // we write code to ethdb, there should probably be an EthCode IPLD codec
// but there isn't, and we don't need one here since blockstore keys are mh-derived // but there isn't, and we don't need one here since blockstore keys are mh-derived
loadTrie(append(missingRootStateNodes, mockCode), trieStorageNodes) loadTrie(append(missingRootStateNodes, mockCode), trieStorageNodes)