diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..f5afb82 --- /dev/null +++ b/Makefile @@ -0,0 +1,12 @@ +MOCKS_DIR = $(CURDIR)/mocks +mockgen_cmd=mockgen + +.PHONY: mocks + +mocks: mocks/snapshot/publisher.go + +mocks/snapshot/publisher.go: pkg/types/publisher.go + $(mockgen_cmd) -package snapshot_mock -destination $@ -source $< Publisher + +clean: + rm -f mocks/snapshot/publisher.go diff --git a/README.md b/README.md index e5f1b84..bc087a6 100644 --- a/README.md +++ b/README.md @@ -12,10 +12,10 @@ Config format: ```toml [database] - name = "vulcanize_public" - hostname = "localhost" - port = 5432 - user = "postgres" + name = "vulcanize_public" + hostname = "localhost" + port = 5432 + user = "postgres" [leveldb] path = "/Users/user/Library/Ethereum/geth/chaindata" @@ -23,4 +23,4 @@ Config format: [snapshot] blockHeight = 0 -``` \ No newline at end of file +``` diff --git a/cmd/stateSnapshot.go b/cmd/stateSnapshot.go index 5eef0a9..ba910f3 100644 --- a/cmd/stateSnapshot.go +++ b/cmd/stateSnapshot.go @@ -40,18 +40,28 @@ var stateSnapshotCmd = &cobra.Command{ func stateSnapshot() { snapConfig := &snapshot.Config{} snapConfig.Init() - snapshotService, err := snapshot.NewSnapshotService(snapConfig) + pgDB, err := snapshot.NewPostgresDB(snapConfig.DB) + if err != nil { + logWithCommand.Fatal(err) + } + edb, err := snapshot.NewLevelDB(snapConfig.Eth) + if err != nil { + logWithCommand.Fatal(err) + } + + snapshotService, err := snapshot.NewSnapshotService(edb, snapshot.NewPublisher(pgDB)) if err != nil { logWithCommand.Fatal(err) } height := viper.GetInt64("snapshot.blockHeight") + workers := viper.GetUint("snapshot.workers") if height < 0 { - if err := snapshotService.CreateLatestSnapshot(); err != nil { + if err := snapshotService.CreateLatestSnapshot(workers); err != nil { logWithCommand.Fatal(err) } } else { - uHeight := uint64(height) - if err := snapshotService.CreateSnapshot(uHeight); err != nil { + params := snapshot.SnapshotParams{Workers: workers, Height: uint64(height)} + if err := snapshotService.CreateSnapshot(params); err != nil { logWithCommand.Fatal(err) } } @@ -64,8 +74,10 @@ func init() { stateSnapshotCmd.PersistentFlags().String("leveldb-path", "", "path to primary datastore") stateSnapshotCmd.PersistentFlags().String("ancient-path", "", "path to ancient datastore") stateSnapshotCmd.PersistentFlags().String("block-height", "", "blockheight to extract state at") + stateSnapshotCmd.PersistentFlags().Int("workers", 0, "number of concurrent workers to use") viper.BindPFlag("leveldb.path", stateSnapshotCmd.PersistentFlags().Lookup("leveldb-path")) viper.BindPFlag("leveldb.ancient", stateSnapshotCmd.PersistentFlags().Lookup("ancient-path")) viper.BindPFlag("snapshot.blockHeight", stateSnapshotCmd.PersistentFlags().Lookup("block-height")) + viper.BindPFlag("snapshot.workers", stateSnapshotCmd.PersistentFlags().Lookup("workers")) } diff --git a/fixture/chaindata/000002.ldb b/fixture/chaindata/000002.ldb new file mode 100644 index 0000000..8e5331a Binary files /dev/null and b/fixture/chaindata/000002.ldb differ diff --git a/fixture/chaindata/ancient/bodies.0000.cdat b/fixture/chaindata/ancient/bodies.0000.cdat new file mode 100644 index 0000000..838476f Binary files /dev/null and b/fixture/chaindata/ancient/bodies.0000.cdat differ diff --git a/fixture/chaindata/ancient/bodies.cidx b/fixture/chaindata/ancient/bodies.cidx new file mode 100644 index 0000000..36aa86b Binary files /dev/null and b/fixture/chaindata/ancient/bodies.cidx differ diff --git a/fixture/chaindata/ancient/diffs.0000.rdat b/fixture/chaindata/ancient/diffs.0000.rdat new file mode 100644 index 0000000..465f590 Binary files /dev/null and b/fixture/chaindata/ancient/diffs.0000.rdat differ diff --git a/fixture/chaindata/ancient/diffs.ridx b/fixture/chaindata/ancient/diffs.ridx new file mode 100644 index 0000000..3519945 Binary files /dev/null and b/fixture/chaindata/ancient/diffs.ridx differ diff --git a/fixture/chaindata/ancient/hashes.0000.rdat b/fixture/chaindata/ancient/hashes.0000.rdat new file mode 100644 index 0000000..f865e0f Binary files /dev/null and b/fixture/chaindata/ancient/hashes.0000.rdat differ diff --git a/fixture/chaindata/ancient/hashes.ridx b/fixture/chaindata/ancient/hashes.ridx new file mode 100644 index 0000000..91c8489 Binary files /dev/null and b/fixture/chaindata/ancient/hashes.ridx differ diff --git a/fixture/chaindata/ancient/headers.0000.cdat b/fixture/chaindata/ancient/headers.0000.cdat new file mode 100644 index 0000000..3e1c6d2 Binary files /dev/null and b/fixture/chaindata/ancient/headers.0000.cdat differ diff --git a/fixture/chaindata/ancient/headers.cidx b/fixture/chaindata/ancient/headers.cidx new file mode 100644 index 0000000..2328522 Binary files /dev/null and b/fixture/chaindata/ancient/headers.cidx differ diff --git a/fixture/chaindata/ancient/receipts.0000.cdat b/fixture/chaindata/ancient/receipts.0000.cdat new file mode 100644 index 0000000..808a254 Binary files /dev/null and b/fixture/chaindata/ancient/receipts.0000.cdat differ diff --git a/fixture/chaindata/ancient/receipts.cidx b/fixture/chaindata/ancient/receipts.cidx new file mode 100644 index 0000000..4683e01 Binary files /dev/null and b/fixture/chaindata/ancient/receipts.cidx differ diff --git a/fixture/service.go b/fixture/service.go new file mode 100644 index 0000000..812e2ff --- /dev/null +++ b/fixture/service.go @@ -0,0 +1,37 @@ +package fixture + +import ( + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + // "github.com/jmoiron/sqlx" + + snapt "github.com/vulcanize/eth-pg-ipfs-state-snapshot/pkg/types" +) + +var PublishHeader = &types.Header{ + ParentHash: common.HexToHash("0x6341fd3daf94b748c72ced5a5b26028f2474f5f00d824504e4fa37a75767e177"), + UncleHash: common.HexToHash("0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347"), + Coinbase: common.HexToAddress("0x0000000000000000000000000000000000000000"), + Root: common.HexToHash("0x53580584816f617295ea26c0e17641e0120cab2f0a8ffb53a866fd53aa8e8c2d"), + TxHash: common.HexToHash("0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421"), + ReceiptHash: common.HexToHash("0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421"), + Bloom: types.Bloom{}, + Difficulty: big.NewInt(+2), + Number: big.NewInt(+1), + GasLimit: 4704588, + GasUsed: 0, + Time: 1492010458, + Extra: []byte{215, 131, 1, 6, 0, 132, 103, 101, 116, 104, 135, 103, 111, 49, 46, 55, 46, 51, 133, 108, 105, 110, 117, 120, 0, 0, 0, 0, 0, 0, 0, 0, 159, 30, 250, 30, 250, 114, 175, 19, 140, 145, 89, 102, 198, 57, 84, 74, 2, 85, 230, 40, 142, 24, 140, 34, 206, 145, 104, 193, 13, 190, 70, 218, 61, 136, 180, 170, 6, 89, 48, 17, 159, 184, 134, 33, 11, 240, 26, 8, 79, 222, 93, 59, 196, 141, 138, 163, 139, 202, 146, 228, 252, 197, 33, 81, 0}, + MixDigest: common.Hash{}, + Nonce: types.BlockNonce{}, + BaseFee: nil, +} + +var StateNode = snapt.Node{ + NodeType: 0, + Path: []byte{12, 0}, + Key: common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000"), + Value: []byte{248, 113, 160, 147, 141, 92, 6, 119, 63, 191, 125, 121, 193, 230, 153, 223, 49, 102, 109, 236, 50, 44, 161, 215, 28, 224, 171, 111, 118, 230, 79, 99, 18, 99, 4, 160, 117, 126, 95, 187, 60, 115, 90, 36, 51, 167, 59, 86, 20, 175, 63, 118, 94, 230, 107, 202, 41, 253, 234, 165, 214, 221, 181, 45, 9, 202, 244, 148, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 160, 247, 170, 155, 102, 71, 245, 140, 90, 255, 89, 193, 131, 99, 31, 85, 161, 78, 90, 0, 204, 46, 253, 15, 71, 120, 19, 109, 123, 255, 0, 188, 27, 128}, +} diff --git a/go.mod b/go.mod index 8c5c001..83ee9da 100644 --- a/go.mod +++ b/go.mod @@ -5,9 +5,10 @@ go 1.15 require ( github.com/btcsuite/btcd v0.22.0-beta // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect - github.com/ethereum/go-ethereum v1.9.14 + github.com/ethereum/go-ethereum v1.10.15 github.com/fsnotify/fsnotify v1.5.1 // indirect github.com/go-kit/kit v0.10.0 // indirect + github.com/golang/mock v1.6.0 github.com/google/go-cmp v0.5.6 // indirect github.com/google/uuid v1.3.0 // indirect github.com/gopherjs/gopherjs v0.0.0-20190430165422-3e4dfb77656c // indirect @@ -26,6 +27,7 @@ require ( github.com/smartystreets/assertions v1.0.0 // indirect github.com/spf13/cobra v1.0.0 github.com/spf13/viper v1.7.0 + github.com/vulcanize/go-eth-state-node-iterator v1.0.0 go.uber.org/atomic v1.9.0 // indirect go.uber.org/goleak v1.1.11 // indirect go.uber.org/multierr v1.7.0 // indirect @@ -40,4 +42,4 @@ require ( lukechampine.com/blake3 v1.1.7 // indirect ) -replace github.com/ethereum/go-ethereum v1.9.14 => github.com/vulcanize/go-ethereum v1.10.14-statediff-0.0.29 +replace github.com/ethereum/go-ethereum v1.10.15 => github.com/vulcanize/go-ethereum v1.10.15-statediff-2.0.0 diff --git a/go.sum b/go.sum index bb1e5f8..4ff9acd 100644 --- a/go.sum +++ b/go.sum @@ -213,6 +213,8 @@ github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4er github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= +github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= +github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -671,8 +673,10 @@ github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/X github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8= github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ= -github.com/vulcanize/go-ethereum v1.10.14-statediff-0.0.29 h1:kjZjteD/6vh9DcixPkrg27XtxKW7ZoV5++1EYAi6FAw= -github.com/vulcanize/go-ethereum v1.10.14-statediff-0.0.29/go.mod h1:9L+QY31AnWnX2/2HDOySCjQoYUdWNGBRMezFJVfH73E= +github.com/vulcanize/go-eth-state-node-iterator v1.0.0 h1:FQ4s0K5TnRD44p3vO4uQtjfE8C1Gr2EDMT2vVGS3Lu8= +github.com/vulcanize/go-eth-state-node-iterator v1.0.0/go.mod h1:uWhleTvUEZ+cEkNRIAmBpZ14KilTP71OxY5NZDrpNlo= +github.com/vulcanize/go-ethereum v1.10.15-statediff-2.0.0 h1:/BiYPUHnubh46YVtASGt4MPlFR96Rc+iJuTyOI8KZa4= +github.com/vulcanize/go-ethereum v1.10.15-statediff-2.0.0/go.mod h1:9L+QY31AnWnX2/2HDOySCjQoYUdWNGBRMezFJVfH73E= github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc/go.mod h1:bopw91TMyo8J3tvftk8xmU2kPmlrt4nScJQZU2hE5EM= github.com/willf/bitset v1.1.3/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= @@ -926,6 +930,7 @@ golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roY golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= +golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.8 h1:P1HhGGuLW4aAclzjtmJdf0mJOjVUZUzOTqkAkWL+l6w= diff --git a/pkg/snapshot/config.go b/pkg/snapshot/config.go index 532d14d..c8b00d2 100644 --- a/pkg/snapshot/config.go +++ b/pkg/snapshot/config.go @@ -30,37 +30,48 @@ const ( LVL_DB_PATH = "LVL_DB_PATH" ) -// Config is config parameters for DB. -type Config struct { +// DBConfig is config parameters for DB. +type DBConfig struct { + Node ethNode.Info + URI string + ConnConfig postgres.ConnectionConfig +} + +// EthConfig is config parameters for the chain. +type EthConfig struct { LevelDBPath string AncientDBPath string - Node ethNode.Info - connectionURI string - DBConfig postgres.ConnectionConfig +} + +type Config struct { + DB *DBConfig + Eth *EthConfig } // Init Initialises config func (c *Config) Init() { - c.dbInit() - viper.BindEnv("leveldb.path", LVL_DB_PATH) + c.DB.dbInit() viper.BindEnv("ethereum.nodeID", ETH_NODE_ID) viper.BindEnv("ethereum.clientName", ETH_CLIENT_NAME) viper.BindEnv("ethereum.genesisBlock", ETH_GENESIS_BLOCK) viper.BindEnv("ethereum.networkID", ETH_NETWORK_ID) - viper.BindEnv("leveldb.ancient", ANCIENT_DB_PATH) - c.Node = ethNode.Info{ + c.DB.Node = ethNode.Info{ ID: viper.GetString("ethereum.nodeID"), ClientName: viper.GetString("ethereum.clientName"), GenesisBlock: viper.GetString("ethereum.genesisBlock"), NetworkID: viper.GetString("ethereum.networkID"), ChainID: viper.GetUint64("ethereum.chainID"), } - c.LevelDBPath = viper.GetString("leveldb.path") - c.AncientDBPath = viper.GetString("leveldb.ancient") + + viper.BindEnv("leveldb.ancient", ANCIENT_DB_PATH) + viper.BindEnv("leveldb.path", LVL_DB_PATH) + + c.Eth.AncientDBPath = viper.GetString("leveldb.ancient") + c.Eth.LevelDBPath = viper.GetString("leveldb.path") } -func (c *Config) dbInit() { +func (c *DBConfig) dbInit() { viper.BindEnv("database.name", postgres.DATABASE_NAME) viper.BindEnv("database.hostname", postgres.DATABASE_HOSTNAME) viper.BindEnv("database.port", postgres.DATABASE_PORT) @@ -78,9 +89,9 @@ func (c *Config) dbInit() { dbParams.User = viper.GetString("database.user") dbParams.Password = viper.GetString("database.password") - c.connectionURI = postgres.DbConnectionString(dbParams) - // DB config - c.DBConfig.MaxIdle = viper.GetInt("database.maxIdle") - c.DBConfig.MaxOpen = viper.GetInt("database.maxOpen") - c.DBConfig.MaxLifetime = viper.GetInt("database.maxLifetime") + c.URI = postgres.DbConnectionString(dbParams) + // Connection config + c.ConnConfig.MaxIdle = viper.GetInt("database.maxIdle") + c.ConnConfig.MaxOpen = viper.GetInt("database.maxOpen") + c.ConnConfig.MaxLifetime = viper.GetInt("database.maxLifetime") } diff --git a/pkg/snapshot/mock/publisher.go b/pkg/snapshot/mock/publisher.go new file mode 100644 index 0000000..bd2bd56 --- /dev/null +++ b/pkg/snapshot/mock/publisher.go @@ -0,0 +1,64 @@ +package mock + +import ( + "fmt" + "strings" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/jmoiron/sqlx" + + "github.com/golang/mock/gomock" + + mocks "github.com/vulcanize/eth-pg-ipfs-state-snapshot/mocks/snapshot" + snapt "github.com/vulcanize/eth-pg-ipfs-state-snapshot/pkg/types" +) + +type MockPublisher struct { + *mocks.MockPublisher +} + +func NewMockPublisher(t *testing.T) *MockPublisher { + ctl := gomock.NewController(t) + return &MockPublisher{mocks.NewMockPublisher(ctl)} +} + +func dump(funcname string, xs ...interface{}) { + if true { + return + } + fmt.Printf(">> %s", funcname) + fmt.Printf(strings.Repeat(" %+v", len(xs))+"\n", xs...) +} + +func (p *MockPublisher) PublishHeader(header *types.Header) (int64, error) { + // fmt.Printf("PublishHeader %+v\n", header) + dump("PublishHeader", header) + return p.MockPublisher.PublishHeader(header) +} +func (p *MockPublisher) PublishStateNode(node *snapt.Node, headerID int64, tx *sqlx.Tx) (int64, error) { + dump("PublishStateNode", node, headerID) + return p.MockPublisher.PublishStateNode(node, headerID, tx) + +} +func (p *MockPublisher) PublishStorageNode(node *snapt.Node, stateID int64, tx *sqlx.Tx) error { + dump("PublishStorageNode", node, stateID) + return p.MockPublisher.PublishStorageNode(node, stateID, tx) +} +func (p *MockPublisher) PublishCode(codeHash common.Hash, codeBytes []byte, tx *sqlx.Tx) error { + dump("PublishCode", codeHash, codeBytes) + return p.MockPublisher.PublishCode(codeHash, codeBytes, tx) +} +func (p *MockPublisher) BeginTx() (*sqlx.Tx, error) { + dump("BeginTx") + return p.MockPublisher.BeginTx() +} +func (p *MockPublisher) CommitTx(tx *sqlx.Tx) error { + dump("CommitTx", tx) + return p.MockPublisher.CommitTx(tx) +} +func (p *MockPublisher) PrepareTxForBatch(tx *sqlx.Tx, batchSize uint) (*sqlx.Tx, error) { + dump("PrepareTxForBatch", tx, batchSize) + return p.MockPublisher.PrepareTxForBatch(tx, batchSize) +} diff --git a/pkg/snapshot/mock/util.go b/pkg/snapshot/mock/util.go new file mode 100644 index 0000000..99106ed --- /dev/null +++ b/pkg/snapshot/mock/util.go @@ -0,0 +1,23 @@ +package mock + +import "fmt" +import "github.com/golang/mock/gomock" + +type anyOfMatcher struct { + values []interface{} +} + +func (m anyOfMatcher) Matches(x interface{}) bool { + for _, v := range m.values { + if gomock.Eq(v).Matches(x) { + return true + } + } + return false +} +func (m anyOfMatcher) String() string { + return fmt.Sprintf("is equal to any of %+v", m.values) +} +func AnyOf(xs ...interface{}) anyOfMatcher { + return anyOfMatcher{xs} +} diff --git a/pkg/snapshot/publisher.go b/pkg/snapshot/publisher.go index 781cea4..5cd2e42 100644 --- a/pkg/snapshot/publisher.go +++ b/pkg/snapshot/publisher.go @@ -30,12 +30,13 @@ import ( "github.com/ethereum/go-ethereum/statediff/indexer/ipfs/ipld" "github.com/ethereum/go-ethereum/statediff/indexer/postgres" "github.com/ethereum/go-ethereum/statediff/indexer/shared" + snapt "github.com/vulcanize/eth-pg-ipfs-state-snapshot/pkg/types" ) const logInterval = 1 * time.Minute // Publisher is wrapper around DB. -type Publisher struct { +type publisher struct { db *postgres.DB currBatchSize uint stateNodeCounter uint64 @@ -45,16 +46,31 @@ type Publisher struct { } // NewPublisher creates Publisher -func NewPublisher(db *postgres.DB) *Publisher { - return &Publisher{ +func NewPublisher(db *postgres.DB) *publisher { + return &publisher{ db: db, currBatchSize: 0, startTime: time.Now(), } } +func (p *publisher) BeginTx() (*sqlx.Tx, error) { + tx, err := p.db.Beginx() + if err != nil { + return nil, err + } + go p.logNodeCounters() + return tx, nil +} + +func (p *publisher) CommitTx(tx *sqlx.Tx) error { + logrus.Info("----- final counts -----") + p.printNodeCounters() + return tx.Commit() +} + // PublishHeader writes the header to the ipfs backing pg datastore and adds secondary indexes in the header_cids table -func (p *Publisher) PublishHeader(header *types.Header) (int64, error) { +func (p *publisher) PublishHeader(header *types.Header) (int64, error) { headerNode, err := ipld.NewEthHeader(header) if err != nil { return 0, err @@ -93,14 +109,14 @@ func (p *Publisher) PublishHeader(header *types.Header) (int64, error) { } // PublishStateNode writes the state node to the ipfs backing datastore and adds secondary indexes in the state_cids table -func (p *Publisher) PublishStateNode(node *node, headerID int64, tx *sqlx.Tx) (int64, error) { +func (p *publisher) PublishStateNode(node *snapt.Node, headerID int64, tx *sqlx.Tx) (int64, error) { var stateID int64 var stateKey string - if !bytes.Equal(node.key.Bytes(), nullHash.Bytes()) { - stateKey = node.key.Hex() + if !bytes.Equal(node.Key.Bytes(), nullHash.Bytes()) { + stateKey = node.Key.Hex() } - stateCIDStr, mhKey, err := shared.PublishRaw(tx, ipld.MEthStateTrie, multihash.KECCAK_256, node.value) + stateCIDStr, mhKey, err := shared.PublishRaw(tx, ipld.MEthStateTrie, multihash.KECCAK_256, node.Value) if err != nil { return 0, err } @@ -108,7 +124,7 @@ func (p *Publisher) PublishStateNode(node *node, headerID int64, tx *sqlx.Tx) (i err = tx.QueryRowx(`INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7) RETURNING id`, - headerID, stateKey, stateCIDStr, node.path, node.nodeType, false, mhKey).Scan(&stateID) + headerID, stateKey, stateCIDStr, node.Path, node.NodeType, false, mhKey).Scan(&stateID) // increment state node counter. atomic.AddUint64(&p.stateNodeCounter, 1) @@ -119,20 +135,20 @@ func (p *Publisher) PublishStateNode(node *node, headerID int64, tx *sqlx.Tx) (i } // PublishStorageNode writes the storage node to the ipfs backing pg datastore and adds secondary indexes in the storage_cids table -func (p *Publisher) PublishStorageNode(node *node, stateID int64, tx *sqlx.Tx) error { +func (p *publisher) PublishStorageNode(node *snapt.Node, stateID int64, tx *sqlx.Tx) error { var storageKey string - if !bytes.Equal(node.key.Bytes(), nullHash.Bytes()) { - storageKey = node.key.Hex() + if !bytes.Equal(node.Key.Bytes(), nullHash.Bytes()) { + storageKey = node.Key.Hex() } - storageCIDStr, mhKey, err := shared.PublishRaw(tx, ipld.MEthStorageTrie, multihash.KECCAK_256, node.value) + storageCIDStr, mhKey, err := shared.PublishRaw(tx, ipld.MEthStorageTrie, multihash.KECCAK_256, node.Value) if err != nil { return err } _, err = tx.Exec(`INSERT INTO eth.storage_cids (state_id, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (state_id, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7)`, - stateID, storageKey, storageCIDStr, node.path, node.nodeType, false, mhKey) + stateID, storageKey, storageCIDStr, node.Path, node.NodeType, false, mhKey) if err != nil { return err } @@ -146,7 +162,7 @@ func (p *Publisher) PublishStorageNode(node *node, stateID int64, tx *sqlx.Tx) e } // PublishCode writes code to the ipfs backing pg datastore -func (p *Publisher) PublishCode(codeHash common.Hash, codeBytes []byte, tx *sqlx.Tx) error { +func (p *publisher) PublishCode(codeHash common.Hash, codeBytes []byte, tx *sqlx.Tx) error { // no codec for code, doesn't matter though since blockstore key is multihash-derived mhKey, err := shared.MultihashKeyFromKeccak256(codeHash) if err != nil { @@ -164,7 +180,7 @@ func (p *Publisher) PublishCode(codeHash common.Hash, codeBytes []byte, tx *sqlx return nil } -func (p *Publisher) checkBatchSize(tx *sqlx.Tx, maxBatchSize uint) (*sqlx.Tx, error) { +func (p *publisher) PrepareTxForBatch(tx *sqlx.Tx, maxBatchSize uint) (*sqlx.Tx, error) { var err error // maximum batch size reached, commit the current transaction and begin a new transaction. if maxBatchSize <= p.currBatchSize { @@ -184,14 +200,14 @@ func (p *Publisher) checkBatchSize(tx *sqlx.Tx, maxBatchSize uint) (*sqlx.Tx, er } // logNodeCounters periodically logs the number of node processed. -func (p *Publisher) logNodeCounters() { +func (p *publisher) logNodeCounters() { t := time.NewTicker(logInterval) for range t.C { p.printNodeCounters() } } -func (p *Publisher) printNodeCounters() { +func (p *publisher) printNodeCounters() { logrus.Infof("runtime: %s", time.Now().Sub(p.startTime).String()) logrus.Infof("processed state nodes: %d", atomic.LoadUint64(&p.stateNodeCounter)) logrus.Infof("processed storage nodes: %d", atomic.LoadUint64(&p.storageNodeCounter)) diff --git a/pkg/snapshot/service.go b/pkg/snapshot/service.go index a684741..3838438 100644 --- a/pkg/snapshot/service.go +++ b/pkg/snapshot/service.go @@ -19,6 +19,7 @@ import ( "bytes" "errors" "fmt" + "sync" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/rawdb" @@ -32,6 +33,9 @@ import ( "github.com/ethereum/go-ethereum/trie" "github.com/jmoiron/sqlx" "github.com/sirupsen/logrus" + + . "github.com/vulcanize/eth-pg-ipfs-state-snapshot/pkg/types" + iter "github.com/vulcanize/go-eth-state-node-iterator" ) var ( @@ -48,166 +52,162 @@ var ( type Service struct { ethDB ethdb.Database stateDB state.Database - ipfsPublisher *Publisher + ipfsPublisher Publisher maxBatchSize uint } +func NewPostgresDB(con *DBConfig) (*postgres.DB, error) { + pgDB, err := postgres.NewDB(con.URI, con.ConnConfig, con.Node) + if err != nil { + return nil, err + } + return pgDB, nil +} + +func NewLevelDB(con *EthConfig) (ethdb.Database, error) { + return rawdb.NewLevelDBDatabaseWithFreezer( + con.LevelDBPath, 1024, 256, con.AncientDBPath, "eth-pg-ipfs-state-snapshot", false, + ) +} + // NewSnapshotService creates Service. -func NewSnapshotService(con *Config) (*Service, error) { - pgDB, err := postgres.NewDB(con.connectionURI, con.DBConfig, con.Node) - if err != nil { - return nil, err - } - - edb, err := rawdb.NewLevelDBDatabaseWithFreezer(con.LevelDBPath, 1024, 256, con.AncientDBPath, "eth-pg-ipfs-state-snapshot", false) - if err != nil { - return nil, err - } - +func NewSnapshotService(edb ethdb.Database, pub Publisher) (*Service, error) { return &Service{ ethDB: edb, stateDB: state.NewDatabase(edb), - ipfsPublisher: NewPublisher(pgDB), + ipfsPublisher: pub, maxBatchSize: defaultBatchSize, }, nil } -// CreateLatestSnapshot creates snapshot for the latest block. -func (s *Service) CreateLatestSnapshot() error { +type SnapshotParams struct { + Height uint64 + Workers uint +} + +func (s *Service) CreateSnapshot(params SnapshotParams) error { // extract header from lvldb and publish to PG-IPFS // hold onto the headerID so that we can link the state nodes to this header - logrus.Info("Creating snapshot at head") + logrus.Infof("Creating snapshot at height %d", params.Height) + hash := rawdb.ReadCanonicalHash(s.ethDB, params.Height) + header := rawdb.ReadHeader(s.ethDB, hash, params.Height) + if header == nil { + return fmt.Errorf("unable to read canonical header at height %d", params.Height) + } + logrus.Infof("head hash: %s head height: %d", hash.Hex(), params.Height) + + headerID, err := s.ipfsPublisher.PublishHeader(header) + if err != nil { + return err + } + + t, err := s.stateDB.OpenTrie(header.Root) + if err != nil { + return err + } + if params.Workers > 0 { + return s.createSnapshotAsync(t, headerID, params.Workers) + } else { + return s.createSnapshot(t.NodeIterator(nil), headerID) + } + return nil +} + +// Create snapshot up to head (ignores height param) +func (s *Service) CreateLatestSnapshot(workers uint) error { + logrus.Info("Creating snapshot at head") hash := rawdb.ReadHeadHeaderHash(s.ethDB) height := rawdb.ReadHeaderNumber(s.ethDB, hash) if height == nil { return fmt.Errorf("unable to read header height for header hash %s", hash.String()) } - - header := rawdb.ReadHeader(s.ethDB, hash, *height) - if header == nil { - return fmt.Errorf("unable to read canonical header at height %d", height) - } - - logrus.Infof("head hash: %s head height: %d", hash.Hex(), *height) - - headerID, err := s.ipfsPublisher.PublishHeader(header) - if err != nil { - return err - } - - t, err := s.stateDB.OpenTrie(header.Root) - if err != nil { - return err - } - - trieDB := s.stateDB.TrieDB() - return s.createSnapshot(t.NodeIterator([]byte{}), trieDB, headerID) + return s.CreateSnapshot(SnapshotParams{Height: *height, Workers: workers}) } -// CreateSnapshot creates snapshot for given block height. -func (s *Service) CreateSnapshot(height uint64) error { - // extract header from lvldb and publish to PG-IPFS - // hold onto the headerID so that we can link the state nodes to this header - logrus.Infof("Creating snapshot at height %d", height) - hash := rawdb.ReadCanonicalHash(s.ethDB, height) - header := rawdb.ReadHeader(s.ethDB, hash, height) - if header == nil { - return fmt.Errorf("unable to read canonical header at height %d", height) - } - - headerID, err := s.ipfsPublisher.PublishHeader(header) - if err != nil { - return err - } - - t, err := s.stateDB.OpenTrie(header.Root) - if err != nil { - return err - } - - trieDB := s.stateDB.TrieDB() - return s.createSnapshot(t.NodeIterator([]byte{}), trieDB, headerID) +type nodeResult struct { + node Node + elements []interface{} } -func (s *Service) createSnapshot(it trie.NodeIterator, trieDB *trie.Database, headerID int64) error { - tx, err := s.ipfsPublisher.db.Beginx() +func resolveNode(it trie.NodeIterator, trieDB *trie.Database) (*nodeResult, error) { + // "leaf" nodes are actually "value" nodes, whose parents are the actual leaves + if it.Leaf() { + return nil, nil + } + if bytes.Equal(nullHash.Bytes(), it.Hash().Bytes()) { + return nil, nil + } + + path := make([]byte, len(it.Path())) + copy(path, it.Path()) + n, err := trieDB.Node(it.Hash()) + if err != nil { + return nil, err + } + var elements []interface{} + if err := rlp.DecodeBytes(n, &elements); err != nil { + return nil, err + } + ty, err := CheckKeyType(elements) + if err != nil { + return nil, err + } + return &nodeResult{ + node: Node{ + NodeType: ty, + Path: path, + Value: n, + }, + elements: elements, + }, nil +} + +func (s *Service) createSnapshot(it trie.NodeIterator, headerID int64) error { + tx, err := s.ipfsPublisher.BeginTx() if err != nil { return err } - go s.ipfsPublisher.logNodeCounters() defer func() { - logrus.Info("----- final counts -----") - s.ipfsPublisher.printNodeCounters() if rec := recover(); rec != nil { shared.Rollback(tx) panic(rec) } else if err != nil { shared.Rollback(tx) } else { - err = tx.Commit() + err = s.ipfsPublisher.CommitTx(tx) } }() for it.Next(true) { - if it.Leaf() { // "leaf" nodes are actually "value" nodes, whose parents are the actual leaves + res, err := resolveNode(it, s.stateDB.TrieDB()) + if err != nil { + return err + } + if res == nil { continue } - if bytes.Equal(nullHash.Bytes(), it.Hash().Bytes()) { - continue - } - - tx, err = s.ipfsPublisher.checkBatchSize(tx, s.maxBatchSize) + tx, err = s.ipfsPublisher.PrepareTxForBatch(tx, s.maxBatchSize) if err != nil { return err } - nodePath := make([]byte, len(it.Path())) - copy(nodePath, it.Path()) - - var ( - nodeData []byte - ty nodeType - ) - - nodeData, err = trieDB.Node(it.Hash()) - if err != nil { - return err - } - - var nodeElements []interface{} - if err = rlp.DecodeBytes(nodeData, &nodeElements); err != nil { - return err - } - - ty, err = CheckKeyType(nodeElements) - if err != nil { - return err - } - - stateNode := &node{ - nodeType: ty, - path: nodePath, - value: nodeData, - } - - switch ty { - case leaf: + switch res.node.NodeType { + case Leaf: // if the node is a leaf, decode the account and publish the associated storage trie nodes if there are any var account types.StateAccount - if err = rlp.DecodeBytes(nodeElements[1].([]byte), &account); err != nil { - return fmt.Errorf("error decoding account for leaf node at path %x nerror: %w", nodePath, err) + if err := rlp.DecodeBytes(res.elements[1].([]byte), &account); err != nil { + return fmt.Errorf( + "error decoding account for leaf node at path %x nerror: %v", res.node.Path, err) } - - partialPath := trie.CompactToHex(nodeElements[0].([]byte)) - valueNodePath := append(nodePath, partialPath...) + partialPath := trie.CompactToHex(res.elements[0].([]byte)) + valueNodePath := append(res.node.Path, partialPath...) encodedPath := trie.HexToCompact(valueNodePath) leafKey := encodedPath[1:] - stateNode.key = common.BytesToHash(leafKey) - - stateID, err := s.ipfsPublisher.PublishStateNode(stateNode, headerID, tx) + res.node.Key = common.BytesToHash(leafKey) + stateID, err := s.ipfsPublisher.PublishStateNode(&res.node, headerID, tx) if err != nil { return err } @@ -229,18 +229,44 @@ func (s *Service) createSnapshot(it trie.NodeIterator, trieDB *trie.Database, he if tx, err = s.storageSnapshot(account.Root, stateID, tx); err != nil { return fmt.Errorf("failed building storage snapshot for account %+v\r\nerror: %w", account, err) } - case extension, branch: - stateNode.key = common.BytesToHash([]byte{}) - if _, err := s.ipfsPublisher.PublishStateNode(stateNode, headerID, tx); err != nil { + case Extension, Branch: + res.node.Key = common.BytesToHash([]byte{}) + if _, err := s.ipfsPublisher.PublishStateNode(&res.node, headerID, tx); err != nil { return err } default: return errors.New("unexpected node type") } + return nil } return it.Error() } +// Full-trie concurrent snapshot +func (s *Service) createSnapshotAsync(tree state.Trie, headerID int64, workers uint) error { + errors := make(chan error) + var wg sync.WaitGroup + for _, it := range iter.SubtrieIterators(tree, workers) { + wg.Add(1) + go func() { + defer wg.Done() + if err := s.createSnapshot(it, headerID); err != nil { + errors <- err + } + }() + } + go func() { + defer close(errors) + wg.Wait() + }() + + select { + case err := <-errors: + return err + } + return nil +} + func (s *Service) storageSnapshot(sr common.Hash, stateID int64, tx *sqlx.Tx) (*sqlx.Tx, error) { if bytes.Equal(sr.Bytes(), emptyContractRoot.Bytes()) { return tx, nil @@ -253,63 +279,39 @@ func (s *Service) storageSnapshot(sr common.Hash, stateID int64, tx *sqlx.Tx) (* it := sTrie.NodeIterator(make([]byte, 0)) for it.Next(true) { - // skip value nodes - if it.Leaf() { + res, err := resolveNode(it, s.stateDB.TrieDB()) + if err != nil { + return nil, err + } + if res == nil { continue } - if bytes.Equal(nullHash.Bytes(), it.Hash().Bytes()) { - continue - } - - tx, err = s.ipfsPublisher.checkBatchSize(tx, s.maxBatchSize) + tx, err = s.ipfsPublisher.PrepareTxForBatch(tx, s.maxBatchSize) if err != nil { return nil, err } - nodePath := make([]byte, len(it.Path())) - copy(nodePath, it.Path()) - - var ( - nodeData []byte - ty nodeType - ) - + var nodeData []byte nodeData, err = s.stateDB.TrieDB().Node(it.Hash()) if err != nil { return nil, err } + res.node.Value = nodeData - var nodeElements []interface{} - if err = rlp.DecodeBytes(nodeData, &nodeElements); err != nil { - return nil, err - } - - ty, err = CheckKeyType(nodeElements) - if err != nil { - return nil, err - } - - storageNode := &node{ - nodeType: ty, - path: nodePath, - value: nodeData, - } - - switch ty { - case leaf: - partialPath := trie.CompactToHex(nodeElements[0].([]byte)) - valueNodePath := append(nodePath, partialPath...) + switch res.node.NodeType { + case Leaf: + partialPath := trie.CompactToHex(res.elements[0].([]byte)) + valueNodePath := append(res.node.Path, partialPath...) encodedPath := trie.HexToCompact(valueNodePath) leafKey := encodedPath[1:] - storageNode.key = common.BytesToHash(leafKey) - case extension, branch: - storageNode.key = common.BytesToHash([]byte{}) + res.node.Key = common.BytesToHash(leafKey) + case Extension, Branch: + res.node.Key = common.BytesToHash([]byte{}) default: return nil, errors.New("unexpected node type") } - - if err = s.ipfsPublisher.PublishStorageNode(storageNode, stateID, tx); err != nil { + if err = s.ipfsPublisher.PublishStorageNode(&res.node, stateID, tx); err != nil { return nil, err } } diff --git a/pkg/snapshot/service_test.go b/pkg/snapshot/service_test.go new file mode 100644 index 0000000..8fbac83 --- /dev/null +++ b/pkg/snapshot/service_test.go @@ -0,0 +1,99 @@ +package snapshot + +import ( + "fmt" + "os" + "path/filepath" + "testing" + + "github.com/golang/mock/gomock" + + ethNode "github.com/ethereum/go-ethereum/statediff/indexer/node" + "github.com/ethereum/go-ethereum/statediff/indexer/postgres" + + fixt "github.com/vulcanize/eth-pg-ipfs-state-snapshot/fixture" + "github.com/vulcanize/eth-pg-ipfs-state-snapshot/pkg/snapshot/mock" +) + +func testConfig(leveldbpath, ancientdbpath string) *Config { + dbParams := postgres.ConnectionParams{ + Name: "snapshot_test", + Hostname: "localhost", + Port: 5432, + User: "tester", + Password: "test_pw", + } + connconfig := postgres.ConnectionConfig{ + MaxIdle: 0, + MaxLifetime: 0, + MaxOpen: 4, + } + nodeinfo := ethNode.Info{ + ID: "eth_node_id", + ClientName: "eth_client", + GenesisBlock: "X", + NetworkID: "eth_network", + ChainID: 0, + } + + return &Config{ + DB: &DBConfig{ + Node: nodeinfo, + URI: postgres.DbConnectionString(dbParams), + ConnConfig: connconfig, + }, + Eth: &EthConfig{ + LevelDBPath: leveldbpath, + AncientDBPath: ancientdbpath, + }, + } +} + +func TestCreateSnapshot(t *testing.T) { + wd, err := os.Getwd() + if err != nil { + panic(err) + } + fixture_path := filepath.Join(wd, "..", "..", "fixture") + datadir := filepath.Join(fixture_path, "chaindata") + if _, err := os.Stat(datadir); err != nil { + t.Fatal("no chaindata:", err) + } + config := testConfig(datadir, filepath.Join(datadir, "ancient")) + fmt.Printf("config: %+v %+v\n", config.DB, config.Eth) + + edb, err := NewLevelDB(config.Eth) + if err != nil { + t.Fatal(err) + } + workers := 8 + + pub := mock.NewMockPublisher(t) + pub.EXPECT().PublishHeader(gomock.Eq(fixt.PublishHeader)) + pub.EXPECT().BeginTx(). + Times(workers) + pub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()). + Times(workers) + pub.EXPECT().PublishStateNode(gomock.Any(), mock.AnyOf(int64(0), int64(1)), gomock.Any()). + Times(workers) + // TODO: fixtures for storage node + // pub.EXPECT().PublishStorageNode(gomock.Eq(fixt.StorageNode), gomock.Eq(int64(0)), gomock.Any()) + pub.EXPECT().CommitTx(gomock.Any()). + Times(workers) + + service, err := NewSnapshotService(edb, pub) + if err != nil { + t.Fatal(err) + } + + params := SnapshotParams{Height: 1, Workers: uint(workers)} + err = service.CreateSnapshot(params) + if err != nil { + t.Fatal(err) + } + + // err = service.CreateLatestSnapshot(0) + // if err != nil { + // t.Fatal(err) + // } +} diff --git a/pkg/snapshot/node_type.go b/pkg/types/node_type.go similarity index 75% rename from pkg/snapshot/node_type.go rename to pkg/types/node_type.go index 2ff9374..f33dcd0 100644 --- a/pkg/snapshot/node_type.go +++ b/pkg/types/node_type.go @@ -13,7 +13,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package snapshot +package types import ( "fmt" @@ -23,41 +23,42 @@ import ( // node for holding trie node information type node struct { - nodeType nodeType - path []byte - key common.Hash - value []byte + NodeType nodeType + Path []byte + Key common.Hash + Value []byte } +type Node = node // nodeType for explicitly setting type of node type nodeType int const ( - branch nodeType = iota - extension - leaf - removed - unknown + Branch nodeType = iota + Extension + Leaf + Removed + Unknown ) // CheckKeyType checks what type of key we have func CheckKeyType(elements []interface{}) (nodeType, error) { if len(elements) > 2 { - return branch, nil + return Branch, nil } if len(elements) < 2 { - return unknown, fmt.Errorf("node cannot be less than two elements in length") + return Unknown, fmt.Errorf("node cannot be less than two elements in length") } switch elements[0].([]byte)[0] / 16 { case '\x00': - return extension, nil + return Extension, nil case '\x01': - return extension, nil + return Extension, nil case '\x02': - return leaf, nil + return Leaf, nil case '\x03': - return leaf, nil + return Leaf, nil default: - return unknown, fmt.Errorf("unknown hex prefix") + return Unknown, fmt.Errorf("unknown hex prefix") } } diff --git a/pkg/types/publisher.go b/pkg/types/publisher.go new file mode 100644 index 0000000..8e6ece8 --- /dev/null +++ b/pkg/types/publisher.go @@ -0,0 +1,17 @@ +package types + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/jmoiron/sqlx" +) + +type Publisher interface { + PublishHeader(header *types.Header) (int64, error) + PublishStateNode(node *Node, headerID int64, tx *sqlx.Tx) (int64, error) + PublishStorageNode(node *Node, stateID int64, tx *sqlx.Tx) error + PublishCode(codeHash common.Hash, codeBytes []byte, tx *sqlx.Tx) error + BeginTx() (*sqlx.Tx, error) + CommitTx(*sqlx.Tx) error + PrepareTxForBatch(tx *sqlx.Tx, batchSize uint) (*sqlx.Tx, error) +}