From defc4c8c06471e14f7f345f33aaf805a08e78dc4 Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Wed, 24 Aug 2022 14:09:46 -0500 Subject: [PATCH] Track and recover iteration state --- cmd/validateTrie.go | 20 +++-- go.mod | 10 +-- go.sum | 8 +- pkg/types.go | 9 ++ pkg/validator.go | 190 ++++++++++++++++++++++++------------------ pkg/validator_test.go | 11 ++- 6 files changed, 151 insertions(+), 97 deletions(-) create mode 100644 pkg/types.go diff --git a/cmd/validateTrie.go b/cmd/validateTrie.go index 7b7f5ad..6675881 100644 --- a/cmd/validateTrie.go +++ b/cmd/validateTrie.go @@ -59,15 +59,19 @@ It can operate at three levels: } func validateTrie() { - numWorkers := viper.GetUint("validator.workers") - v, err := newValidator(numWorkers) + params := validator.Params{ + Workers: viper.GetUint("validator.workers"), + RecoveryFormat: viper.GetString("validator.recoveryFormat"), + } + v, err := newValidator(params) if err != nil { logWithCommand.Fatal(err) } stateRootStr := viper.GetString("validator.stateRoot") storageRootStr := viper.GetString("validator.storageRoot") contractAddrStr := viper.GetString("validator.address") - switch strings.ToLower(viper.GetString("validator.type")) { + traversal := strings.ToLower(viper.GetString("validator.type")) + switch traversal { case "f", "full": if stateRootStr == "" { logWithCommand.Fatal("must provide a state root for full state validation") @@ -99,26 +103,28 @@ func validateTrie() { logWithCommand.Fatalf("Storage trie for contract %s and root %s not complete\r\nerr: %v", addr.String(), storageRoot.String(), err) } logWithCommand.Infof("Storage trie for contract %s and root %s is complete", addr.String(), storageRoot.String()) + default: + logWithCommand.Fatalf("Invalid traversal level: '%s'", traversal) } stats := v.GetCacheStats() logWithCommand.Debugf("groupcache stats %+v", stats) } -func newValidator(workers uint) (*validator.Validator, error) { +func newValidator(params validator.Params) (*validator.Validator, error) { ipfsPath := viper.GetString("ipfs.path") if ipfsPath == "" { db, err := validator.NewDB() if err != nil { logWithCommand.Fatal(err) } - return validator.NewPGIPFSValidator(db, workers), nil + return validator.NewPGIPFSValidator(db, params), nil } bs, err := validator.InitIPFSBlockService(ipfsPath) if err != nil { return nil, err } - return validator.NewIPFSValidator(bs, workers), nil + return validator.NewIPFSValidator(bs, params), nil } func init() { @@ -130,11 +136,13 @@ func init() { validateTrieCmd.PersistentFlags().String("address", "", "Contract address for the storage trie we wish to validate; for storage validation") validateTrieCmd.PersistentFlags().String("ipfs-path", "", "Path to IPFS repository; if provided operations move through the IPFS repo otherwise Postgres connection params are expected in the provided config") validateTrieCmd.PersistentFlags().Int("workers", 4, "number of concurrent workers to use") + validateTrieCmd.PersistentFlags().String("recovery-format", validator.DefaultRecoveryFormat, "format pattern for recovery files") viper.BindPFlag("validator.stateRoot", validateTrieCmd.PersistentFlags().Lookup("state-root")) viper.BindPFlag("validator.type", validateTrieCmd.PersistentFlags().Lookup("type")) viper.BindPFlag("validator.storageRoot", validateTrieCmd.PersistentFlags().Lookup("storage-root")) viper.BindPFlag("validator.address", validateTrieCmd.PersistentFlags().Lookup("address")) viper.BindPFlag("validator.workers", validateTrieCmd.PersistentFlags().Lookup("workers")) + viper.BindPFlag("validator.recoveryFormat", validateTrieCmd.PersistentFlags().Lookup("recovery-format")) viper.BindPFlag("ipfs.path", validateTrieCmd.PersistentFlags().Lookup("ipfs-path")) } diff --git a/go.mod b/go.mod index 677627c..6f53ea7 100644 --- a/go.mod +++ b/go.mod @@ -15,10 +15,12 @@ require ( github.com/multiformats/go-multihash v0.2.0 github.com/onsi/ginkgo v1.16.5 github.com/onsi/gomega v1.19.0 - github.com/sirupsen/logrus v1.8.1 + github.com/sirupsen/logrus v1.9.0 github.com/spf13/cobra v1.4.0 github.com/spf13/viper v1.11.0 - github.com/vulcanize/ipfs-ethdb/v4 v4.0.7-alpha + github.com/vulcanize/go-eth-state-node-iterator v1.1.6 + github.com/vulcanize/ipfs-ethdb/v4 v4.0.5-alpha + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c ) require ( @@ -202,7 +204,6 @@ require ( github.com/tidwall/pretty v1.2.0 // indirect github.com/tklauser/go-sysconf v0.3.5 // indirect github.com/tklauser/numcpus v0.2.2 // indirect - github.com/vulcanize/go-eth-state-node-iterator v1.1.4 github.com/wI2L/jsondiff v0.2.0 // indirect github.com/whyrusleeping/base32 v0.0.0-20170828182744-c30ac30633cc // indirect github.com/whyrusleeping/cbor-gen v0.0.0-20210219115102-f37d292932f2 // indirect @@ -223,8 +224,7 @@ require ( golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90 // indirect golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect golang.org/x/net v0.0.0-20220607020251-c690dde0001d // indirect - golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect - golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect + golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect golang.org/x/text v0.3.7 // indirect golang.org/x/tools v0.1.10 // indirect golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df // indirect diff --git a/go.sum b/go.sum index 2c8770d..a664865 100644 --- a/go.sum +++ b/go.sum @@ -1397,6 +1397,8 @@ github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6L github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= +github.com/roysc/go-eth-state-node-iterator v0.0.1-alpha.0.20220825012218-c991d5a96e0f h1:DQCNcq5KRb6c0t/tnOZRwk9H6NdkNJFLZ6BXhs506Bo= +github.com/roysc/go-eth-state-node-iterator v0.0.1-alpha.0.20220825012218-c991d5a96e0f/go.mod h1:MXuwoa7yPL6wUgPa6Bh1qQveiYaNcLiqRE6e03dDPeo= github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= github.com/rs/zerolog v1.21.0/go.mod h1:ZPhntP/xmq1nnND05hhpAh2QMhSsA4UN3MGZ6O2J3hM= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= @@ -1438,8 +1440,9 @@ github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPx github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= -github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= +github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0= +github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/assertions v1.0.0 h1:UVQPSSmc3qtTi+zPPkCXvZX9VvW/xT/NsRvKfwY81a8= github.com/smartystreets/assertions v1.0.0/go.mod h1:kHHU4qYBaI3q23Pp3VPrmWhuIUrLW/7eUrw0BU5VaoM= @@ -1910,8 +1913,9 @@ golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220517195934-5e4e11fc645e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= diff --git a/pkg/types.go b/pkg/types.go new file mode 100644 index 0000000..3304c98 --- /dev/null +++ b/pkg/types.go @@ -0,0 +1,9 @@ +package validator + +type TraversalType = string + +const ( + fullTraversal = "full" + stateTraversal = "state" + storageTraversal = "storage" +) diff --git a/pkg/validator.go b/pkg/validator.go index 617b545..14e8cb3 100644 --- a/pkg/validator.go +++ b/pkg/validator.go @@ -18,8 +18,8 @@ package validator import ( "bytes" + "context" "fmt" - "sync" "time" "github.com/ethereum/go-ethereum/common" @@ -32,8 +32,11 @@ import ( "github.com/ipfs/go-blockservice" "github.com/jmoiron/sqlx" "github.com/mailgun/groupcache/v2" + log "github.com/sirupsen/logrus" + "golang.org/x/sync/errgroup" nodeiter "github.com/vulcanize/go-eth-state-node-iterator" + "github.com/vulcanize/go-eth-state-node-iterator/tracker" ipfsethdb "github.com/vulcanize/ipfs-ethdb/v4" pgipfsethdb "github.com/vulcanize/ipfs-ethdb/v4/postgres" ) @@ -45,13 +48,21 @@ type Validator struct { stateDatabase state.Database db *pgipfsethdb.Database - iterWorkers uint + params Params } -var emptyCodeHash = crypto.Keccak256(nil) +type Params struct { + Workers uint + RecoveryFormat string // %s substituted with traversal type +} + +var ( + DefaultRecoveryFormat = "./recover_validate_%s" + emptyCodeHash = crypto.Keccak256(nil) +) // NewPGIPFSValidator returns a new trie validator ontop of a connection pool for an IPFS backing Postgres database -func NewPGIPFSValidator(db *sqlx.DB, workers uint) *Validator { +func NewPGIPFSValidator(db *sqlx.DB, par Params) *Validator { kvs := pgipfsethdb.NewKeyValueStore(db, pgipfsethdb.CacheConfig{ Name: "kv", Size: 16 * 1000 * 1000, // 16MB @@ -64,15 +75,13 @@ func NewPGIPFSValidator(db *sqlx.DB, workers uint) *Validator { ExpiryDuration: time.Hour * 8, // 8 hours }) - if workers == 0 { - workers = 1 - } + normalizeParams(&par) return &Validator{ kvs: kvs, trieDB: trie.NewDatabase(kvs), stateDatabase: state.NewDatabase(database), db: database.(*pgipfsethdb.Database), - iterWorkers: workers, + params: par, } } @@ -81,17 +90,15 @@ func (v *Validator) GetCacheStats() groupcache.Stats { } // NewIPFSValidator returns a new trie validator ontop of an IPFS blockservice -func NewIPFSValidator(bs blockservice.BlockService, workers uint) *Validator { +func NewIPFSValidator(bs blockservice.BlockService, par Params) *Validator { kvs := ipfsethdb.NewKeyValueStore(bs) database := ipfsethdb.NewDatabase(bs) - if workers == 0 { - workers = 1 - } + normalizeParams(&par) return &Validator{ kvs: kvs, trieDB: trie.NewDatabase(kvs), stateDatabase: state.NewDatabase(database), - iterWorkers: workers, + params: par, } } @@ -106,71 +113,14 @@ func NewValidator(kvs ethdb.KeyValueStore, database ethdb.Database) *Validator { } } -// Traverses each iterator in a separate goroutine. -// If storage = true, also traverse storage tries for each leaf. -func (v *Validator) iterateAsync(iters []trie.NodeIterator, storage bool) error { - var wg sync.WaitGroup - errors := make(chan error) - for _, it := range iters { - wg.Add(1) - go func(it trie.NodeIterator) { - defer wg.Done() - for it.Next(true) { - // Iterate through entire state trie. it.Next() will return false when we have - // either completed iteration of the entire trie or run into an error (e.g. a - // missing node). If we are able to iterate through the entire trie without error - // then the trie is complete. - - // If storage is not requested, or the state trie node is an internal entry, leave as is - if !storage || !it.Leaf() { - continue - } - // Otherwise we've reached an account node, initiate data iteration - var account types.StateAccount - if err := rlp.Decode(bytes.NewReader(it.LeafBlob()), &account); err != nil { - errors <- err - break - } - dataTrie, err := v.stateDatabase.OpenStorageTrie(common.BytesToHash(it.LeafKey()), account.Root) - if err != nil { - errors <- err - break - } - dataIt := dataTrie.NodeIterator(nil) - if !bytes.Equal(account.CodeHash, emptyCodeHash) { - addrHash := common.BytesToHash(it.LeafKey()) - _, err := v.stateDatabase.ContractCode(addrHash, common.BytesToHash(account.CodeHash)) - if err != nil { - errors <- fmt.Errorf("code %x: %v", account.CodeHash, err) - break - } - } - for dataIt.Next(true) { - } - if err = dataIt.Error(); err != nil { - errors <- err - break - } - - } - if it.Error() != nil { - errors <- it.Error() - } - }(it) +// Ensure params are valid +func normalizeParams(p *Params) { + if p.Workers == 0 { + p.Workers = 1 } - - done := make(chan struct{}) - go func() { - wg.Wait() - done <- struct{}{} - }() - var err error - select { - case err = <-errors: - case <-done: - close(errors) + if len(p.RecoveryFormat) == 0 { + p.RecoveryFormat = DefaultRecoveryFormat } - return err } // ValidateTrie returns an error if the state and storage tries for the provided state root cannot be confirmed as complete @@ -180,8 +130,8 @@ func (v *Validator) ValidateTrie(stateRoot common.Hash) error { if err != nil { return err } - iters := nodeiter.SubtrieIterators(t, v.iterWorkers) - return v.iterateAsync(iters, true) + iterate := func(it trie.NodeIterator) error { return v.iterate(it, true) } + return iterateTracked(t, fmt.Sprintf(v.params.RecoveryFormat, fullTraversal), v.params.Workers, iterate) } // ValidateStateTrie returns an error if the state trie for the provided state root cannot be confirmed as complete @@ -192,8 +142,8 @@ func (v *Validator) ValidateStateTrie(stateRoot common.Hash) error { if err != nil { return err } - iters := nodeiter.SubtrieIterators(t, v.iterWorkers) - return v.iterateAsync(iters, false) + iterate := func(it trie.NodeIterator) error { return v.iterate(it, false) } + return iterateTracked(t, fmt.Sprintf(v.params.RecoveryFormat, stateTraversal), v.params.Workers, iterate) } // ValidateStorageTrie returns an error if the storage trie for the provided storage root and contract address cannot be confirmed as complete @@ -204,8 +154,8 @@ func (v *Validator) ValidateStorageTrie(address common.Address, storageRoot comm if err != nil { return err } - iters := nodeiter.SubtrieIterators(t, v.iterWorkers) - return v.iterateAsync(iters, false) + iterate := func(it trie.NodeIterator) error { return v.iterate(it, false) } + return iterateTracked(t, fmt.Sprintf(v.params.RecoveryFormat, storageTraversal), v.params.Workers, iterate) } // Close implements io.Closer @@ -215,3 +165,79 @@ func (v *Validator) Close() error { groupcache.DeregisterGroup("db") return nil } + +// Traverses each iterator in a separate goroutine. +// If storage = true, also traverse storage tries for each leaf. +func (v *Validator) iterate(it trie.NodeIterator, storage bool) error { + // Iterate through entire state trie. it.Next() will return false when we have + // either completed iteration of the entire trie or run into an error (e.g. a + // missing node). If we are able to iterate through the entire trie without error + // then the trie is complete. + for it.Next(true) { + // This block adapted from geth - core/state/iterator.go + // If storage is not requested, or the state trie node is an internal entry, skip + if !storage || !it.Leaf() { + continue + } + // Otherwise we've reached an account node, initiate data iteration + var account types.StateAccount + if err := rlp.Decode(bytes.NewReader(it.LeafBlob()), &account); err != nil { + return err + } + dataTrie, err := v.stateDatabase.OpenStorageTrie(common.BytesToHash(it.LeafKey()), account.Root) + if err != nil { + return err + } + dataIt := dataTrie.NodeIterator(nil) + if !bytes.Equal(account.CodeHash, emptyCodeHash) { + addrHash := common.BytesToHash(it.LeafKey()) + _, err := v.stateDatabase.ContractCode(addrHash, common.BytesToHash(account.CodeHash)) + if err != nil { + return fmt.Errorf("code %x: %v", account.CodeHash, err) + } + } + for dataIt.Next(true) { + } + if dataIt.Error() != nil { + return dataIt.Error() + } + } + return it.Error() +} + +func iterateTracked(tree state.Trie, recoveryFile string, iterCount uint, fn func(trie.NodeIterator) error) error { + ctx, cancelCtx := context.WithCancel(context.Background()) + tracker := tracker.New(recoveryFile, iterCount) + tracker.CaptureSignal(cancelCtx) + halt := func() { + if err := tracker.HaltAndDump(); err != nil { + log.Errorf("failed to write recovery file: %v", err) + } + } + + // attempt to restore from recovery file if it exists + iters, err := tracker.Restore(tree) + if err != nil { + return err + } + if iterCount < uint(len(iters)) { + return fmt.Errorf("recovered too many iterators: got %d, expected %d", len(iters), iterCount) + } + + if iters == nil { // nothing restored + iters = nodeiter.SubtrieIterators(tree, iterCount) + for i, it := range iters { + iters[i] = tracker.Tracked(it, nil) + } + } + + g, ctx := errgroup.WithContext(ctx) + defer halt() + + for _, it := range iters { + func(it trie.NodeIterator) { + g.Go(func() error { return fn(it) }) + }(it) + } + return g.Wait() +} diff --git a/pkg/validator_test.go b/pkg/validator_test.go index 755f6f3..7ffc368 100644 --- a/pkg/validator_test.go +++ b/pkg/validator_test.go @@ -19,6 +19,8 @@ package validator_test import ( "fmt" "math/big" + "os" + "path/filepath" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -194,15 +196,20 @@ var ( v *validator.Validator db *sqlx.DB err error + tmp string ) var _ = Describe("PG-IPFS Validator", func() { BeforeEach(func() { db, err = pgipfsethdb.TestDB() Expect(err).ToNot(HaveOccurred()) - v = validator.NewPGIPFSValidator(db, 4) + tmp, err = os.MkdirTemp("", "test_validator") + Expect(err).ToNot(HaveOccurred()) + params := validator.Params{Workers: 4, RecoveryFormat: filepath.Join(tmp, "recover_%s")} + v = validator.NewPGIPFSValidator(db, params) }) AfterEach(func() { + os.RemoveAll(tmp) v.Close() }) Describe("ValidateTrie", func() { @@ -210,7 +217,7 @@ var _ = Describe("PG-IPFS Validator", func() { err = validator.ResetTestDB(db) Expect(err).ToNot(HaveOccurred()) }) - It("Returns an error the state root node is missing", func() { + It("Returns an error if the state root node is missing", func() { // we write code to ethdb, there should probably be an EthCode IPLD codec // but there isn't, and we don't need one here since blockstore keys are mh-derived loadTrie(append(missingRootStateNodes, mockCode), trieStorageNodes)