diff --git a/cmd/stateSnapshot.go b/cmd/stateSnapshot.go index eefc609..bacfe9f 100644 --- a/cmd/stateSnapshot.go +++ b/cmd/stateSnapshot.go @@ -44,13 +44,18 @@ func stateSnapshot() { logWithCommand.Fatal(err) } + recoveryFile := viper.GetString("snapshot.recoveryFile") + if recoveryFile == "" { + recoveryFile = "./snapshot_recovery" + } + mode := viper.GetString("snapshot.mode") pub, err := snapshot.NewPublisher(snapshot.SnapshotMode(mode), config) if err != nil { logWithCommand.Fatal(err) } - snapshotService, err := snapshot.NewSnapshotService(edb, pub) + snapshotService, err := snapshot.NewSnapshotService(edb, pub, recoveryFile) if err != nil { logWithCommand.Fatal(err) } diff --git a/fixture/chaindata/.gitignore b/fixture/chaindata/.gitignore new file mode 100644 index 0000000..8836fe1 --- /dev/null +++ b/fixture/chaindata/.gitignore @@ -0,0 +1,6 @@ +*.log +CURRENT* +LOCK +LOG +MANIFEST-* +ancient/FLOCK diff --git a/fixture/node_paths.go b/fixture/node_paths.go new file mode 100644 index 0000000..34f2023 --- /dev/null +++ b/fixture/node_paths.go @@ -0,0 +1,359 @@ +package fixture + +var Block1_StateNodePaths = [][]byte{ + []byte{}, + []byte{0}, + []byte{0, 0}, + []byte{0, 2}, + []byte{0, 2, 1}, + []byte{0, 2, 8}, + []byte{0, 2, 12}, + []byte{0, 3}, + []byte{0, 4}, + []byte{0, 6}, + []byte{0, 6, 3}, + []byte{0, 6, 13}, + []byte{0, 7}, + []byte{0, 8}, + []byte{0, 8, 7}, + []byte{0, 8, 11}, + []byte{0, 9}, + []byte{0, 9, 9}, + []byte{0, 9, 10}, + []byte{0, 12}, + []byte{0, 13}, + []byte{0, 14}, + []byte{1}, + []byte{1, 2}, + []byte{1, 2, 5}, + []byte{1, 2, 7}, + []byte{1, 3}, + []byte{1, 3, 1}, + []byte{1, 3, 11}, + []byte{1, 4}, + []byte{1, 5}, + []byte{1, 5, 11}, + []byte{1, 5, 12}, + []byte{1, 5, 15}, + []byte{1, 6}, + []byte{1, 8}, + []byte{1, 10}, + []byte{1, 13}, + []byte{1, 14}, + []byte{1, 14, 2}, + []byte{1, 14, 11}, + []byte{1, 15}, + []byte{1, 15, 9}, + []byte{1, 15, 15}, + []byte{2}, + []byte{2, 0}, + []byte{2, 0, 9}, + []byte{2, 0, 14}, + []byte{2, 
1}, + []byte{2, 1, 1}, + []byte{2, 1, 3}, + []byte{2, 1, 14}, + []byte{2, 5}, + []byte{2, 6}, + []byte{2, 9}, + []byte{2, 9, 1}, + []byte{2, 9, 7}, + []byte{2, 11}, + []byte{2, 11, 7}, + []byte{2, 11, 13}, + []byte{2, 13}, + []byte{2, 13, 1}, + []byte{2, 13, 15}, + []byte{2, 15}, + []byte{3}, + []byte{3, 0}, + []byte{3, 0, 0}, + []byte{3, 0, 1}, + []byte{3, 2}, + []byte{3, 2, 3}, + []byte{3, 2, 15}, + []byte{3, 3}, + []byte{3, 4}, + []byte{3, 4, 2}, + []byte{3, 4, 4}, + []byte{3, 4, 5}, + []byte{3, 6}, + []byte{3, 8}, + []byte{3, 9}, + []byte{3, 10}, + []byte{3, 10, 2}, + []byte{3, 10, 8}, + []byte{3, 10, 12}, + []byte{3, 11}, + []byte{3, 12}, + []byte{3, 13}, + []byte{3, 14}, + []byte{3, 14, 4}, + []byte{3, 14, 9}, + []byte{3, 14, 14}, + []byte{3, 14, 14, 10}, + []byte{3, 14, 14, 15}, + []byte{4}, + []byte{4, 0}, + []byte{4, 0, 6}, + []byte{4, 0, 15}, + []byte{4, 1}, + []byte{4, 2}, + []byte{4, 2, 1}, + []byte{4, 2, 11}, + []byte{4, 3}, + []byte{4, 5}, + []byte{4, 6}, + []byte{4, 7}, + []byte{4, 8}, + []byte{4, 11}, + []byte{4, 11, 6}, + []byte{4, 11, 9}, + []byte{4, 11, 12}, + []byte{4, 14}, + []byte{5}, + []byte{5, 0}, + []byte{5, 0, 3}, + []byte{5, 0, 9}, + []byte{5, 0, 15}, + []byte{5, 1}, + []byte{5, 1, 14}, + []byte{5, 1, 15}, + []byte{5, 2}, + []byte{5, 2, 8}, + []byte{5, 2, 10}, + []byte{5, 3}, + []byte{5, 4}, + []byte{5, 4, 6}, + []byte{5, 4, 12}, + []byte{5, 6}, + []byte{5, 8}, + []byte{5, 8, 3}, + []byte{5, 8, 11}, + []byte{5, 10}, + []byte{5, 11}, + []byte{5, 12}, + []byte{5, 13}, + []byte{5, 15}, + []byte{6}, + []byte{6, 0}, + []byte{6, 2}, + []byte{6, 2, 3}, + []byte{6, 2, 9}, + []byte{6, 4}, + []byte{6, 4, 0}, + []byte{6, 4, 0, 0}, + []byte{6, 4, 0, 5}, + []byte{6, 5}, + []byte{6, 5, 4}, + []byte{6, 5, 10}, + []byte{6, 5, 12}, + []byte{6, 5, 13}, + []byte{6, 6}, + []byte{6, 6, 0}, + []byte{6, 6, 8}, + []byte{6, 8}, + []byte{6, 8, 4}, + []byte{6, 8, 4, 2}, + []byte{6, 8, 4, 9}, + []byte{6, 8, 9}, + []byte{6, 10}, + []byte{6, 10, 1}, + []byte{6, 10, 
14}, + []byte{6, 11}, + []byte{6, 11, 2}, + []byte{6, 11, 12}, + []byte{6, 11, 14}, + []byte{6, 13}, + []byte{6, 13, 2}, + []byte{6, 13, 12}, + []byte{7}, + []byte{7, 1}, + []byte{7, 5}, + []byte{7, 7}, + []byte{7, 8}, + []byte{7, 8, 2}, + []byte{7, 8, 5}, + []byte{7, 9}, + []byte{7, 13}, + []byte{7, 13, 1}, + []byte{7, 13, 1, 0}, + []byte{7, 13, 1, 13}, + []byte{7, 13, 7}, + []byte{7, 14}, + []byte{7, 14, 8}, + []byte{7, 14, 11}, + []byte{8}, + []byte{8, 0}, + []byte{8, 0, 3}, + []byte{8, 0, 11}, + []byte{8, 2}, + []byte{8, 4}, + []byte{8, 8}, + []byte{8, 9}, + []byte{8, 9, 3}, + []byte{8, 9, 13}, + []byte{8, 10}, + []byte{8, 12}, + []byte{8, 12, 3}, + []byte{8, 12, 15}, + []byte{8, 13}, + []byte{8, 15}, + []byte{8, 15, 8}, + []byte{8, 15, 13}, + []byte{9}, + []byte{9, 0}, + []byte{9, 5}, + []byte{9, 6}, + []byte{9, 6, 10}, + []byte{9, 6, 14}, + []byte{9, 7}, + []byte{9, 9}, + []byte{9, 14}, + []byte{9, 15}, + []byte{9, 15, 0}, + []byte{9, 15, 4}, + []byte{9, 15, 10}, + []byte{10}, + []byte{10, 0}, + []byte{10, 0, 9}, + []byte{10, 0, 10}, + []byte{10, 0, 15}, + []byte{10, 2}, + []byte{10, 3}, + []byte{10, 6}, + []byte{10, 8}, + []byte{10, 9}, + []byte{10, 10}, + []byte{10, 10, 5}, + []byte{10, 10, 8}, + []byte{10, 13}, + []byte{10, 13, 0}, + []byte{10, 13, 13}, + []byte{10, 14}, + []byte{10, 14, 4}, + []byte{10, 14, 11}, + []byte{10, 14, 11, 8}, + []byte{10, 14, 11, 14}, + []byte{10, 15}, + []byte{11}, + []byte{11, 0}, + []byte{11, 0, 2}, + []byte{11, 0, 15}, + []byte{11, 1}, + []byte{11, 2}, + []byte{11, 3}, + []byte{11, 4}, + []byte{11, 5}, + []byte{11, 7}, + []byte{11, 7, 12}, + []byte{11, 7, 15}, + []byte{11, 8}, + []byte{11, 8, 8}, + []byte{11, 8, 15}, + []byte{11, 9}, + []byte{11, 11}, + []byte{11, 12}, + []byte{11, 13}, + []byte{11, 14}, + []byte{11, 14, 0}, + []byte{11, 14, 0, 1}, + []byte{11, 14, 0, 3}, + []byte{11, 14, 8}, + []byte{11, 14, 13}, + []byte{12}, + []byte{12, 0}, + []byte{12, 0, 0}, + []byte{12, 0, 1}, + []byte{12, 0, 1, 3}, + []byte{12, 0, 
1, 11}, + []byte{12, 0, 15}, + []byte{12, 2}, + []byte{12, 2, 9}, + []byte{12, 2, 12}, + []byte{12, 4}, + []byte{12, 5}, + []byte{12, 6}, + []byte{12, 6, 0}, + []byte{12, 6, 4}, + []byte{12, 6, 14}, + []byte{12, 7}, + []byte{12, 7, 0}, + []byte{12, 7, 12}, + []byte{12, 7, 13}, + []byte{12, 9}, + []byte{12, 11}, + []byte{12, 12}, + []byte{13}, + []byte{13, 2}, + []byte{13, 2, 0}, + []byte{13, 2, 2}, + []byte{13, 2, 4}, + []byte{13, 3}, + []byte{13, 3, 7}, + []byte{13, 3, 10}, + []byte{13, 5}, + []byte{13, 8}, + []byte{13, 8, 1}, + []byte{13, 8, 15}, + []byte{13, 9}, + []byte{13, 9, 0}, + []byte{13, 9, 14}, + []byte{13, 10}, + []byte{13, 12}, + []byte{13, 12, 8}, + []byte{13, 12, 11}, + []byte{13, 13}, + []byte{13, 13, 7}, + []byte{13, 13, 12}, + []byte{13, 14}, + []byte{14}, + []byte{14, 0}, + []byte{14, 1}, + []byte{14, 2}, + []byte{14, 2, 2}, + []byte{14, 2, 12}, + []byte{14, 3}, + []byte{14, 4}, + []byte{14, 5}, + []byte{14, 6}, + []byte{14, 6, 9}, + []byte{14, 6, 12}, + []byte{14, 7}, + []byte{14, 7, 4}, + []byte{14, 7, 12}, + []byte{14, 8}, + []byte{14, 8, 3}, + []byte{14, 8, 12}, + []byte{14, 8, 12, 0}, + []byte{14, 8, 12, 6}, + []byte{14, 10}, + []byte{14, 10, 6}, + []byte{14, 10, 12}, + []byte{14, 11}, + []byte{14, 11, 8}, + []byte{14, 11, 13}, + []byte{14, 12}, + []byte{14, 14}, + []byte{14, 14, 3}, + []byte{14, 14, 9}, + []byte{15}, + []byte{15, 0}, + []byte{15, 5}, + []byte{15, 6}, + []byte{15, 9}, + []byte{15, 9, 0}, + []byte{15, 9, 2}, + []byte{15, 9, 3}, + []byte{15, 11}, + []byte{15, 11, 1}, + []byte{15, 11, 6}, + []byte{15, 12}, + []byte{15, 12, 3}, + []byte{15, 12, 14}, + []byte{15, 12, 14, 7}, + []byte{15, 12, 14, 13}, + []byte{15, 13}, + []byte{15, 14}, + []byte{15, 15}, +} diff --git a/fixture/service.go b/fixture/service.go index 25a48a2..8448d84 100644 --- a/fixture/service.go +++ b/fixture/service.go @@ -10,7 +10,7 @@ import ( snapt "github.com/vulcanize/eth-pg-ipfs-state-snapshot/pkg/types" ) -var Header1 = types.Header{ +var Block1_Header = 
types.Header{ ParentHash: common.HexToHash("0x6341fd3daf94b748c72ced5a5b26028f2474f5f00d824504e4fa37a75767e177"), UncleHash: common.HexToHash("0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347"), Coinbase: common.HexToAddress("0x0000000000000000000000000000000000000000"), @@ -29,7 +29,7 @@ var Header1 = types.Header{ BaseFee: nil, } -var StateNode1 = snapt.Node{ +var Block1_StateNode0 = snapt.Node{ NodeType: 0, Path: []byte{12, 0}, Key: common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000"), diff --git a/go.mod b/go.mod index 5dd22c3..db93f35 100644 --- a/go.mod +++ b/go.mod @@ -28,7 +28,7 @@ require ( github.com/smartystreets/assertions v1.0.0 // indirect github.com/spf13/cobra v1.0.0 github.com/spf13/viper v1.7.0 - github.com/vulcanize/go-eth-state-node-iterator v1.0.0 + github.com/vulcanize/go-eth-state-node-iterator v1.0.1 go.uber.org/atomic v1.9.0 // indirect go.uber.org/goleak v1.1.11 // indirect go.uber.org/multierr v1.7.0 // indirect diff --git a/go.sum b/go.sum index 84a034b..7f9bae0 100644 --- a/go.sum +++ b/go.sum @@ -805,6 +805,8 @@ github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPU github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ= github.com/vulcanize/go-eth-state-node-iterator v1.0.0 h1:FQ4s0K5TnRD44p3vO4uQtjfE8C1Gr2EDMT2vVGS3Lu8= github.com/vulcanize/go-eth-state-node-iterator v1.0.0/go.mod h1:uWhleTvUEZ+cEkNRIAmBpZ14KilTP71OxY5NZDrpNlo= +github.com/vulcanize/go-eth-state-node-iterator v1.0.1 h1:lI8+moQ0Nv62NwmiCxgvWkC/Wj46KmDPX1dhBwyqRy8= +github.com/vulcanize/go-eth-state-node-iterator v1.0.1/go.mod h1:uWhleTvUEZ+cEkNRIAmBpZ14KilTP71OxY5NZDrpNlo= github.com/vulcanize/go-ethereum v1.10.15-statediff-3.0.1 h1:MX7WcTwxpxOoYVMifrs9vsHDmM6gKOFF+2KnP19LddI= github.com/vulcanize/go-ethereum v1.10.15-statediff-3.0.1/go.mod h1:XO9WLkNXfwoJN05BZj0//xgOWHJyUrUPdnudbQfKlUo= github.com/whyrusleeping/go-logging 
v0.0.0-20170515211332-0457bb6b88fc/go.mod h1:bopw91TMyo8J3tvftk8xmU2kPmlrt4nScJQZU2hE5EM= diff --git a/pkg/snapshot/file/publisher.go b/pkg/snapshot/file/publisher.go index 1d70a81..b1e09c0 100644 --- a/pkg/snapshot/file/publisher.go +++ b/pkg/snapshot/file/publisher.go @@ -147,14 +147,13 @@ func (p *publisher) txDir(index uint32) string { } func (p *publisher) BeginTx() (snapt.Tx, error) { - index := atomic.LoadUint32(&p.txCounter) + index := atomic.AddUint32(&p.txCounter, 1) - 1 dir := p.txDir(index) writers, err := makeFileWriters(dir, perNodeTables) if err != nil { return nil, err } - atomic.AddUint32(&p.txCounter, 1) return fileTx{writers}, nil } @@ -281,13 +280,15 @@ func (p *publisher) PrepareTxForBatch(tx snapt.Tx, maxBatchSize uint) (snapt.Tx, func (p *publisher) logNodeCounters() { t := time.NewTicker(logInterval) for range t.C { - p.printNodeCounters() + p.printNodeCounters("progress") } } -func (p *publisher) printNodeCounters() { - logrus.Infof("runtime: %s", time.Now().Sub(p.startTime).String()) - logrus.Infof("processed state nodes: %d", atomic.LoadUint64(&p.stateNodeCounter)) - logrus.Infof("processed storage nodes: %d", atomic.LoadUint64(&p.storageNodeCounter)) - logrus.Infof("processed code nodes: %d", atomic.LoadUint64(&p.codeNodeCounter)) +func (p *publisher) printNodeCounters(msg string) { + log.WithFields(log.Fields{ + "runtime": time.Now().Sub(p.startTime).String(), + "state nodes": atomic.LoadUint64(&p.stateNodeCounter), + "storage nodes": atomic.LoadUint64(&p.storageNodeCounter), + "code nodes": atomic.LoadUint64(&p.codeNodeCounter), + }).Info(msg) } diff --git a/pkg/snapshot/file/publisher_test.go b/pkg/snapshot/file/publisher_test.go index ac8c9fe..382ec0c 100644 --- a/pkg/snapshot/file/publisher_test.go +++ b/pkg/snapshot/file/publisher_test.go @@ -33,12 +33,12 @@ var ( func writeFiles(t *testing.T, dir string) *publisher { pub, err := NewPublisher(dir, nodeInfo) test.NoError(t, err) - test.NoError(t, pub.PublishHeader(&fixt.Header1)) + 
test.NoError(t, pub.PublishHeader(&fixt.Block1_Header)) tx, err := pub.BeginTx() test.NoError(t, err) - headerID := fixt.Header1.Hash().String() - test.NoError(t, pub.PublishStateNode(&fixt.StateNode1, headerID, tx)) + headerID := fixt.Block1_Header.Hash().String() + test.NoError(t, pub.PublishStateNode(&fixt.Block1_StateNode0, headerID, tx)) test.NoError(t, tx.Commit()) return pub @@ -122,11 +122,11 @@ func TestPgCopy(t *testing.T) { BlockHash string } var header res - err = conn.QueryRow(ctx, pgQueryHeader, fixt.Header1.Number.Uint64()).Scan( + err = conn.QueryRow(ctx, pgQueryHeader, fixt.Block1_Header.Number.Uint64()).Scan( &header.CID, &header.BlockHash) test.NoError(t, err) - headerNode, err := ipld.NewEthHeader(&fixt.Header1) + headerNode, err := ipld.NewEthHeader(&fixt.Block1_Header) test.ExpectEqual(t, headerNode.Cid().String(), header.CID) - test.ExpectEqual(t, fixt.Header1.Hash().String(), header.BlockHash) + test.ExpectEqual(t, fixt.Block1_Header.Hash().String(), header.BlockHash) } diff --git a/pkg/snapshot/pg/publisher.go b/pkg/snapshot/pg/publisher.go index a32d626..4f014a3 100644 --- a/pkg/snapshot/pg/publisher.go +++ b/pkg/snapshot/pg/publisher.go @@ -27,7 +27,7 @@ import ( blockstore "github.com/ipfs/go-ipfs-blockstore" dshelp "github.com/ipfs/go-ipfs-ds-help" "github.com/multiformats/go-multihash" - "github.com/sirupsen/logrus" + log "github.com/sirupsen/logrus" "github.com/ethereum/go-ethereum/statediff/indexer/database/sql" "github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres" @@ -81,8 +81,7 @@ func (p *publisher) BeginTx() (snapt.Tx, error) { } go p.logNodeCounters() return pubTx{tx, func() { - logrus.Info("----- final counts -----") - p.printNodeCounters() + p.printNodeCounters("final stats") }}, nil } @@ -227,13 +226,15 @@ func (p *publisher) PrepareTxForBatch(tx snapt.Tx, maxBatchSize uint) (snapt.Tx, func (p *publisher) logNodeCounters() { t := time.NewTicker(logInterval) for range t.C { - p.printNodeCounters() + 
p.printNodeCounters("progress") } } -func (p *publisher) printNodeCounters() { - logrus.Infof("runtime: %s", time.Now().Sub(p.startTime).String()) - logrus.Infof("processed state nodes: %d", atomic.LoadUint64(&p.stateNodeCounter)) - logrus.Infof("processed storage nodes: %d", atomic.LoadUint64(&p.storageNodeCounter)) - logrus.Infof("processed code nodes: %d", atomic.LoadUint64(&p.codeNodeCounter)) +func (p *publisher) printNodeCounters(msg string) { + log.WithFields(log.Fields{ + "runtime": time.Now().Sub(p.startTime).String(), + "state nodes": atomic.LoadUint64(&p.stateNodeCounter), + "storage nodes": atomic.LoadUint64(&p.storageNodeCounter), + "code nodes": atomic.LoadUint64(&p.codeNodeCounter), + }).Info(msg) } diff --git a/pkg/snapshot/pg/publisher_test.go b/pkg/snapshot/pg/publisher_test.go index c4860aa..a6e9842 100644 --- a/pkg/snapshot/pg/publisher_test.go +++ b/pkg/snapshot/pg/publisher_test.go @@ -31,12 +31,12 @@ func writeData(t *testing.T) *publisher { driver, err := postgres.NewPGXDriver(context.Background(), pgConfig, nodeInfo) test.NoError(t, err) pub := NewPublisher(postgres.NewPostgresDB(driver)) - test.NoError(t, pub.PublishHeader(&fixt.Header1)) + test.NoError(t, pub.PublishHeader(&fixt.Block1_Header)) tx, err := pub.BeginTx() test.NoError(t, err) - headerID := fixt.Header1.Hash().String() - test.NoError(t, pub.PublishStateNode(&fixt.StateNode1, headerID, tx)) + headerID := fixt.Block1_Header.Hash().String() + test.NoError(t, pub.PublishStateNode(&fixt.Block1_StateNode0, headerID, tx)) test.NoError(t, tx.Commit()) return pub @@ -68,11 +68,11 @@ func TestBasic(t *testing.T) { BlockHash string } var header res - err = conn.QueryRow(ctx, pgQueryHeader, fixt.Header1.Number.Uint64()).Scan( + err = conn.QueryRow(ctx, pgQueryHeader, fixt.Block1_Header.Number.Uint64()).Scan( &header.CID, &header.BlockHash) test.NoError(t, err) - headerNode, err := ipld.NewEthHeader(&fixt.Header1) + headerNode, err := ipld.NewEthHeader(&fixt.Block1_Header) 
test.ExpectEqual(t, headerNode.Cid().String(), header.CID) - test.ExpectEqual(t, fixt.Header1.Hash().String(), header.BlockHash) + test.ExpectEqual(t, fixt.Block1_Header.Hash().String(), header.BlockHash) } diff --git a/pkg/snapshot/service.go b/pkg/snapshot/service.go index 4146032..8978dd0 100644 --- a/pkg/snapshot/service.go +++ b/pkg/snapshot/service.go @@ -29,7 +29,7 @@ import ( "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/trie" - "github.com/sirupsen/logrus" + log "github.com/sirupsen/logrus" . "github.com/vulcanize/eth-pg-ipfs-state-snapshot/pkg/types" iter "github.com/vulcanize/go-eth-state-node-iterator" @@ -50,6 +50,8 @@ type Service struct { stateDB state.Database ipfsPublisher Publisher maxBatchSize uint + tracker iteratorTracker + recoveryFile string } func NewLevelDB(con *EthConfig) (ethdb.Database, error) { @@ -59,12 +61,13 @@ func NewLevelDB(con *EthConfig) (ethdb.Database, error) { } // NewSnapshotService creates Service. 
-func NewSnapshotService(edb ethdb.Database, pub Publisher) (*Service, error) { +func NewSnapshotService(edb ethdb.Database, pub Publisher, recoveryFile string) (*Service, error) { return &Service{ ethDB: edb, stateDB: state.NewDatabase(edb), ipfsPublisher: pub, maxBatchSize: defaultBatchSize, + recoveryFile: recoveryFile, }, nil } @@ -76,37 +79,71 @@ type SnapshotParams struct { func (s *Service) CreateSnapshot(params SnapshotParams) error { // extract header from lvldb and publish to PG-IPFS // hold onto the headerID so that we can link the state nodes to this header - logrus.Infof("Creating snapshot at height %d", params.Height) + log.Infof("Creating snapshot at height %d", params.Height) hash := rawdb.ReadCanonicalHash(s.ethDB, params.Height) header := rawdb.ReadHeader(s.ethDB, hash, params.Height) if header == nil { return fmt.Errorf("unable to read canonical header at height %d", params.Height) } - logrus.Infof("head hash: %s head height: %d", hash.Hex(), params.Height) + log.Infof("head hash: %s head height: %d", hash.Hex(), params.Height) err := s.ipfsPublisher.PublishHeader(header) if err != nil { return err } - t, err := s.stateDB.OpenTrie(header.Root) + tree, err := s.stateDB.OpenTrie(header.Root) if err != nil { return err } headerID := header.Hash().String() + s.tracker = newTracker(s.recoveryFile, int(params.Workers)) + go s.tracker.run() + go s.tracker.captureSignal() - if params.Workers > 0 { - return s.createSnapshotAsync(t, headerID, params.Workers) + var iters []trie.NodeIterator + // attempt to restore from recovery file if it exists + iters, err = s.tracker.restore(tree) + if err != nil { + return err + } + if iters != nil { + if params.Workers < uint(len(iters)) { + return fmt.Errorf( + "number of recovered workers (%d) is greater than number configured (%d)", + len(iters), params.Workers, + ) + } + } else { // nothing to restore + if params.Workers > 1 { + iters = iter.SubtrieIterators(tree, params.Workers) + } else { + iters = 
[]trie.NodeIterator{tree.NodeIterator(nil)} + } + for i, it := range iters { + iters[i] = s.tracker.tracked(it) + } + } + + defer func() { + err := s.tracker.haltAndDump() + if err != nil { + log.Error("failed to write recovery file: ", err) + } + }() + + if len(iters) > 1 { + return s.createSnapshotAsync(iters, headerID) + } else { - return s.createSnapshot(t.NodeIterator(nil), headerID) + return s.createSnapshot(iters[0], headerID) } return nil } // Create snapshot up to head (ignores height param) func (s *Service) CreateLatestSnapshot(workers uint) error { - logrus.Info("Creating snapshot at head") + log.Info("Creating snapshot at head") hash := rawdb.ReadHeadHeaderHash(s.ethDB) height := rawdb.ReadHeaderNumber(s.ethDB, hash) if height == nil { @@ -176,7 +213,8 @@ func (s *Service) createSnapshot(it trie.NodeIterator, headerID string) error { switch res.node.NodeType { case Leaf: - // if the node is a leaf, decode the account and publish the associated storage trie nodes if there are any + // if the node is a leaf, decode the account and publish the associated storage trie + // nodes if there are any var account types.StateAccount if err := rlp.DecodeBytes(res.elements[1].([]byte), &account); err != nil { return fmt.Errorf( @@ -197,7 +235,7 @@ func (s *Service) createSnapshot(it trie.NodeIterator, headerID string) error { codeHash := common.BytesToHash(account.CodeHash) codeBytes := rawdb.ReadCode(s.ethDB, codeHash) if len(codeBytes) == 0 { - logrus.Error("Code is missing", "account", common.BytesToHash(it.LeafKey())) + log.Error("Code is missing", "account", common.BytesToHash(it.LeafKey())) return errors.New("missing code") } @@ -217,34 +255,37 @@ func (s *Service) createSnapshot(it trie.NodeIterator, headerID string) error { default: return errors.New("unexpected node type") } - return nil } return it.Error() } // Full-trie concurrent snapshot -func (s *Service) createSnapshotAsync(tree state.Trie, headerID string, workers uint) error { +func (s *Service) 
createSnapshotAsync(iters []trie.NodeIterator, headerID string) error { errors := make(chan error) var wg sync.WaitGroup - for _, it := range iter.SubtrieIterators(tree, workers) { + for _, it := range iters { wg.Add(1) - go func() { + go func(it trie.NodeIterator) { defer wg.Done() if err := s.createSnapshot(it, headerID); err != nil { errors <- err } - }() + }(it) } + + done := make(chan struct{}) go func() { - defer close(errors) wg.Wait() + done <- struct{}{} }() + var err error select { - case err := <-errors: - return err + case err = <-errors: + case <-done: + close(errors) } - return nil + return err } func (s *Service) storageSnapshot(sr common.Hash, headerID string, statePath []byte, tx Tx) (Tx, error) { diff --git a/pkg/snapshot/service_test.go b/pkg/snapshot/service_test.go index bc5b81b..3dacc5f 100644 --- a/pkg/snapshot/service_test.go +++ b/pkg/snapshot/service_test.go @@ -1,12 +1,16 @@ package snapshot import ( + "errors" + "os" + "path/filepath" "testing" "github.com/golang/mock/gomock" fixt "github.com/vulcanize/eth-pg-ipfs-state-snapshot/fixture" mock "github.com/vulcanize/eth-pg-ipfs-state-snapshot/mocks/snapshot" + snapt "github.com/vulcanize/eth-pg-ipfs-state-snapshot/pkg/types" "github.com/vulcanize/eth-pg-ipfs-state-snapshot/test" ) @@ -24,49 +28,113 @@ func testConfig(leveldbpath, ancientdbpath string) *Config { } } -func TestCreateSnapshot(t *testing.T) { - config := testConfig(fixt.ChaindataPath, fixt.AncientdataPath) - - edb, err := NewLevelDB(config.Eth) - if err != nil { - t.Fatal(err) - } - workers := 4 - +func makeMocks(t *testing.T) (*mock.MockPublisher, *mock.MockTx) { ctl := gomock.NewController(t) - tx := mock.NewMockTx(ctl) pub := mock.NewMockPublisher(ctl) - - pub.EXPECT().PublishHeader(gomock.Eq(&fixt.Header1)) - pub.EXPECT().BeginTx(). - Return(tx, nil). - Times(workers) - pub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()). - Return(tx, nil). 
- Times(workers) - pub.EXPECT().PublishStateNode(gomock.Any(), gomock.Any(), gomock.Any()). - Times(workers) - // TODO: fixtures for storage node - // pub.EXPECT().PublishStorageNode(gomock.Eq(fixt.StorageNode), gomock.Eq(int64(0)), gomock.Any()) - // pub.EXPECT().CommitTx(gomock.Any()). - // Times(workers) - - tx.EXPECT().Commit(). - Times(workers) - - service, err := NewSnapshotService(edb, pub) - if err != nil { - t.Fatal(err) - } - - params := SnapshotParams{Height: 1, Workers: uint(workers)} - err = service.CreateSnapshot(params) - if err != nil { - t.Fatal(err) - } - - // err = service.CreateLatestSnapshot(0) - // if err != nil { - // t.Fatal(err) - // } + tx := mock.NewMockTx(ctl) + return pub, tx +} + +func TestCreateSnapshot(t *testing.T) { + runCase := func(t *testing.T, workers int) { + pub, tx := makeMocks(t) + pub.EXPECT().PublishHeader(gomock.Eq(&fixt.Block1_Header)) + pub.EXPECT().BeginTx().Return(tx, nil). + Times(workers) + pub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()).Return(tx, nil). + AnyTimes() + pub.EXPECT().PublishStateNode(gomock.Any(), gomock.Any(), gomock.Any()). + Times(len(fixt.Block1_StateNodePaths)) + + // TODO: fixtures for storage node + // pub.EXPECT().PublishStorageNode(gomock.Eq(fixt.StorageNode), gomock.Eq(int64(0)), gomock.Any()) + + tx.EXPECT().Commit(). 
+ Times(workers) + + config := testConfig(fixt.ChaindataPath, fixt.AncientdataPath) + edb, err := NewLevelDB(config.Eth) + if err != nil { + t.Fatal(err) + } + defer edb.Close() + + recovery := filepath.Join(t.TempDir(), "recover.csv") + service, err := NewSnapshotService(edb, pub, recovery) + if err != nil { + t.Fatal(err) + } + + params := SnapshotParams{Height: 1, Workers: uint(workers)} + err = service.CreateSnapshot(params) + if err != nil { + t.Fatal(err) + } + } + + testCases := []int{1, 4, 16, 32} + for _, tc := range testCases { + t.Run("case", func(t *testing.T) { runCase(t, tc) }) + } +} + +func failingPublishStateNode(_ *snapt.Node, _ string, _ snapt.Tx) error { + return errors.New("failingPublishStateNode") +} + +func TestRecovery(t *testing.T) { + runCase := func(t *testing.T, workers int) { + pub, tx := makeMocks(t) + pub.EXPECT().PublishHeader(gomock.Any()).AnyTimes() + pub.EXPECT().BeginTx().Return(tx, nil).AnyTimes() + pub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()).Return(tx, nil).AnyTimes() + pub.EXPECT().PublishStateNode(gomock.Any(), gomock.Any(), gomock.Any()). + Times(workers). 
+ DoAndReturn(failingPublishStateNode) + tx.EXPECT().Commit().AnyTimes() + + config := testConfig(fixt.ChaindataPath, fixt.AncientdataPath) + edb, err := NewLevelDB(config.Eth) + if err != nil { + t.Fatal(err) + } + defer edb.Close() + + recovery := filepath.Join(t.TempDir(), "recover.csv") + service, err := NewSnapshotService(edb, pub, recovery) + if err != nil { + t.Fatal(err) + } + + params := SnapshotParams{Height: 1, Workers: uint(workers)} + err = service.CreateSnapshot(params) + if err == nil { + t.Fatal("expected an error") + } + + if _, err = os.Stat(recovery); err != nil { + t.Fatal("cannot stat recovery file:", err) + } + + pub.EXPECT().PublishStateNode(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + err = service.CreateSnapshot(params) + if err != nil { + t.Fatal(err) + } + + _, err = os.Stat(recovery) + if err == nil { + t.Fatal("recovery file still present") + } else { + if !os.IsNotExist(err) { + t.Fatal(err) + } + } + } + + testCases := []int{1, 4, 32} + for _, tc := range testCases { + t.Run("case", func(t *testing.T) { runCase(t, tc) }) + } + } diff --git a/pkg/snapshot/tracker.go b/pkg/snapshot/tracker.go new file mode 100644 index 0000000..8f7d643 --- /dev/null +++ b/pkg/snapshot/tracker.go @@ -0,0 +1,173 @@ +package snapshot + +import ( + "encoding/csv" + "fmt" + "os" + "os/signal" + "syscall" + + "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/trie" + log "github.com/sirupsen/logrus" + + iter "github.com/vulcanize/go-eth-state-node-iterator" +) + +type trackedIter struct { + trie.NodeIterator + tracker *iteratorTracker +} + +func (it *trackedIter) Next(descend bool) bool { + ret := it.NodeIterator.Next(descend) + if !ret { + it.tracker.stopChan <- it + } + return ret +} + +type iteratorTracker struct { + recoveryFile string + + startChan chan *trackedIter + stopChan chan *trackedIter + started map[*trackedIter]struct{} + stopped []*trackedIter + + haltChan chan struct{} + done chan struct{} +} + +func 
newTracker(file string, buf int) iteratorTracker { + return iteratorTracker{ + recoveryFile: file, + startChan: make(chan *trackedIter, buf), + stopChan: make(chan *trackedIter, buf), + started: map[*trackedIter]struct{}{}, + haltChan: make(chan struct{}), + done: make(chan struct{}), + } +} + +func (tr *iteratorTracker) captureSignal() { + sigChan := make(chan os.Signal, 1) + + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + go func() { + sig := <-sigChan + log.Errorf("Signal received (%v), stopping", sig) + tr.haltAndDump() + os.Exit(1) + }() +} + +// listens for starts/stops and manages current state +func (tr *iteratorTracker) run() { +loop: + for { + select { + case start := <-tr.startChan: + tr.started[start] = struct{}{} + case stop := <-tr.stopChan: + tr.stopped = append(tr.stopped, stop) + case <-tr.haltChan: + break loop + } + } + tr.done <- struct{}{} +} + +func (tr *iteratorTracker) tracked(it trie.NodeIterator) (ret *trackedIter) { + ret = &trackedIter{it, tr} + tr.startChan <- ret + return +} + +// dumps iterator path and bounds to a text file so it can be restored later +func (tr *iteratorTracker) dump() error { + log.Info("Dumping recovery state to:", tr.recoveryFile) + var rows [][]string + for it := range tr.started { + var endPath []byte + if impl, ok := it.NodeIterator.(*iter.PrefixBoundIterator); ok { + endPath = impl.EndPath + } + rows = append(rows, []string{ + fmt.Sprintf("%x", it.Path()), + fmt.Sprintf("%x", endPath), + }) + } + file, err := os.Create(tr.recoveryFile) + if err != nil { + return err + } + defer file.Close() + out := csv.NewWriter(file) + return out.WriteAll(rows) +} + +// attempts to read iterator state from file +// if file doesn't exist, returns an empty slice with no error +func (tr *iteratorTracker) restore(tree state.Trie) ([]trie.NodeIterator, error) { + file, err := os.Open(tr.recoveryFile) + if err != nil { + if os.IsNotExist(err) { + return nil, nil + } + return nil, err + } + defer 
file.Close() + in := csv.NewReader(file) + in.FieldsPerRecord = 2 + rows, err := in.ReadAll() + if err != nil { + return nil, err + } + var ret []trie.NodeIterator + for _, row := range rows { + // pick up where each interval left off + var paths [2][]byte + for i, val := range row { + if len(val) != 0 { + if _, err = fmt.Sscanf(val, "%x", &paths[i]); err != nil { + return nil, err + } + } + } + + it := iter.NewPrefixBoundIterator(tree, paths[0], paths[1]) + ret = append(ret, tr.tracked(it)) + } + return ret, nil +} + +func (tr *iteratorTracker) haltAndDump() error { + tr.haltChan <- struct{}{} + <-tr.done + + // drain any pending events + close(tr.startChan) + for start := range tr.startChan { + tr.started[start] = struct{}{} + } + close(tr.stopChan) + for stop := range tr.stopChan { + tr.stopped = append(tr.stopped, stop) + } + + for _, stop := range tr.stopped { + delete(tr.started, stop) + } + + if len(tr.started) == 0 { + // if the tracker state is empty, erase any existing recovery file + err := os.Remove(tr.recoveryFile) + if os.IsNotExist(err) { + err = nil + } + return err + } + return tr.dump() +} diff --git a/test/helper.go b/test/helper.go index c5955e4..121b317 100644 --- a/test/helper.go +++ b/test/helper.go @@ -1,6 +1,7 @@ package test import ( + "bytes" "os" "reflect" "testing" @@ -45,8 +46,14 @@ func NoError(t *testing.T, err error) { } // ExpectEqual asserts the provided interfaces are deep equal -func ExpectEqual(t *testing.T, want interface{}, got interface{}) { +func ExpectEqual(t *testing.T, want, got interface{}) { if !reflect.DeepEqual(want, got) { - t.Fatalf("Expected: %v\nActual: %v", want, got) + t.Fatalf("Values not equal:\nExpected:\t%v\nActual:\t\t%v", want, got) + } +} + +func ExpectEqualBytes(t *testing.T, want, got []byte) { + if !bytes.Equal(want, got) { + t.Fatalf("Bytes not equal:\nExpected:\t%v\nActual:\t\t%v", want, got) } }