Merge pull request #7 from vulcanize/async-traversal

Concurrent traversal
This commit is contained in:
Ian Norden 2022-01-26 12:09:21 -06:00 committed by GitHub
commit 5269b7989c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 519 additions and 218 deletions

12
Makefile Normal file
View File

@ -0,0 +1,12 @@
MOCKS_DIR = $(CURDIR)/mocks
mockgen_cmd=mockgen
.PHONY: mocks
mocks: mocks/snapshot/publisher.go
mocks/snapshot/publisher.go: pkg/types/publisher.go
$(mockgen_cmd) -package snapshot_mock -destination $@ -source $< Publisher
clean:
rm -f mocks/snapshot/publisher.go

View File

@ -40,18 +40,28 @@ var stateSnapshotCmd = &cobra.Command{
func stateSnapshot() {
snapConfig := &snapshot.Config{}
snapConfig.Init()
snapshotService, err := snapshot.NewSnapshotService(snapConfig)
pgDB, err := snapshot.NewPostgresDB(snapConfig.DB)
if err != nil {
logWithCommand.Fatal(err)
}
edb, err := snapshot.NewLevelDB(snapConfig.Eth)
if err != nil {
logWithCommand.Fatal(err)
}
snapshotService, err := snapshot.NewSnapshotService(edb, snapshot.NewPublisher(pgDB))
if err != nil {
logWithCommand.Fatal(err)
}
height := viper.GetInt64("snapshot.blockHeight")
workers := viper.GetUint("snapshot.workers")
if height < 0 {
if err := snapshotService.CreateLatestSnapshot(); err != nil {
if err := snapshotService.CreateLatestSnapshot(workers); err != nil {
logWithCommand.Fatal(err)
}
} else {
uHeight := uint64(height)
if err := snapshotService.CreateSnapshot(uHeight); err != nil {
params := snapshot.SnapshotParams{Workers: workers, Height: uint64(height)}
if err := snapshotService.CreateSnapshot(params); err != nil {
logWithCommand.Fatal(err)
}
}
@ -64,8 +74,10 @@ func init() {
stateSnapshotCmd.PersistentFlags().String("leveldb-path", "", "path to primary datastore")
stateSnapshotCmd.PersistentFlags().String("ancient-path", "", "path to ancient datastore")
stateSnapshotCmd.PersistentFlags().String("block-height", "", "blockheight to extract state at")
stateSnapshotCmd.PersistentFlags().Int("workers", 0, "number of concurrent workers to use")
viper.BindPFlag("leveldb.path", stateSnapshotCmd.PersistentFlags().Lookup("leveldb-path"))
viper.BindPFlag("leveldb.ancient", stateSnapshotCmd.PersistentFlags().Lookup("ancient-path"))
viper.BindPFlag("snapshot.blockHeight", stateSnapshotCmd.PersistentFlags().Lookup("block-height"))
viper.BindPFlag("snapshot.workers", stateSnapshotCmd.PersistentFlags().Lookup("workers"))
}

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

37
fixture/service.go Normal file
View File

@ -0,0 +1,37 @@
package fixture
import (
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
// "github.com/jmoiron/sqlx"
snapt "github.com/vulcanize/eth-pg-ipfs-state-snapshot/pkg/types"
)
var PublishHeader = &types.Header{
ParentHash: common.HexToHash("0x6341fd3daf94b748c72ced5a5b26028f2474f5f00d824504e4fa37a75767e177"),
UncleHash: common.HexToHash("0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347"),
Coinbase: common.HexToAddress("0x0000000000000000000000000000000000000000"),
Root: common.HexToHash("0x53580584816f617295ea26c0e17641e0120cab2f0a8ffb53a866fd53aa8e8c2d"),
TxHash: common.HexToHash("0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421"),
ReceiptHash: common.HexToHash("0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421"),
Bloom: types.Bloom{},
Difficulty: big.NewInt(+2),
Number: big.NewInt(+1),
GasLimit: 4704588,
GasUsed: 0,
Time: 1492010458,
Extra: []byte{215, 131, 1, 6, 0, 132, 103, 101, 116, 104, 135, 103, 111, 49, 46, 55, 46, 51, 133, 108, 105, 110, 117, 120, 0, 0, 0, 0, 0, 0, 0, 0, 159, 30, 250, 30, 250, 114, 175, 19, 140, 145, 89, 102, 198, 57, 84, 74, 2, 85, 230, 40, 142, 24, 140, 34, 206, 145, 104, 193, 13, 190, 70, 218, 61, 136, 180, 170, 6, 89, 48, 17, 159, 184, 134, 33, 11, 240, 26, 8, 79, 222, 93, 59, 196, 141, 138, 163, 139, 202, 146, 228, 252, 197, 33, 81, 0},
MixDigest: common.Hash{},
Nonce: types.BlockNonce{},
BaseFee: nil,
}
var StateNode = snapt.Node{
NodeType: 0,
Path: []byte{12, 0},
Key: common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000"),
Value: []byte{248, 113, 160, 147, 141, 92, 6, 119, 63, 191, 125, 121, 193, 230, 153, 223, 49, 102, 109, 236, 50, 44, 161, 215, 28, 224, 171, 111, 118, 230, 79, 99, 18, 99, 4, 160, 117, 126, 95, 187, 60, 115, 90, 36, 51, 167, 59, 86, 20, 175, 63, 118, 94, 230, 107, 202, 41, 253, 234, 165, 214, 221, 181, 45, 9, 202, 244, 148, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 160, 247, 170, 155, 102, 71, 245, 140, 90, 255, 89, 193, 131, 99, 31, 85, 161, 78, 90, 0, 204, 46, 253, 15, 71, 120, 19, 109, 123, 255, 0, 188, 27, 128},
}

6
go.mod
View File

@ -5,9 +5,10 @@ go 1.15
require (
github.com/btcsuite/btcd v0.22.0-beta // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/ethereum/go-ethereum v1.9.14
github.com/ethereum/go-ethereum v1.10.15
github.com/fsnotify/fsnotify v1.5.1 // indirect
github.com/go-kit/kit v0.10.0 // indirect
github.com/golang/mock v1.6.0
github.com/google/go-cmp v0.5.6 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gopherjs/gopherjs v0.0.0-20190430165422-3e4dfb77656c // indirect
@ -26,6 +27,7 @@ require (
github.com/smartystreets/assertions v1.0.0 // indirect
github.com/spf13/cobra v1.0.0
github.com/spf13/viper v1.7.0
github.com/vulcanize/go-eth-state-node-iterator v1.0.0
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/goleak v1.1.11 // indirect
go.uber.org/multierr v1.7.0 // indirect
@ -40,4 +42,4 @@ require (
lukechampine.com/blake3 v1.1.7 // indirect
)
replace github.com/ethereum/go-ethereum v1.9.14 => github.com/vulcanize/go-ethereum v1.10.14-statediff-0.0.29
replace github.com/ethereum/go-ethereum v1.10.15 => github.com/vulcanize/go-ethereum v1.10.15-statediff-2.0.0

9
go.sum
View File

@ -213,6 +213,8 @@ github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4er
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y=
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
@ -671,8 +673,10 @@ github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/X
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8=
github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ=
github.com/vulcanize/go-ethereum v1.10.14-statediff-0.0.29 h1:kjZjteD/6vh9DcixPkrg27XtxKW7ZoV5++1EYAi6FAw=
github.com/vulcanize/go-ethereum v1.10.14-statediff-0.0.29/go.mod h1:9L+QY31AnWnX2/2HDOySCjQoYUdWNGBRMezFJVfH73E=
github.com/vulcanize/go-eth-state-node-iterator v1.0.0 h1:FQ4s0K5TnRD44p3vO4uQtjfE8C1Gr2EDMT2vVGS3Lu8=
github.com/vulcanize/go-eth-state-node-iterator v1.0.0/go.mod h1:uWhleTvUEZ+cEkNRIAmBpZ14KilTP71OxY5NZDrpNlo=
github.com/vulcanize/go-ethereum v1.10.15-statediff-2.0.0 h1:/BiYPUHnubh46YVtASGt4MPlFR96Rc+iJuTyOI8KZa4=
github.com/vulcanize/go-ethereum v1.10.15-statediff-2.0.0/go.mod h1:9L+QY31AnWnX2/2HDOySCjQoYUdWNGBRMezFJVfH73E=
github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc/go.mod h1:bopw91TMyo8J3tvftk8xmU2kPmlrt4nScJQZU2hE5EM=
github.com/willf/bitset v1.1.3/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
@ -926,6 +930,7 @@ golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roY
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.8 h1:P1HhGGuLW4aAclzjtmJdf0mJOjVUZUzOTqkAkWL+l6w=

View File

@ -30,37 +30,48 @@ const (
LVL_DB_PATH = "LVL_DB_PATH"
)
// Config is config parameters for DB.
type Config struct {
// DBConfig is config parameters for DB.
type DBConfig struct {
Node ethNode.Info
URI string
ConnConfig postgres.ConnectionConfig
}
// EthConfig is config parameters for the chain.
type EthConfig struct {
LevelDBPath string
AncientDBPath string
Node ethNode.Info
connectionURI string
DBConfig postgres.ConnectionConfig
}
type Config struct {
DB *DBConfig
Eth *EthConfig
}
// Init Initialises config
func (c *Config) Init() {
c.dbInit()
viper.BindEnv("leveldb.path", LVL_DB_PATH)
c.DB.dbInit()
viper.BindEnv("ethereum.nodeID", ETH_NODE_ID)
viper.BindEnv("ethereum.clientName", ETH_CLIENT_NAME)
viper.BindEnv("ethereum.genesisBlock", ETH_GENESIS_BLOCK)
viper.BindEnv("ethereum.networkID", ETH_NETWORK_ID)
viper.BindEnv("leveldb.ancient", ANCIENT_DB_PATH)
c.Node = ethNode.Info{
c.DB.Node = ethNode.Info{
ID: viper.GetString("ethereum.nodeID"),
ClientName: viper.GetString("ethereum.clientName"),
GenesisBlock: viper.GetString("ethereum.genesisBlock"),
NetworkID: viper.GetString("ethereum.networkID"),
ChainID: viper.GetUint64("ethereum.chainID"),
}
c.LevelDBPath = viper.GetString("leveldb.path")
c.AncientDBPath = viper.GetString("leveldb.ancient")
viper.BindEnv("leveldb.ancient", ANCIENT_DB_PATH)
viper.BindEnv("leveldb.path", LVL_DB_PATH)
c.Eth.AncientDBPath = viper.GetString("leveldb.ancient")
c.Eth.LevelDBPath = viper.GetString("leveldb.path")
}
func (c *Config) dbInit() {
func (c *DBConfig) dbInit() {
viper.BindEnv("database.name", postgres.DATABASE_NAME)
viper.BindEnv("database.hostname", postgres.DATABASE_HOSTNAME)
viper.BindEnv("database.port", postgres.DATABASE_PORT)
@ -78,9 +89,9 @@ func (c *Config) dbInit() {
dbParams.User = viper.GetString("database.user")
dbParams.Password = viper.GetString("database.password")
c.connectionURI = postgres.DbConnectionString(dbParams)
// DB config
c.DBConfig.MaxIdle = viper.GetInt("database.maxIdle")
c.DBConfig.MaxOpen = viper.GetInt("database.maxOpen")
c.DBConfig.MaxLifetime = viper.GetInt("database.maxLifetime")
c.URI = postgres.DbConnectionString(dbParams)
// Connection config
c.ConnConfig.MaxIdle = viper.GetInt("database.maxIdle")
c.ConnConfig.MaxOpen = viper.GetInt("database.maxOpen")
c.ConnConfig.MaxLifetime = viper.GetInt("database.maxLifetime")
}

View File

@ -0,0 +1,64 @@
package mock
import (
"fmt"
"strings"
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/jmoiron/sqlx"
"github.com/golang/mock/gomock"
mocks "github.com/vulcanize/eth-pg-ipfs-state-snapshot/mocks/snapshot"
snapt "github.com/vulcanize/eth-pg-ipfs-state-snapshot/pkg/types"
)
type MockPublisher struct {
*mocks.MockPublisher
}
func NewMockPublisher(t *testing.T) *MockPublisher {
ctl := gomock.NewController(t)
return &MockPublisher{mocks.NewMockPublisher(ctl)}
}
func dump(funcname string, xs ...interface{}) {
if true {
return
}
fmt.Printf(">> %s", funcname)
fmt.Printf(strings.Repeat(" %+v", len(xs))+"\n", xs...)
}
func (p *MockPublisher) PublishHeader(header *types.Header) (int64, error) {
// fmt.Printf("PublishHeader %+v\n", header)
dump("PublishHeader", header)
return p.MockPublisher.PublishHeader(header)
}
func (p *MockPublisher) PublishStateNode(node *snapt.Node, headerID int64, tx *sqlx.Tx) (int64, error) {
dump("PublishStateNode", node, headerID)
return p.MockPublisher.PublishStateNode(node, headerID, tx)
}
func (p *MockPublisher) PublishStorageNode(node *snapt.Node, stateID int64, tx *sqlx.Tx) error {
dump("PublishStorageNode", node, stateID)
return p.MockPublisher.PublishStorageNode(node, stateID, tx)
}
func (p *MockPublisher) PublishCode(codeHash common.Hash, codeBytes []byte, tx *sqlx.Tx) error {
dump("PublishCode", codeHash, codeBytes)
return p.MockPublisher.PublishCode(codeHash, codeBytes, tx)
}
func (p *MockPublisher) BeginTx() (*sqlx.Tx, error) {
dump("BeginTx")
return p.MockPublisher.BeginTx()
}
func (p *MockPublisher) CommitTx(tx *sqlx.Tx) error {
dump("CommitTx", tx)
return p.MockPublisher.CommitTx(tx)
}
func (p *MockPublisher) PrepareTxForBatch(tx *sqlx.Tx, batchSize uint) (*sqlx.Tx, error) {
dump("PrepareTxForBatch", tx, batchSize)
return p.MockPublisher.PrepareTxForBatch(tx, batchSize)
}

23
pkg/snapshot/mock/util.go Normal file
View File

@ -0,0 +1,23 @@
package mock
import "fmt"
import "github.com/golang/mock/gomock"
type anyOfMatcher struct {
values []interface{}
}
func (m anyOfMatcher) Matches(x interface{}) bool {
for _, v := range m.values {
if gomock.Eq(v).Matches(x) {
return true
}
}
return false
}
func (m anyOfMatcher) String() string {
return fmt.Sprintf("is equal to any of %+v", m.values)
}
func AnyOf(xs ...interface{}) anyOfMatcher {
return anyOfMatcher{xs}
}

View File

@ -30,12 +30,13 @@ import (
"github.com/ethereum/go-ethereum/statediff/indexer/ipfs/ipld"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
snapt "github.com/vulcanize/eth-pg-ipfs-state-snapshot/pkg/types"
)
const logInterval = 1 * time.Minute
// Publisher is wrapper around DB.
type Publisher struct {
type publisher struct {
db *postgres.DB
currBatchSize uint
stateNodeCounter uint64
@ -45,16 +46,31 @@ type Publisher struct {
}
// NewPublisher creates Publisher
func NewPublisher(db *postgres.DB) *Publisher {
return &Publisher{
func NewPublisher(db *postgres.DB) *publisher {
return &publisher{
db: db,
currBatchSize: 0,
startTime: time.Now(),
}
}
func (p *publisher) BeginTx() (*sqlx.Tx, error) {
tx, err := p.db.Beginx()
if err != nil {
return nil, err
}
go p.logNodeCounters()
return tx, nil
}
func (p *publisher) CommitTx(tx *sqlx.Tx) error {
logrus.Info("----- final counts -----")
p.printNodeCounters()
return tx.Commit()
}
// PublishHeader writes the header to the ipfs backing pg datastore and adds secondary indexes in the header_cids table
func (p *Publisher) PublishHeader(header *types.Header) (int64, error) {
func (p *publisher) PublishHeader(header *types.Header) (int64, error) {
headerNode, err := ipld.NewEthHeader(header)
if err != nil {
return 0, err
@ -93,14 +109,14 @@ func (p *Publisher) PublishHeader(header *types.Header) (int64, error) {
}
// PublishStateNode writes the state node to the ipfs backing datastore and adds secondary indexes in the state_cids table
func (p *Publisher) PublishStateNode(node *node, headerID int64, tx *sqlx.Tx) (int64, error) {
func (p *publisher) PublishStateNode(node *snapt.Node, headerID int64, tx *sqlx.Tx) (int64, error) {
var stateID int64
var stateKey string
if !bytes.Equal(node.key.Bytes(), nullHash.Bytes()) {
stateKey = node.key.Hex()
if !bytes.Equal(node.Key.Bytes(), nullHash.Bytes()) {
stateKey = node.Key.Hex()
}
stateCIDStr, mhKey, err := shared.PublishRaw(tx, ipld.MEthStateTrie, multihash.KECCAK_256, node.value)
stateCIDStr, mhKey, err := shared.PublishRaw(tx, ipld.MEthStateTrie, multihash.KECCAK_256, node.Value)
if err != nil {
return 0, err
}
@ -108,7 +124,7 @@ func (p *Publisher) PublishStateNode(node *node, headerID int64, tx *sqlx.Tx) (i
err = tx.QueryRowx(`INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7)
RETURNING id`,
headerID, stateKey, stateCIDStr, node.path, node.nodeType, false, mhKey).Scan(&stateID)
headerID, stateKey, stateCIDStr, node.Path, node.NodeType, false, mhKey).Scan(&stateID)
// increment state node counter.
atomic.AddUint64(&p.stateNodeCounter, 1)
@ -119,20 +135,20 @@ func (p *Publisher) PublishStateNode(node *node, headerID int64, tx *sqlx.Tx) (i
}
// PublishStorageNode writes the storage node to the ipfs backing pg datastore and adds secondary indexes in the storage_cids table
func (p *Publisher) PublishStorageNode(node *node, stateID int64, tx *sqlx.Tx) error {
func (p *publisher) PublishStorageNode(node *snapt.Node, stateID int64, tx *sqlx.Tx) error {
var storageKey string
if !bytes.Equal(node.key.Bytes(), nullHash.Bytes()) {
storageKey = node.key.Hex()
if !bytes.Equal(node.Key.Bytes(), nullHash.Bytes()) {
storageKey = node.Key.Hex()
}
storageCIDStr, mhKey, err := shared.PublishRaw(tx, ipld.MEthStorageTrie, multihash.KECCAK_256, node.value)
storageCIDStr, mhKey, err := shared.PublishRaw(tx, ipld.MEthStorageTrie, multihash.KECCAK_256, node.Value)
if err != nil {
return err
}
_, err = tx.Exec(`INSERT INTO eth.storage_cids (state_id, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (state_id, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7)`,
stateID, storageKey, storageCIDStr, node.path, node.nodeType, false, mhKey)
stateID, storageKey, storageCIDStr, node.Path, node.NodeType, false, mhKey)
if err != nil {
return err
}
@ -146,7 +162,7 @@ func (p *Publisher) PublishStorageNode(node *node, stateID int64, tx *sqlx.Tx) e
}
// PublishCode writes code to the ipfs backing pg datastore
func (p *Publisher) PublishCode(codeHash common.Hash, codeBytes []byte, tx *sqlx.Tx) error {
func (p *publisher) PublishCode(codeHash common.Hash, codeBytes []byte, tx *sqlx.Tx) error {
// no codec for code, doesn't matter though since blockstore key is multihash-derived
mhKey, err := shared.MultihashKeyFromKeccak256(codeHash)
if err != nil {
@ -164,7 +180,7 @@ func (p *Publisher) PublishCode(codeHash common.Hash, codeBytes []byte, tx *sqlx
return nil
}
func (p *Publisher) checkBatchSize(tx *sqlx.Tx, maxBatchSize uint) (*sqlx.Tx, error) {
func (p *publisher) PrepareTxForBatch(tx *sqlx.Tx, maxBatchSize uint) (*sqlx.Tx, error) {
var err error
// maximum batch size reached, commit the current transaction and begin a new transaction.
if maxBatchSize <= p.currBatchSize {
@ -184,14 +200,14 @@ func (p *Publisher) checkBatchSize(tx *sqlx.Tx, maxBatchSize uint) (*sqlx.Tx, er
}
// logNodeCounters periodically logs the number of node processed.
func (p *Publisher) logNodeCounters() {
func (p *publisher) logNodeCounters() {
t := time.NewTicker(logInterval)
for range t.C {
p.printNodeCounters()
}
}
func (p *Publisher) printNodeCounters() {
func (p *publisher) printNodeCounters() {
logrus.Infof("runtime: %s", time.Now().Sub(p.startTime).String())
logrus.Infof("processed state nodes: %d", atomic.LoadUint64(&p.stateNodeCounter))
logrus.Infof("processed storage nodes: %d", atomic.LoadUint64(&p.storageNodeCounter))

View File

@ -19,6 +19,7 @@ import (
"bytes"
"errors"
"fmt"
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
@ -32,6 +33,9 @@ import (
"github.com/ethereum/go-ethereum/trie"
"github.com/jmoiron/sqlx"
"github.com/sirupsen/logrus"
. "github.com/vulcanize/eth-pg-ipfs-state-snapshot/pkg/types"
iter "github.com/vulcanize/go-eth-state-node-iterator"
)
var (
@ -48,166 +52,162 @@ var (
type Service struct {
ethDB ethdb.Database
stateDB state.Database
ipfsPublisher *Publisher
ipfsPublisher Publisher
maxBatchSize uint
}
func NewPostgresDB(con *DBConfig) (*postgres.DB, error) {
pgDB, err := postgres.NewDB(con.URI, con.ConnConfig, con.Node)
if err != nil {
return nil, err
}
return pgDB, nil
}
func NewLevelDB(con *EthConfig) (ethdb.Database, error) {
return rawdb.NewLevelDBDatabaseWithFreezer(
con.LevelDBPath, 1024, 256, con.AncientDBPath, "eth-pg-ipfs-state-snapshot", false,
)
}
// NewSnapshotService creates Service.
func NewSnapshotService(con *Config) (*Service, error) {
pgDB, err := postgres.NewDB(con.connectionURI, con.DBConfig, con.Node)
if err != nil {
return nil, err
}
edb, err := rawdb.NewLevelDBDatabaseWithFreezer(con.LevelDBPath, 1024, 256, con.AncientDBPath, "eth-pg-ipfs-state-snapshot", false)
if err != nil {
return nil, err
}
func NewSnapshotService(edb ethdb.Database, pub Publisher) (*Service, error) {
return &Service{
ethDB: edb,
stateDB: state.NewDatabase(edb),
ipfsPublisher: NewPublisher(pgDB),
ipfsPublisher: pub,
maxBatchSize: defaultBatchSize,
}, nil
}
// CreateLatestSnapshot creates snapshot for the latest block.
func (s *Service) CreateLatestSnapshot() error {
type SnapshotParams struct {
Height uint64
Workers uint
}
func (s *Service) CreateSnapshot(params SnapshotParams) error {
// extract header from lvldb and publish to PG-IPFS
// hold onto the headerID so that we can link the state nodes to this header
logrus.Info("Creating snapshot at head")
logrus.Infof("Creating snapshot at height %d", params.Height)
hash := rawdb.ReadCanonicalHash(s.ethDB, params.Height)
header := rawdb.ReadHeader(s.ethDB, hash, params.Height)
if header == nil {
return fmt.Errorf("unable to read canonical header at height %d", params.Height)
}
logrus.Infof("head hash: %s head height: %d", hash.Hex(), params.Height)
headerID, err := s.ipfsPublisher.PublishHeader(header)
if err != nil {
return err
}
t, err := s.stateDB.OpenTrie(header.Root)
if err != nil {
return err
}
if params.Workers > 0 {
return s.createSnapshotAsync(t, headerID, params.Workers)
} else {
return s.createSnapshot(t.NodeIterator(nil), headerID)
}
return nil
}
// Create snapshot up to head (ignores height param)
func (s *Service) CreateLatestSnapshot(workers uint) error {
logrus.Info("Creating snapshot at head")
hash := rawdb.ReadHeadHeaderHash(s.ethDB)
height := rawdb.ReadHeaderNumber(s.ethDB, hash)
if height == nil {
return fmt.Errorf("unable to read header height for header hash %s", hash.String())
}
header := rawdb.ReadHeader(s.ethDB, hash, *height)
if header == nil {
return fmt.Errorf("unable to read canonical header at height %d", height)
}
logrus.Infof("head hash: %s head height: %d", hash.Hex(), *height)
headerID, err := s.ipfsPublisher.PublishHeader(header)
if err != nil {
return err
}
t, err := s.stateDB.OpenTrie(header.Root)
if err != nil {
return err
}
trieDB := s.stateDB.TrieDB()
return s.createSnapshot(t.NodeIterator([]byte{}), trieDB, headerID)
return s.CreateSnapshot(SnapshotParams{Height: *height, Workers: workers})
}
// CreateSnapshot creates snapshot for given block height.
func (s *Service) CreateSnapshot(height uint64) error {
// extract header from lvldb and publish to PG-IPFS
// hold onto the headerID so that we can link the state nodes to this header
logrus.Infof("Creating snapshot at height %d", height)
hash := rawdb.ReadCanonicalHash(s.ethDB, height)
header := rawdb.ReadHeader(s.ethDB, hash, height)
if header == nil {
return fmt.Errorf("unable to read canonical header at height %d", height)
}
headerID, err := s.ipfsPublisher.PublishHeader(header)
if err != nil {
return err
}
t, err := s.stateDB.OpenTrie(header.Root)
if err != nil {
return err
}
trieDB := s.stateDB.TrieDB()
return s.createSnapshot(t.NodeIterator([]byte{}), trieDB, headerID)
type nodeResult struct {
node Node
elements []interface{}
}
func (s *Service) createSnapshot(it trie.NodeIterator, trieDB *trie.Database, headerID int64) error {
tx, err := s.ipfsPublisher.db.Beginx()
func resolveNode(it trie.NodeIterator, trieDB *trie.Database) (*nodeResult, error) {
// "leaf" nodes are actually "value" nodes, whose parents are the actual leaves
if it.Leaf() {
return nil, nil
}
if bytes.Equal(nullHash.Bytes(), it.Hash().Bytes()) {
return nil, nil
}
path := make([]byte, len(it.Path()))
copy(path, it.Path())
n, err := trieDB.Node(it.Hash())
if err != nil {
return nil, err
}
var elements []interface{}
if err := rlp.DecodeBytes(n, &elements); err != nil {
return nil, err
}
ty, err := CheckKeyType(elements)
if err != nil {
return nil, err
}
return &nodeResult{
node: Node{
NodeType: ty,
Path: path,
Value: n,
},
elements: elements,
}, nil
}
func (s *Service) createSnapshot(it trie.NodeIterator, headerID int64) error {
tx, err := s.ipfsPublisher.BeginTx()
if err != nil {
return err
}
go s.ipfsPublisher.logNodeCounters()
defer func() {
logrus.Info("----- final counts -----")
s.ipfsPublisher.printNodeCounters()
if rec := recover(); rec != nil {
shared.Rollback(tx)
panic(rec)
} else if err != nil {
shared.Rollback(tx)
} else {
err = tx.Commit()
err = s.ipfsPublisher.CommitTx(tx)
}
}()
for it.Next(true) {
if it.Leaf() { // "leaf" nodes are actually "value" nodes, whose parents are the actual leaves
res, err := resolveNode(it, s.stateDB.TrieDB())
if err != nil {
return err
}
if res == nil {
continue
}
if bytes.Equal(nullHash.Bytes(), it.Hash().Bytes()) {
continue
}
tx, err = s.ipfsPublisher.checkBatchSize(tx, s.maxBatchSize)
tx, err = s.ipfsPublisher.PrepareTxForBatch(tx, s.maxBatchSize)
if err != nil {
return err
}
nodePath := make([]byte, len(it.Path()))
copy(nodePath, it.Path())
var (
nodeData []byte
ty nodeType
)
nodeData, err = trieDB.Node(it.Hash())
if err != nil {
return err
}
var nodeElements []interface{}
if err = rlp.DecodeBytes(nodeData, &nodeElements); err != nil {
return err
}
ty, err = CheckKeyType(nodeElements)
if err != nil {
return err
}
stateNode := &node{
nodeType: ty,
path: nodePath,
value: nodeData,
}
switch ty {
case leaf:
switch res.node.NodeType {
case Leaf:
// if the node is a leaf, decode the account and publish the associated storage trie nodes if there are any
var account types.StateAccount
if err = rlp.DecodeBytes(nodeElements[1].([]byte), &account); err != nil {
return fmt.Errorf("error decoding account for leaf node at path %x nerror: %w", nodePath, err)
if err := rlp.DecodeBytes(res.elements[1].([]byte), &account); err != nil {
return fmt.Errorf(
"error decoding account for leaf node at path %x nerror: %v", res.node.Path, err)
}
partialPath := trie.CompactToHex(nodeElements[0].([]byte))
valueNodePath := append(nodePath, partialPath...)
partialPath := trie.CompactToHex(res.elements[0].([]byte))
valueNodePath := append(res.node.Path, partialPath...)
encodedPath := trie.HexToCompact(valueNodePath)
leafKey := encodedPath[1:]
stateNode.key = common.BytesToHash(leafKey)
stateID, err := s.ipfsPublisher.PublishStateNode(stateNode, headerID, tx)
res.node.Key = common.BytesToHash(leafKey)
stateID, err := s.ipfsPublisher.PublishStateNode(&res.node, headerID, tx)
if err != nil {
return err
}
@ -229,18 +229,44 @@ func (s *Service) createSnapshot(it trie.NodeIterator, trieDB *trie.Database, he
if tx, err = s.storageSnapshot(account.Root, stateID, tx); err != nil {
return fmt.Errorf("failed building storage snapshot for account %+v\r\nerror: %w", account, err)
}
case extension, branch:
stateNode.key = common.BytesToHash([]byte{})
if _, err := s.ipfsPublisher.PublishStateNode(stateNode, headerID, tx); err != nil {
case Extension, Branch:
res.node.Key = common.BytesToHash([]byte{})
if _, err := s.ipfsPublisher.PublishStateNode(&res.node, headerID, tx); err != nil {
return err
}
default:
return errors.New("unexpected node type")
}
return nil
}
return it.Error()
}
// Full-trie concurrent snapshot
func (s *Service) createSnapshotAsync(tree state.Trie, headerID int64, workers uint) error {
errors := make(chan error)
var wg sync.WaitGroup
for _, it := range iter.SubtrieIterators(tree, workers) {
wg.Add(1)
go func() {
defer wg.Done()
if err := s.createSnapshot(it, headerID); err != nil {
errors <- err
}
}()
}
go func() {
defer close(errors)
wg.Wait()
}()
select {
case err := <-errors:
return err
}
return nil
}
func (s *Service) storageSnapshot(sr common.Hash, stateID int64, tx *sqlx.Tx) (*sqlx.Tx, error) {
if bytes.Equal(sr.Bytes(), emptyContractRoot.Bytes()) {
return tx, nil
@ -253,63 +279,39 @@ func (s *Service) storageSnapshot(sr common.Hash, stateID int64, tx *sqlx.Tx) (*
it := sTrie.NodeIterator(make([]byte, 0))
for it.Next(true) {
// skip value nodes
if it.Leaf() {
res, err := resolveNode(it, s.stateDB.TrieDB())
if err != nil {
return nil, err
}
if res == nil {
continue
}
if bytes.Equal(nullHash.Bytes(), it.Hash().Bytes()) {
continue
}
tx, err = s.ipfsPublisher.checkBatchSize(tx, s.maxBatchSize)
tx, err = s.ipfsPublisher.PrepareTxForBatch(tx, s.maxBatchSize)
if err != nil {
return nil, err
}
nodePath := make([]byte, len(it.Path()))
copy(nodePath, it.Path())
var (
nodeData []byte
ty nodeType
)
var nodeData []byte
nodeData, err = s.stateDB.TrieDB().Node(it.Hash())
if err != nil {
return nil, err
}
res.node.Value = nodeData
var nodeElements []interface{}
if err = rlp.DecodeBytes(nodeData, &nodeElements); err != nil {
return nil, err
}
ty, err = CheckKeyType(nodeElements)
if err != nil {
return nil, err
}
storageNode := &node{
nodeType: ty,
path: nodePath,
value: nodeData,
}
switch ty {
case leaf:
partialPath := trie.CompactToHex(nodeElements[0].([]byte))
valueNodePath := append(nodePath, partialPath...)
switch res.node.NodeType {
case Leaf:
partialPath := trie.CompactToHex(res.elements[0].([]byte))
valueNodePath := append(res.node.Path, partialPath...)
encodedPath := trie.HexToCompact(valueNodePath)
leafKey := encodedPath[1:]
storageNode.key = common.BytesToHash(leafKey)
case extension, branch:
storageNode.key = common.BytesToHash([]byte{})
res.node.Key = common.BytesToHash(leafKey)
case Extension, Branch:
res.node.Key = common.BytesToHash([]byte{})
default:
return nil, errors.New("unexpected node type")
}
if err = s.ipfsPublisher.PublishStorageNode(storageNode, stateID, tx); err != nil {
if err = s.ipfsPublisher.PublishStorageNode(&res.node, stateID, tx); err != nil {
return nil, err
}
}

View File

@ -0,0 +1,99 @@
package snapshot
import (
"fmt"
"os"
"path/filepath"
"testing"
"github.com/golang/mock/gomock"
ethNode "github.com/ethereum/go-ethereum/statediff/indexer/node"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
fixt "github.com/vulcanize/eth-pg-ipfs-state-snapshot/fixture"
"github.com/vulcanize/eth-pg-ipfs-state-snapshot/pkg/snapshot/mock"
)
func testConfig(leveldbpath, ancientdbpath string) *Config {
dbParams := postgres.ConnectionParams{
Name: "snapshot_test",
Hostname: "localhost",
Port: 5432,
User: "tester",
Password: "test_pw",
}
connconfig := postgres.ConnectionConfig{
MaxIdle: 0,
MaxLifetime: 0,
MaxOpen: 4,
}
nodeinfo := ethNode.Info{
ID: "eth_node_id",
ClientName: "eth_client",
GenesisBlock: "X",
NetworkID: "eth_network",
ChainID: 0,
}
return &Config{
DB: &DBConfig{
Node: nodeinfo,
URI: postgres.DbConnectionString(dbParams),
ConnConfig: connconfig,
},
Eth: &EthConfig{
LevelDBPath: leveldbpath,
AncientDBPath: ancientdbpath,
},
}
}
func TestCreateSnapshot(t *testing.T) {
wd, err := os.Getwd()
if err != nil {
panic(err)
}
fixture_path := filepath.Join(wd, "..", "..", "fixture")
datadir := filepath.Join(fixture_path, "chaindata")
if _, err := os.Stat(datadir); err != nil {
t.Fatal("no chaindata:", err)
}
config := testConfig(datadir, filepath.Join(datadir, "ancient"))
fmt.Printf("config: %+v %+v\n", config.DB, config.Eth)
edb, err := NewLevelDB(config.Eth)
if err != nil {
t.Fatal(err)
}
workers := 8
pub := mock.NewMockPublisher(t)
pub.EXPECT().PublishHeader(gomock.Eq(fixt.PublishHeader))
pub.EXPECT().BeginTx().
Times(workers)
pub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()).
Times(workers)
pub.EXPECT().PublishStateNode(gomock.Any(), mock.AnyOf(int64(0), int64(1)), gomock.Any()).
Times(workers)
// TODO: fixtures for storage node
// pub.EXPECT().PublishStorageNode(gomock.Eq(fixt.StorageNode), gomock.Eq(int64(0)), gomock.Any())
pub.EXPECT().CommitTx(gomock.Any()).
Times(workers)
service, err := NewSnapshotService(edb, pub)
if err != nil {
t.Fatal(err)
}
params := SnapshotParams{Height: 1, Workers: uint(workers)}
err = service.CreateSnapshot(params)
if err != nil {
t.Fatal(err)
}
// err = service.CreateLatestSnapshot(0)
// if err != nil {
// t.Fatal(err)
// }
}

View File

@ -13,7 +13,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package snapshot
package types
import (
"fmt"
@ -23,41 +23,42 @@ import (
// node for holding trie node information
type node struct {
nodeType nodeType
path []byte
key common.Hash
value []byte
NodeType nodeType
Path []byte
Key common.Hash
Value []byte
}
type Node = node
// nodeType for explicitly setting type of node
type nodeType int
const (
branch nodeType = iota
extension
leaf
removed
unknown
Branch nodeType = iota
Extension
Leaf
Removed
Unknown
)
// CheckKeyType checks what type of key we have
func CheckKeyType(elements []interface{}) (nodeType, error) {
if len(elements) > 2 {
return branch, nil
return Branch, nil
}
if len(elements) < 2 {
return unknown, fmt.Errorf("node cannot be less than two elements in length")
return Unknown, fmt.Errorf("node cannot be less than two elements in length")
}
switch elements[0].([]byte)[0] / 16 {
case '\x00':
return extension, nil
return Extension, nil
case '\x01':
return extension, nil
return Extension, nil
case '\x02':
return leaf, nil
return Leaf, nil
case '\x03':
return leaf, nil
return Leaf, nil
default:
return unknown, fmt.Errorf("unknown hex prefix")
return Unknown, fmt.Errorf("unknown hex prefix")
}
}

17
pkg/types/publisher.go Normal file
View File

@ -0,0 +1,17 @@
package types
import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/jmoiron/sqlx"
)
type Publisher interface {
PublishHeader(header *types.Header) (int64, error)
PublishStateNode(node *Node, headerID int64, tx *sqlx.Tx) (int64, error)
PublishStorageNode(node *Node, stateID int64, tx *sqlx.Tx) error
PublishCode(codeHash common.Hash, codeBytes []byte, tx *sqlx.Tx) error
BeginTx() (*sqlx.Tx, error)
CommitTx(*sqlx.Tx) error
PrepareTxForBatch(tx *sqlx.Tx, batchSize uint) (*sqlx.Tx, error)
}