Merge pull request #11 from vulcanize/update-geth-impl-tx-batching

Update geth and implement transaction batching.
Arijit Das 2021-12-20 19:24:09 +05:30, committed by GitHub
commit 48636d2f1b
9 changed files with 827 additions and 1001 deletions


@@ -37,6 +37,7 @@ var rootCmd = &cobra.Command{
 	PersistentPreRun: initFuncs,
 }
 
+// Execute executes root Command.
 func Execute() {
 	log.Info("----- Starting vDB -----")
 	if err := rootCmd.Execute(); err != nil {


@@ -19,6 +19,7 @@ import (
 	"github.com/sirupsen/logrus"
 	"github.com/spf13/cobra"
 	"github.com/spf13/viper"
 
 	"github.com/vulcanize/eth-pg-ipfs-state-snapshot/pkg/snapshot"
 )
@@ -37,7 +38,7 @@
 }
 
 func stateSnapshot() {
-	snapConfig := snapshot.Config{}
+	snapConfig := &snapshot.Config{}
 	snapConfig.Init()
 	snapshotService, err := snapshot.NewSnapshotService(snapConfig)
 	if err != nil {

go.mod: 40 changed lines

@@ -1,16 +1,44 @@
 module github.com/vulcanize/eth-pg-ipfs-state-snapshot
 
-go 1.13
+go 1.15
 
 require (
+	github.com/btcsuite/btcd v0.22.0-beta // indirect
+	github.com/cespare/xxhash/v2 v2.1.2 // indirect
 	github.com/ethereum/go-ethereum v1.9.11
-	github.com/ipfs/go-ipfs-blockstore v1.0.0
-	github.com/ipfs/go-ipfs-ds-help v1.0.0
-	github.com/multiformats/go-multihash v0.0.13
+	github.com/fsnotify/fsnotify v1.5.1 // indirect
+	github.com/go-kit/kit v0.10.0 // indirect
+	github.com/google/go-cmp v0.5.6 // indirect
+	github.com/google/uuid v1.3.0 // indirect
+	github.com/gopherjs/gopherjs v0.0.0-20190430165422-3e4dfb77656c // indirect
+	github.com/ipfs/go-cid v0.1.0 // indirect
+	github.com/ipfs/go-datastore v0.5.1 // indirect
+	github.com/ipfs/go-ipfs-blockstore v1.1.2
+	github.com/ipfs/go-ipfs-ds-help v1.1.0
+	github.com/ipfs/go-log v1.0.5 // indirect
+	github.com/ipfs/go-log/v2 v2.4.0 // indirect
+	github.com/jmoiron/sqlx v1.2.0
+	github.com/kr/pretty v0.3.0 // indirect
+	github.com/multiformats/go-base32 v0.0.4 // indirect
+	github.com/multiformats/go-multihash v0.1.0
+	github.com/onsi/ginkgo v1.16.5 // indirect
+	github.com/onsi/gomega v1.13.0 // indirect
 	github.com/sirupsen/logrus v1.6.0
+	github.com/smartystreets/assertions v1.0.0 // indirect
 	github.com/spf13/cobra v1.0.0
 	github.com/spf13/viper v1.7.0
-	github.com/vulcanize/ipfs-blockchain-watcher v0.0.11-alpha
+	go.uber.org/atomic v1.9.0 // indirect
+	go.uber.org/goleak v1.1.11 // indirect
+	go.uber.org/multierr v1.7.0 // indirect
+	go.uber.org/zap v1.19.1 // indirect
+	golang.org/x/crypto v0.0.0-20211209193657-4570a0811e8b // indirect
+	golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect
+	golang.org/x/net v0.0.0-20211209124913-491a49abca63 // indirect
+	golang.org/x/sys v0.0.0-20211209171907-798191bca915 // indirect
+	golang.org/x/tools v0.1.8 // indirect
+	google.golang.org/appengine v1.6.6 // indirect
+	google.golang.org/protobuf v1.27.1 // indirect
+	lukechampine.com/blake3 v1.1.7 // indirect
 )
 
-replace github.com/ethereum/go-ethereum v1.9.11 => github.com/vulcanize/go-ethereum v1.9.11-statediff-0.0.2
+replace github.com/ethereum/go-ethereum v1.9.11 => github.com/vulcanize/go-ethereum v1.10.11-statediff-0.0.27

go.sum: 1384 changed lines (file diff suppressed because it is too large)


@@ -16,10 +16,9 @@
 package snapshot
 
 import (
+	ethNode "github.com/ethereum/go-ethereum/statediff/indexer/node"
+	"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
 	"github.com/spf13/viper"
-	"github.com/vulcanize/ipfs-blockchain-watcher/pkg/config"
-	"github.com/vulcanize/ipfs-blockchain-watcher/pkg/core"
 )
 
 const (
@@ -31,15 +30,18 @@ const (
 	LVL_DB_PATH = "LVL_DB_PATH"
 )
 
+// Config is config parameters for DB.
 type Config struct {
 	LevelDBPath   string
 	AncientDBPath string
-	Node          core.Node
-	DBConfig      config.Database
+	Node          ethNode.Info
+	connectionURI string
+	DBConfig      postgres.ConnectionConfig
 }
 
+// Init Initialises config
 func (c *Config) Init() {
-	c.DBConfig.Init()
+	c.dbInit()
 	viper.BindEnv("leveldb.path", LVL_DB_PATH)
 	viper.BindEnv("ethereum.nodeID", ETH_NODE_ID)
 	viper.BindEnv("ethereum.clientName", ETH_CLIENT_NAME)
@@ -47,12 +49,38 @@ func (c *Config) Init() {
 	viper.BindEnv("ethereum.networkID", ETH_NETWORK_ID)
 	viper.BindEnv("leveldb.ancient", ANCIENT_DB_PATH)
 
-	c.Node = core.Node{
+	c.Node = ethNode.Info{
 		ID:           viper.GetString("ethereum.nodeID"),
 		ClientName:   viper.GetString("ethereum.clientName"),
 		GenesisBlock: viper.GetString("ethereum.genesisBlock"),
 		NetworkID:    viper.GetString("ethereum.networkID"),
+		ChainID:      viper.GetUint64("ethereum.chainID"),
 	}
 	c.LevelDBPath = viper.GetString("leveldb.path")
 	c.AncientDBPath = viper.GetString("leveldb.ancient")
 }
+
+func (c *Config) dbInit() {
+	viper.BindEnv("database.name", postgres.DATABASE_NAME)
+	viper.BindEnv("database.hostname", postgres.DATABASE_HOSTNAME)
+	viper.BindEnv("database.port", postgres.DATABASE_PORT)
+	viper.BindEnv("database.user", postgres.DATABASE_USER)
+	viper.BindEnv("database.password", postgres.DATABASE_PASSWORD)
+	viper.BindEnv("database.maxIdle", postgres.DATABASE_MAX_IDLE_CONNECTIONS)
+	viper.BindEnv("database.maxOpen", postgres.DATABASE_MAX_OPEN_CONNECTIONS)
+	viper.BindEnv("database.maxLifetime", postgres.DATABASE_MAX_CONN_LIFETIME)
+
+	dbParams := postgres.ConnectionParams{}
+
+	// DB params
+	dbParams.Name = viper.GetString("database.name")
+	dbParams.Hostname = viper.GetString("database.hostname")
+	dbParams.Port = viper.GetInt("database.port")
+	dbParams.User = viper.GetString("database.user")
+	dbParams.Password = viper.GetString("database.password")
+
+	c.connectionURI = postgres.DbConnectionString(dbParams)
+
+	// DB config
+	c.DBConfig.MaxIdle = viper.GetInt("database.maxIdle")
+	c.DBConfig.MaxOpen = viper.GetInt("database.maxOpen")
+	c.DBConfig.MaxLifetime = viper.GetInt("database.maxLifetime")
+}
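Note: with the viper bindings above, the tool can be configured entirely through the environment. A minimal usage sketch, not part of this PR; the values are illustrative, and the DATABASE_* constant names are assumed to resolve to like-named environment variables:

package main

import (
	"os"

	"github.com/vulcanize/eth-pg-ipfs-state-snapshot/pkg/snapshot"
)

func main() {
	// Illustrative values; point these at a real geth data directory
	// and Postgres instance.
	os.Setenv("LVL_DB_PATH", "/data/geth/chaindata")
	os.Setenv("ANCIENT_DB_PATH", "/data/geth/chaindata/ancient")
	os.Setenv("DATABASE_NAME", "vulcanize_public") // assumed env var name
	os.Setenv("DATABASE_HOSTNAME", "localhost")    // assumed env var name
	os.Setenv("DATABASE_PORT", "5432")             // assumed env var name
	os.Setenv("DATABASE_USER", "postgres")         // assumed env var name

	conf := &snapshot.Config{}
	conf.Init() // binds the env vars above and derives the Postgres connection URI
	_ = conf
}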


@@ -21,43 +21,43 @@ import (
 	"github.com/ethereum/go-ethereum/common"
 )
 
-// Node for holding trie node information
-type Node struct {
-	NodeType NodeType
-	Path     []byte
-	Key      common.Hash
-	Value    []byte
+// node for holding trie node information
+type node struct {
+	nodeType nodeType
+	path     []byte
+	key      common.Hash
+	value    []byte
 }
 
-// NodeType for explicitly setting type of node
-type NodeType int
+// nodeType for explicitly setting type of node
+type nodeType int
 
 const (
-	Branch NodeType = iota
-	Extension
-	Leaf
-	Removed
-	Unknown
+	branch nodeType = iota
+	extension
+	leaf
+	removed
+	unknown
 )
 
 // CheckKeyType checks what type of key we have
-func CheckKeyType(elements []interface{}) (NodeType, error) {
+func CheckKeyType(elements []interface{}) (nodeType, error) {
 	if len(elements) > 2 {
-		return Branch, nil
+		return branch, nil
 	}
 	if len(elements) < 2 {
-		return Unknown, fmt.Errorf("node cannot be less than two elements in length")
+		return unknown, fmt.Errorf("node cannot be less than two elements in length")
 	}
 	switch elements[0].([]byte)[0] / 16 {
 	case '\x00':
-		return Extension, nil
+		return extension, nil
 	case '\x01':
-		return Extension, nil
+		return extension, nil
 	case '\x02':
-		return Leaf, nil
+		return leaf, nil
 	case '\x03':
-		return Leaf, nil
+		return leaf, nil
 	default:
-		return Unknown, fmt.Errorf("unknown hex prefix")
+		return unknown, fmt.Errorf("unknown hex prefix")
 	}
 }
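Note: the `elements[0].([]byte)[0] / 16` dispatch above reads the high nibble of the first byte of the node's compact (hex-prefix) encoded key: nibbles 0 and 1 mark extension nodes (even/odd partial-path length), 2 and 3 mark leaves. A standalone sketch of the same check, with illustrative inputs:

package main

import "fmt"

// keyKind mirrors the hex-prefix dispatch in CheckKeyType: the high
// nibble of the first key byte encodes the node kind.
func keyKind(firstKeyByte byte) string {
	switch firstKeyByte / 16 {
	case 0, 1: // even- / odd-length extension path
		return "extension"
	case 2, 3: // even- / odd-length leaf path
		return "leaf"
	default:
		return "unknown"
	}
}

func main() {
	fmt.Println(keyKind(0x20)) // leaf with even-length partial path
	fmt.Println(keyKind(0x1f)) // extension with odd-length partial path
}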


@@ -17,24 +17,29 @@ package snapshot
 
 import (
 	"bytes"
+	"fmt"
 
+	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/types"
-	"github.com/ipfs/go-ipfs-blockstore"
-	"github.com/ipfs/go-ipfs-ds-help"
+	"github.com/jmoiron/sqlx"
 	"github.com/multiformats/go-multihash"
 
-	"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs/ipld"
-	"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
-	"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared"
+	"github.com/ethereum/go-ethereum/statediff/indexer/ipfs/ipld"
+	"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
+	"github.com/ethereum/go-ethereum/statediff/indexer/shared"
 )
 
+// Publisher is wrapper around DB.
 type Publisher struct {
-	db *postgres.DB
+	db            *postgres.DB
+	currBatchSize uint
 }
 
+// NewPublisher creates Publisher
 func NewPublisher(db *postgres.DB) *Publisher {
 	return &Publisher{
-		db: db,
+		db:            db,
+		currBatchSize: 0,
 	}
 }
 
@@ -44,10 +49,12 @@ func (p *Publisher) PublishHeader(header *types.Header) (int64, error) {
 	if err != nil {
 		return 0, err
 	}
+
 	tx, err := p.db.Beginx()
 	if err != nil {
 		return 0, err
 	}
+
 	defer func() {
 		if p := recover(); p != nil {
 			shared.Rollback(tx)
@@ -58,9 +65,11 @@ func (p *Publisher) PublishHeader(header *types.Header) (int64, error) {
 			err = tx.Commit()
 		}
 	}()
-	if err := shared.PublishIPLD(tx, headerNode); err != nil {
+
+	if err = shared.PublishIPLD(tx, headerNode); err != nil {
 		return 0, err
 	}
+
 	mhKey, _ := shared.MultihashKeyFromCIDString(headerNode.Cid().String())
 	var headerID int64
 	err = tx.QueryRowx(`INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated)
@@ -69,86 +78,86 @@ func (p *Publisher) PublishHeader(header *types.Header) (int64, error) {
 	RETURNING id`,
 		header.Number.Uint64(), header.Hash().Hex(), header.ParentHash.Hex(), headerNode.Cid().String(), "0", p.db.NodeID, "0", header.Root.Hex(), header.TxHash.Hex(),
 		header.ReceiptHash.Hex(), header.UncleHash.Hex(), header.Bloom.Bytes(), header.Time, mhKey, 0).Scan(&headerID)
 	return headerID, err
 }
 
 // PublishStateNode writes the state node to the ipfs backing datastore and adds secondary indexes in the state_cids table
-func (p *Publisher) PublishStateNode(node Node, headerID int64) (int64, error) {
+func (p *Publisher) PublishStateNode(node *node, headerID int64, tx *sqlx.Tx) (int64, error) {
 	var stateID int64
 	var stateKey string
-	if !bytes.Equal(node.Key.Bytes(), nullHash.Bytes()) {
-		stateKey = node.Key.Hex()
+	if !bytes.Equal(node.key.Bytes(), nullHash.Bytes()) {
+		stateKey = node.key.Hex()
 	}
-	tx, err := p.db.Beginx()
+
+	stateCIDStr, mhKey, err := shared.PublishRaw(tx, ipld.MEthStateTrie, multihash.KECCAK_256, node.value)
 	if err != nil {
 		return 0, err
 	}
-	defer func() {
-		if p := recover(); p != nil {
-			shared.Rollback(tx)
-			panic(p)
-		} else if err != nil {
-			shared.Rollback(tx)
-		} else {
-			err = tx.Commit()
-		}
-	}()
-	stateCIDStr, err := shared.PublishRaw(tx, ipld.MEthStateTrie, multihash.KECCAK_256, node.Value)
-	if err != nil {
-		return 0, err
-	}
-	mhKey, _ := shared.MultihashKeyFromCIDString(stateCIDStr)
+
 	err = tx.QueryRowx(`INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7)
-	ON CONFLICT (header_id, state_path) DO UPDATE SET state_path = state_cids.state_path
+	ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7)
 	RETURNING id`,
-		headerID, stateKey, stateCIDStr, node.Path, node.NodeType, false, mhKey).Scan(&stateID)
+		headerID, stateKey, stateCIDStr, node.path, node.nodeType, false, mhKey).Scan(&stateID)
+
+	p.currBatchSize += 2
 	return stateID, err
 }
 
 // PublishStorageNode writes the storage node to the ipfs backing pg datastore and adds secondary indexes in the storage_cids table
-func (p *Publisher) PublishStorageNode(node Node, stateID int64) error {
+func (p *Publisher) PublishStorageNode(node *node, stateID int64, tx *sqlx.Tx) error {
 	var storageKey string
-	if !bytes.Equal(node.Key.Bytes(), nullHash.Bytes()) {
-		storageKey = node.Key.Hex()
+	if !bytes.Equal(node.key.Bytes(), nullHash.Bytes()) {
+		storageKey = node.key.Hex()
 	}
-	tx, err := p.db.Beginx()
+
+	storageCIDStr, mhKey, err := shared.PublishRaw(tx, ipld.MEthStorageTrie, multihash.KECCAK_256, node.value)
 	if err != nil {
 		return err
 	}
-	defer func() {
-		if p := recover(); p != nil {
-			shared.Rollback(tx)
-			panic(p)
-		} else if err != nil {
-			shared.Rollback(tx)
-		} else {
-			err = tx.Commit()
-		}
-	}()
-	storageCIDStr, err := shared.PublishRaw(tx, ipld.MEthStorageTrie, multihash.KECCAK_256, node.Value)
-	if err != nil {
-		return err
-	}
-	mhKey, _ := shared.MultihashKeyFromCIDString(storageCIDStr)
+
 	_, err = tx.Exec(`INSERT INTO eth.storage_cids (state_id, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7)
-	ON CONFLICT (state_id, storage_path) DO NOTHING`,
-		stateID, storageKey, storageCIDStr, node.Path, node.NodeType, false, mhKey)
-	return err
+	ON CONFLICT (state_id, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7)`,
+		stateID, storageKey, storageCIDStr, node.path, node.nodeType, false, mhKey)
+	if err != nil {
+		return err
+	}
+
+	p.currBatchSize += 2
+	return nil
 }
 
 // PublishCode writes code to the ipfs backing pg datastore
-func (p *Publisher) PublishCode(code []byte) error {
+func (p *Publisher) PublishCode(codeHash common.Hash, codeBytes []byte, tx *sqlx.Tx) error {
 	// no codec for code, doesn't matter though since blockstore key is multihash-derived
-	return p.publishRaw(ipld.MEthStorageTrie, multihash.KECCAK_256, code)
-}
-
-func (p *Publisher) publishRaw(codec, mh uint64, raw []byte) error {
-	c, err := ipld.RawdataToCid(codec, raw, mh)
+	mhKey, err := shared.MultihashKeyFromKeccak256(codeHash)
 	if err != nil {
-		return err
+		return fmt.Errorf("error deriving multihash key from codehash: %v", err)
 	}
-	dbKey := dshelp.MultihashToDsKey(c.Hash())
-	prefixedKey := blockstore.BlockPrefix.String() + dbKey.String()
-	_, err = p.db.Exec(`INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING`, prefixedKey, raw)
-	return err
+
+	if err = shared.PublishDirect(tx, mhKey, codeBytes); err != nil {
+		return fmt.Errorf("error publishing code IPLD: %v", err)
+	}
+
+	p.currBatchSize++
+	return nil
+}
+
+func (p *Publisher) checkBatchSize(tx *sqlx.Tx, maxBatchSize uint) (*sqlx.Tx, error) {
+	var err error
+	// maximum batch size reached, commit the current transaction and begin a new transaction.
+	if maxBatchSize <= p.currBatchSize {
+		if err = tx.Commit(); err != nil {
+			return nil, err
+		}
+
+		tx, err = p.db.Beginx()
+		if err != nil {
+			return nil, err
+		}
+
+		p.currBatchSize = 0
+	}
+
+	return tx, nil
 }
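Note: taken together, the changes above are the core of the new batching scheme. Callers now thread a single *sqlx.Tx through each Publish* method, the publisher counts rows written in currBatchSize, and checkBatchSize commits and reopens the transaction once the count crosses the threshold. A condensed sketch of the calling pattern, inside the snapshot package; the loop, slice, and function name are illustrative, not part of this PR:

// Illustrative only: publish a batch of state nodes through one
// periodically-committed transaction instead of one tx per node.
func publishAll(p *Publisher, nodes []*node, headerID int64, maxBatchSize uint) error {
	tx, err := p.db.Beginx()
	if err != nil {
		return err
	}
	for _, n := range nodes {
		// Commits and reopens tx whenever currBatchSize reaches maxBatchSize.
		if tx, err = p.checkBatchSize(tx, maxBatchSize); err != nil {
			return err
		}
		if _, err = p.PublishStateNode(n, headerID, tx); err != nil {
			shared.Rollback(tx)
			return err
		}
	}
	return tx.Commit()
}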


@@ -23,13 +23,15 @@ import (
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/rawdb"
 	"github.com/ethereum/go-ethereum/core/state"
+	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/crypto"
 	"github.com/ethereum/go-ethereum/ethdb"
 	"github.com/ethereum/go-ethereum/rlp"
+	"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
+	"github.com/ethereum/go-ethereum/statediff/indexer/shared"
 	"github.com/ethereum/go-ethereum/trie"
+	"github.com/jmoiron/sqlx"
 	"github.com/sirupsen/logrus"
-
-	"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
 )
 
 var (
@@ -37,56 +39,73 @@ var (
 	emptyNode, _      = rlp.EncodeToBytes([]byte{})
 	emptyCodeHash     = crypto.Keccak256([]byte{})
 	emptyContractRoot = crypto.Keccak256Hash(emptyNode)
+	defaultBatchSize  = uint(100)
 )
 
+// Service holds ethDB and stateDB to read data from lvldb and Publisher
+// to publish trie in postgres DB.
 type Service struct {
 	ethDB         ethdb.Database
 	stateDB       state.Database
 	ipfsPublisher *Publisher
+	maxBatchSize  uint
 }
 
-func NewSnapshotService(con Config) (*Service, error) {
-	pgdb, err := postgres.NewDB(con.DBConfig, con.Node)
+// NewSnapshotService creates Service.
+func NewSnapshotService(con *Config) (*Service, error) {
+	pgDB, err := postgres.NewDB(con.connectionURI, con.DBConfig, con.Node)
 	if err != nil {
 		return nil, err
 	}
 
-	edb, err := rawdb.NewLevelDBDatabaseWithFreezer(con.LevelDBPath, 1024, 256, con.AncientDBPath, "eth-pg-ipfs-state-snapshot")
+	edb, err := rawdb.NewLevelDBDatabaseWithFreezer(con.LevelDBPath, 1024, 256, con.AncientDBPath, "eth-pg-ipfs-state-snapshot", false)
 	if err != nil {
 		return nil, err
 	}
 
 	return &Service{
 		ethDB:         edb,
 		stateDB:       state.NewDatabase(edb),
-		ipfsPublisher: NewPublisher(pgdb),
+		ipfsPublisher: NewPublisher(pgDB),
+		maxBatchSize:  defaultBatchSize,
 	}, nil
 }
 
+// CreateLatestSnapshot creates snapshot for the latest block.
 func (s *Service) CreateLatestSnapshot() error {
 	// extract header from lvldb and publish to PG-IPFS
 	// hold onto the headerID so that we can link the state nodes to this header
 	logrus.Info("Creating snapshot at head")
 
 	hash := rawdb.ReadHeadHeaderHash(s.ethDB)
 	height := rawdb.ReadHeaderNumber(s.ethDB, hash)
 	if height == nil {
 		return fmt.Errorf("unable to read header height for header hash %s", hash.String())
 	}
 
 	header := rawdb.ReadHeader(s.ethDB, hash, *height)
 	if header == nil {
 		return fmt.Errorf("unable to read canonical header at height %d", height)
 	}
 
 	logrus.Infof("head hash: %s head height: %d", hash.Hex(), *height)
 
 	headerID, err := s.ipfsPublisher.PublishHeader(header)
 	if err != nil {
 		return err
 	}
 
 	t, err := s.stateDB.OpenTrie(header.Root)
 	if err != nil {
 		return err
 	}
 
 	trieDB := s.stateDB.TrieDB()
 
 	return s.createSnapshot(t.NodeIterator([]byte{}), trieDB, headerID)
 }
 
+// CreateSnapshot creates snapshot for given block height.
 func (s *Service) CreateSnapshot(height uint64) error {
 	// extract header from lvldb and publish to PG-IPFS
 	// hold onto the headerID so that we can link the state nodes to this header
@@ -96,77 +115,120 @@ func (s *Service) CreateSnapshot(height uint64) error {
 	if header == nil {
 		return fmt.Errorf("unable to read canonical header at height %d", height)
 	}
 
 	headerID, err := s.ipfsPublisher.PublishHeader(header)
 	if err != nil {
 		return err
 	}
 
 	t, err := s.stateDB.OpenTrie(header.Root)
 	if err != nil {
 		return err
 	}
 
 	trieDB := s.stateDB.TrieDB()
 
 	return s.createSnapshot(t.NodeIterator([]byte{}), trieDB, headerID)
 }
 
 func (s *Service) createSnapshot(it trie.NodeIterator, trieDB *trie.Database, headerID int64) error {
+	tx, err := s.ipfsPublisher.db.Beginx()
+	if err != nil {
+		return err
+	}
+
+	defer func() {
+		if rec := recover(); rec != nil {
+			shared.Rollback(tx)
+			panic(rec)
+		} else if err != nil {
+			shared.Rollback(tx)
+		} else {
+			err = tx.Commit()
+		}
+	}()
+
 	for it.Next(true) {
 		if it.Leaf() { // "leaf" nodes are actually "value" nodes, whose parents are the actual leaves
 			continue
 		}
 
 		if bytes.Equal(nullHash.Bytes(), it.Hash().Bytes()) {
 			continue
 		}
 
+		tx, err = s.ipfsPublisher.checkBatchSize(tx, s.maxBatchSize)
+		if err != nil {
+			return err
+		}
+
 		nodePath := make([]byte, len(it.Path()))
 		copy(nodePath, it.Path())
 
-		node, err := trieDB.Node(it.Hash())
+		var (
+			nodeData []byte
+			ty       nodeType
+		)
+
+		nodeData, err = trieDB.Node(it.Hash())
 		if err != nil {
 			return err
 		}
 
 		var nodeElements []interface{}
-		if err := rlp.DecodeBytes(node, &nodeElements); err != nil {
+		if err = rlp.DecodeBytes(nodeData, &nodeElements); err != nil {
 			return err
 		}
 
-		ty, err := CheckKeyType(nodeElements)
+		ty, err = CheckKeyType(nodeElements)
 		if err != nil {
 			return err
 		}
 
-		stateNode := Node{
-			NodeType: ty,
-			Path:     nodePath,
-			Value:    node,
+		stateNode := &node{
+			nodeType: ty,
+			path:     nodePath,
+			value:    nodeData,
 		}
 
 		switch ty {
-		case Leaf:
+		case leaf:
 			// if the node is a leaf, decode the account and publish the associated storage trie nodes if there are any
-			var account state.Account
-			if err := rlp.DecodeBytes(nodeElements[1].([]byte), &account); err != nil {
-				return fmt.Errorf("error decoding account for leaf node at path %x\nerror: %v", nodePath, err)
+			var account types.StateAccount
+			if err = rlp.DecodeBytes(nodeElements[1].([]byte), &account); err != nil {
+				return fmt.Errorf("error decoding account for leaf node at path %x\nerror: %w", nodePath, err)
 			}
 
 			partialPath := trie.CompactToHex(nodeElements[0].([]byte))
 			valueNodePath := append(nodePath, partialPath...)
 			encodedPath := trie.HexToCompact(valueNodePath)
 			leafKey := encodedPath[1:]
-			stateNode.Key = common.BytesToHash(leafKey)
+			stateNode.key = common.BytesToHash(leafKey)
 
-			stateID, err := s.ipfsPublisher.PublishStateNode(stateNode, headerID)
+			stateID, err := s.ipfsPublisher.PublishStateNode(stateNode, headerID, tx)
 			if err != nil {
 				return err
 			}
 
 			// publish any non-nil code referenced by codehash
 			if !bytes.Equal(account.CodeHash, emptyCodeHash) {
-				codeBytes, err := s.ethDB.Get(account.CodeHash)
-				if err != nil {
-					return err
+				codeHash := common.BytesToHash(account.CodeHash)
+				codeBytes := rawdb.ReadCode(s.ethDB, codeHash)
+				if len(codeBytes) == 0 {
+					logrus.Error("Code is missing", "account", common.BytesToHash(it.LeafKey()))
+					return errors.New("missing code")
 				}
 
-				if err := s.ipfsPublisher.PublishCode(codeBytes); err != nil {
+				if err = s.ipfsPublisher.PublishCode(codeHash, codeBytes, tx); err != nil {
 					return err
 				}
 			}
 
-			if err := s.storageSnapshot(account.Root, stateID); err != nil {
-				return fmt.Errorf("failed building storage snapshot for account %+v\r\nerror: %v", account, err)
+			if tx, err = s.storageSnapshot(account.Root, stateID, tx); err != nil {
+				return fmt.Errorf("failed building storage snapshot for account %+v\r\nerror: %w", account, err)
 			}
-		case Extension, Branch:
-			stateNode.Key = common.BytesToHash([]byte{})
-			if _, err := s.ipfsPublisher.PublishStateNode(stateNode, headerID); err != nil {
+		case extension, branch:
+			stateNode.key = common.BytesToHash([]byte{})
+			if _, err := s.ipfsPublisher.PublishStateNode(stateNode, headerID, tx); err != nil {
 				return err
 			}
 		default:
@@ -176,57 +238,78 @@ func (s *Service) createSnapshot(it trie.NodeIterator, trieDB *trie.Database, headerID int64) error {
 	return it.Error()
 }
 
-func (s *Service) storageSnapshot(sr common.Hash, stateID int64) error {
+func (s *Service) storageSnapshot(sr common.Hash, stateID int64, tx *sqlx.Tx) (*sqlx.Tx, error) {
 	if bytes.Equal(sr.Bytes(), emptyContractRoot.Bytes()) {
-		return nil
+		return tx, nil
 	}
 
 	sTrie, err := s.stateDB.OpenTrie(sr)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	it := sTrie.NodeIterator(make([]byte, 0))
 	for it.Next(true) {
 		// skip value nodes
 		if it.Leaf() {
 			continue
 		}
 
 		if bytes.Equal(nullHash.Bytes(), it.Hash().Bytes()) {
 			continue
 		}
 
+		tx, err = s.ipfsPublisher.checkBatchSize(tx, s.maxBatchSize)
+		if err != nil {
+			return nil, err
+		}
+
 		nodePath := make([]byte, len(it.Path()))
 		copy(nodePath, it.Path())
 
-		node, err := s.stateDB.TrieDB().Node(it.Hash())
+		var (
+			nodeData []byte
+			ty       nodeType
+		)
+
+		nodeData, err = s.stateDB.TrieDB().Node(it.Hash())
 		if err != nil {
-			return err
+			return nil, err
 		}
 
 		var nodeElements []interface{}
-		if err := rlp.DecodeBytes(node, &nodeElements); err != nil {
-			return err
+		if err = rlp.DecodeBytes(nodeData, &nodeElements); err != nil {
+			return nil, err
 		}
 
-		ty, err := CheckKeyType(nodeElements)
+		ty, err = CheckKeyType(nodeElements)
 		if err != nil {
-			return err
+			return nil, err
 		}
 
-		storageNode := Node{
-			NodeType: ty,
-			Path:     nodePath,
-			Value:    node,
+		storageNode := &node{
+			nodeType: ty,
			path:     nodePath,
+			value:    nodeData,
 		}
 
 		switch ty {
-		case Leaf:
+		case leaf:
 			partialPath := trie.CompactToHex(nodeElements[0].([]byte))
 			valueNodePath := append(nodePath, partialPath...)
 			encodedPath := trie.HexToCompact(valueNodePath)
 			leafKey := encodedPath[1:]
-			storageNode.Key = common.BytesToHash(leafKey)
-		case Extension, Branch:
-			storageNode.Key = common.BytesToHash([]byte{})
+			storageNode.key = common.BytesToHash(leafKey)
+		case extension, branch:
+			storageNode.key = common.BytesToHash([]byte{})
 		default:
-			return errors.New("unexpected node type")
+			return nil, errors.New("unexpected node type")
 		}
 
-		if err := s.ipfsPublisher.PublishStorageNode(storageNode, stateID); err != nil {
-			return err
+		if err = s.ipfsPublisher.PublishStorageNode(storageNode, stateID, tx); err != nil {
+			return nil, err
 		}
 	}
 
-	return it.Error()
+	return tx, it.Error()
 }
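Note: end to end, the service is driven as before; only the Config is now passed by pointer and the whole snapshot runs inside the batched transaction. A minimal driver sketch mirroring the cmd layer above; the main wrapper and the block height are illustrative:

package main

import (
	log "github.com/sirupsen/logrus"

	"github.com/vulcanize/eth-pg-ipfs-state-snapshot/pkg/snapshot"
)

func main() {
	conf := &snapshot.Config{}
	conf.Init()

	service, err := snapshot.NewSnapshotService(conf)
	if err != nil {
		log.Fatal(err)
	}

	// Snapshot a specific height, or call CreateLatestSnapshot() for head.
	if err := service.CreateSnapshot(1000000); err != nil {
		log.Fatal(err)
	}
}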