Update geth and implement transaction batching.

This commit is contained in:
Arijit Das 2021-12-13 20:31:32 +05:30
parent cace2383ef
commit ba8e32ce97
6 changed files with 790 additions and 956 deletions

View File

@ -1,12 +1,13 @@
[database] [database]
name = "vulcanize_public" name = "vulcanize_testing"
hostname = "localhost" hostname = "localhost"
port = 5432 port = 8077
user = "postgres" user = "vdbm"
password = "password"
[leveldb] [leveldb]
path = "/Users/user/Library/Ethereum/geth/chaindata" path = "/Users/arijitdas/Library/Ethereum/rinkeby/geth/chaindata"
ancient = "/Users/user/Library/Ethereum/geth/chaindata/ancient" ancient = "/Users/arijitdas/Library/Ethereum/rinkeby/geth/chaindata/ancient"
[snapshot] [snapshot]
blockHeight = -1 blockHeight = 58176

40
go.mod
View File

@ -1,16 +1,44 @@
module github.com/vulcanize/eth-pg-ipfs-state-snapshot module github.com/vulcanize/eth-pg-ipfs-state-snapshot
go 1.13 go 1.15
require ( require (
github.com/btcsuite/btcd v0.22.0-beta // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/ethereum/go-ethereum v1.9.11 github.com/ethereum/go-ethereum v1.9.11
github.com/ipfs/go-ipfs-blockstore v1.0.0 github.com/fsnotify/fsnotify v1.5.1 // indirect
github.com/ipfs/go-ipfs-ds-help v1.0.0 github.com/go-kit/kit v0.10.0 // indirect
github.com/multiformats/go-multihash v0.0.13 github.com/google/go-cmp v0.5.6 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gopherjs/gopherjs v0.0.0-20190430165422-3e4dfb77656c // indirect
github.com/ipfs/go-cid v0.1.0 // indirect
github.com/ipfs/go-datastore v0.5.1 // indirect
github.com/ipfs/go-ipfs-blockstore v1.1.2
github.com/ipfs/go-ipfs-ds-help v1.1.0
github.com/ipfs/go-log v1.0.5 // indirect
github.com/ipfs/go-log/v2 v2.4.0 // indirect
github.com/jmoiron/sqlx v1.2.0
github.com/kr/pretty v0.3.0 // indirect
github.com/multiformats/go-base32 v0.0.4 // indirect
github.com/multiformats/go-multihash v0.1.0
github.com/onsi/ginkgo v1.16.5 // indirect
github.com/onsi/gomega v1.13.0 // indirect
github.com/sirupsen/logrus v1.6.0 github.com/sirupsen/logrus v1.6.0
github.com/smartystreets/assertions v1.0.0 // indirect
github.com/spf13/cobra v1.0.0 github.com/spf13/cobra v1.0.0
github.com/spf13/viper v1.7.0 github.com/spf13/viper v1.7.0
github.com/vulcanize/ipfs-blockchain-watcher v0.0.11-alpha go.uber.org/atomic v1.9.0 // indirect
go.uber.org/goleak v1.1.11 // indirect
go.uber.org/multierr v1.7.0 // indirect
go.uber.org/zap v1.19.1 // indirect
golang.org/x/crypto v0.0.0-20211209193657-4570a0811e8b // indirect
golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect
golang.org/x/net v0.0.0-20211209124913-491a49abca63 // indirect
golang.org/x/sys v0.0.0-20211209171907-798191bca915 // indirect
golang.org/x/tools v0.1.8 // indirect
google.golang.org/appengine v1.6.6 // indirect
google.golang.org/protobuf v1.27.1 // indirect
lukechampine.com/blake3 v1.1.7 // indirect
) )
replace github.com/ethereum/go-ethereum v1.9.11 => github.com/vulcanize/go-ethereum v1.9.11-statediff-0.0.2 replace github.com/ethereum/go-ethereum v1.9.11 => github.com/vulcanize/go-ethereum v1.10.11-statediff-0.0.27

1384
go.sum

File diff suppressed because it is too large Load Diff

View File

@ -16,43 +16,69 @@
package snapshot package snapshot
import ( import (
"github.com/ethereum/go-ethereum/statediff/indexer/node"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
"github.com/spf13/viper" "github.com/spf13/viper"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/config"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/core"
) )
const ( const (
ANCIENT_DB_PATH = "ANCIENT_DB_PATH" ancientDbPath = "ANCIENT_DB_PATH"
ETH_CLIENT_NAME = "ETH_CLIENT_NAME" ethClientName = "ETH_CLIENT_NAME"
ETH_GENESIS_BLOCK = "ETH_GENESIS_BLOCK" ethGenesisBlock = "ETH_GENESIS_BLOCK"
ETH_NETWORK_ID = "ETH_NETWORK_ID" ethNetworkId = "ETH_NETWORK_ID"
ETH_NODE_ID = "ETH_NODE_ID" ethNodeId = "ETH_NODE_ID"
LVL_DB_PATH = "LVL_DB_PATH" lvlDbPath = "LVL_DB_PATH"
) )
type Config struct { type Config struct {
LevelDBPath string LevelDBPath string
AncientDBPath string AncientDBPath string
Node core.Node Node node.Info
DBConfig config.Database connectionURI string
DBConfig postgres.ConnectionConfig
} }
func (c *Config) Init() { func (c *Config) Init() {
c.DBConfig.Init() c.dbInit()
viper.BindEnv("leveldb.path", LVL_DB_PATH) viper.BindEnv("leveldb.path", lvlDbPath)
viper.BindEnv("ethereum.nodeID", ETH_NODE_ID) viper.BindEnv("ethereum.nodeID", ethNodeId)
viper.BindEnv("ethereum.clientName", ETH_CLIENT_NAME) viper.BindEnv("ethereum.clientName", ethClientName)
viper.BindEnv("ethereum.genesisBlock", ETH_GENESIS_BLOCK) viper.BindEnv("ethereum.genesisBlock", ethGenesisBlock)
viper.BindEnv("ethereum.networkID", ETH_NETWORK_ID) viper.BindEnv("ethereum.networkID", ethNetworkId)
viper.BindEnv("leveldb.ancient", ANCIENT_DB_PATH) viper.BindEnv("leveldb.ancient", ancientDbPath)
c.Node = core.Node{ c.Node = node.Info{
ID: viper.GetString("ethereum.nodeID"), ID: viper.GetString("ethereum.nodeID"),
ClientName: viper.GetString("ethereum.clientName"), ClientName: viper.GetString("ethereum.clientName"),
GenesisBlock: viper.GetString("ethereum.genesisBlock"), GenesisBlock: viper.GetString("ethereum.genesisBlock"),
NetworkID: viper.GetString("ethereum.networkID"), NetworkID: viper.GetString("ethereum.networkID"),
ChainID: viper.GetUint64("ethereum.chainID"),
} }
c.LevelDBPath = viper.GetString("leveldb.path") c.LevelDBPath = viper.GetString("leveldb.path")
c.AncientDBPath = viper.GetString("leveldb.ancient") c.AncientDBPath = viper.GetString("leveldb.ancient")
} }
func (c *Config) dbInit() {
viper.BindEnv("database.name", postgres.DATABASE_NAME)
viper.BindEnv("database.hostname", postgres.DATABASE_HOSTNAME)
viper.BindEnv("database.port", postgres.DATABASE_PORT)
viper.BindEnv("database.user", postgres.DATABASE_USER)
viper.BindEnv("database.password", postgres.DATABASE_PASSWORD)
viper.BindEnv("database.maxIdle", postgres.DATABASE_MAX_IDLE_CONNECTIONS)
viper.BindEnv("database.maxOpen", postgres.DATABASE_MAX_OPEN_CONNECTIONS)
viper.BindEnv("database.maxLifetime", postgres.DATABASE_MAX_CONN_LIFETIME)
dbParams := postgres.ConnectionParams{}
// DB params
dbParams.Name = viper.GetString("database.name")
dbParams.Hostname = viper.GetString("database.hostname")
dbParams.Port = viper.GetInt("database.port")
dbParams.User = viper.GetString("database.user")
dbParams.Password = viper.GetString("database.password")
c.connectionURI = postgres.DbConnectionString(dbParams)
// DB config
c.DBConfig.MaxIdle = viper.GetInt("database.maxIdle")
c.DBConfig.MaxOpen = viper.GetInt("database.maxOpen")
c.DBConfig.MaxLifetime = viper.GetInt("database.maxLifetime")
}

View File

@ -19,22 +19,25 @@ import (
"bytes" "bytes"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ipfs/go-ipfs-blockstore" blockstore "github.com/ipfs/go-ipfs-blockstore"
"github.com/ipfs/go-ipfs-ds-help" dshelp "github.com/ipfs/go-ipfs-ds-help"
"github.com/jmoiron/sqlx"
"github.com/multiformats/go-multihash" "github.com/multiformats/go-multihash"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs/ipld" "github.com/ethereum/go-ethereum/statediff/indexer/ipfs/ipld"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" "github.com/ethereum/go-ethereum/statediff/indexer/postgres"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" "github.com/ethereum/go-ethereum/statediff/indexer/shared"
) )
type Publisher struct { type Publisher struct {
db *postgres.DB db *postgres.DB
currBatchSize uint
} }
func NewPublisher(db *postgres.DB) *Publisher { func NewPublisher(db *postgres.DB) *Publisher {
return &Publisher{ return &Publisher{
db: db, db: db,
currBatchSize: 0,
} }
} }
@ -44,10 +47,12 @@ func (p *Publisher) PublishHeader(header *types.Header) (int64, error) {
if err != nil { if err != nil {
return 0, err return 0, err
} }
tx, err := p.db.Beginx() tx, err := p.db.Beginx()
if err != nil { if err != nil {
return 0, err return 0, err
} }
defer func() { defer func() {
if p := recover(); p != nil { if p := recover(); p != nil {
shared.Rollback(tx) shared.Rollback(tx)
@ -58,9 +63,11 @@ func (p *Publisher) PublishHeader(header *types.Header) (int64, error) {
err = tx.Commit() err = tx.Commit()
} }
}() }()
if err := shared.PublishIPLD(tx, headerNode); err != nil {
if err = shared.PublishIPLD(tx, headerNode); err != nil {
return 0, err return 0, err
} }
mhKey, _ := shared.MultihashKeyFromCIDString(headerNode.Cid().String()) mhKey, _ := shared.MultihashKeyFromCIDString(headerNode.Cid().String())
var headerID int64 var headerID int64
err = tx.QueryRowx(`INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated) err = tx.QueryRowx(`INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated)
@ -69,86 +76,107 @@ func (p *Publisher) PublishHeader(header *types.Header) (int64, error) {
RETURNING id`, RETURNING id`,
header.Number.Uint64(), header.Hash().Hex(), header.ParentHash.Hex(), headerNode.Cid().String(), "0", p.db.NodeID, "0", header.Root.Hex(), header.TxHash.Hex(), header.Number.Uint64(), header.Hash().Hex(), header.ParentHash.Hex(), headerNode.Cid().String(), "0", p.db.NodeID, "0", header.Root.Hex(), header.TxHash.Hex(),
header.ReceiptHash.Hex(), header.UncleHash.Hex(), header.Bloom.Bytes(), header.Time, mhKey, 0).Scan(&headerID) header.ReceiptHash.Hex(), header.UncleHash.Hex(), header.Bloom.Bytes(), header.Time, mhKey, 0).Scan(&headerID)
return headerID, err return headerID, err
} }
// PublishStateNode writes the state node to the ipfs backing datastore and adds secondary indexes in the state_cids table // PublishStateNode writes the state node to the ipfs backing datastore and adds secondary indexes in the state_cids table
func (p *Publisher) PublishStateNode(node Node, headerID int64) (int64, error) { func (p *Publisher) PublishStateNode(node Node, headerID int64, tx *sqlx.Tx) (int64, error) {
var stateID int64 var stateID int64
var stateKey string var stateKey string
if !bytes.Equal(node.Key.Bytes(), nullHash.Bytes()) { if !bytes.Equal(node.Key.Bytes(), nullHash.Bytes()) {
stateKey = node.Key.Hex() stateKey = node.Key.Hex()
} }
tx, err := p.db.Beginx()
if err != nil {
return 0, err
}
defer func() { defer func() {
if p := recover(); p != nil { if rec := recover(); rec != nil {
shared.Rollback(tx) shared.Rollback(tx)
panic(p) panic(rec)
} else if err != nil {
shared.Rollback(tx)
} else {
err = tx.Commit()
} }
}() }()
stateCIDStr, err := shared.PublishRaw(tx, ipld.MEthStateTrie, multihash.KECCAK_256, node.Value)
stateCIDStr, mhKey, err := shared.PublishRaw(tx, ipld.MEthStateTrie, multihash.KECCAK_256, node.Value)
if err != nil { if err != nil {
return 0, err return 0, err
} }
mhKey, _ := shared.MultihashKeyFromCIDString(stateCIDStr)
err = tx.QueryRowx(`INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7) err = tx.QueryRowx(`INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (header_id, state_path) DO UPDATE SET state_path = state_cids.state_path ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7)
RETURNING id`, RETURNING id`,
headerID, stateKey, stateCIDStr, node.Path, node.NodeType, false, mhKey).Scan(&stateID) headerID, stateKey, stateCIDStr, node.Path, node.NodeType, false, mhKey).Scan(&stateID)
p.currBatchSize += 2
return stateID, err return stateID, err
} }
// PublishStorageNode writes the storage node to the ipfs backing pg datastore and adds secondary indexes in the storage_cids table // PublishStorageNode writes the storage node to the ipfs backing pg datastore and adds secondary indexes in the storage_cids table
func (p *Publisher) PublishStorageNode(node Node, stateID int64) error { func (p *Publisher) PublishStorageNode(node Node, stateID int64, tx *sqlx.Tx) error {
var storageKey string var storageKey string
if !bytes.Equal(node.Key.Bytes(), nullHash.Bytes()) { if !bytes.Equal(node.Key.Bytes(), nullHash.Bytes()) {
storageKey = node.Key.Hex() storageKey = node.Key.Hex()
} }
tx, err := p.db.Beginx()
if err != nil {
return err
}
defer func() { defer func() {
if p := recover(); p != nil { if rec := recover(); rec != nil {
shared.Rollback(tx) shared.Rollback(tx)
panic(p) panic(rec)
} else if err != nil {
shared.Rollback(tx)
} else {
err = tx.Commit()
} }
}() }()
storageCIDStr, err := shared.PublishRaw(tx, ipld.MEthStorageTrie, multihash.KECCAK_256, node.Value)
storageCIDStr, mhKey, err := shared.PublishRaw(tx, ipld.MEthStorageTrie, multihash.KECCAK_256, node.Value)
if err != nil { if err != nil {
return err return err
} }
mhKey, _ := shared.MultihashKeyFromCIDString(storageCIDStr)
_, err = tx.Exec(`INSERT INTO eth.storage_cids (state_id, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7) _, err = tx.Exec(`INSERT INTO eth.storage_cids (state_id, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (state_id, storage_path) DO NOTHING`, ON CONFLICT (state_id, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7)`,
stateID, storageKey, storageCIDStr, node.Path, node.NodeType, false, mhKey) stateID, storageKey, storageCIDStr, node.Path, node.NodeType, false, mhKey)
return err if err != nil {
return err
}
p.currBatchSize += 2
return nil
} }
// PublishCode writes code to the ipfs backing pg datastore // PublishCode writes code to the ipfs backing pg datastore
func (p *Publisher) PublishCode(code []byte) error { func (p *Publisher) PublishCode(code []byte, tx *sqlx.Tx) error {
// no codec for code, doesn't matter though since blockstore key is multihash-derived // no codec for code, doesn't matter though since blockstore key is multihash-derived
return p.publishRaw(ipld.MEthStorageTrie, multihash.KECCAK_256, code) return p.publishRaw(ipld.MEthStorageTrie, multihash.KECCAK_256, code, tx)
} }
func (p *Publisher) publishRaw(codec, mh uint64, raw []byte) error { func (p *Publisher) publishRaw(codec, mh uint64, raw []byte, tx *sqlx.Tx) error {
c, err := ipld.RawdataToCid(codec, raw, mh) c, err := ipld.RawdataToCid(codec, raw, mh)
if err != nil { if err != nil {
return err return err
} }
dbKey := dshelp.MultihashToDsKey(c.Hash()) dbKey := dshelp.MultihashToDsKey(c.Hash())
prefixedKey := blockstore.BlockPrefix.String() + dbKey.String() prefixedKey := blockstore.BlockPrefix.String() + dbKey.String()
_, err = p.db.Exec(`INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING`, prefixedKey, raw) _, err = tx.Exec(`INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING`, prefixedKey, raw)
return err if err != nil {
return err
}
p.currBatchSize++
return nil
}
func (p *Publisher) checkBatchSize(tx *sqlx.Tx, maxBatchSize uint) (*sqlx.Tx, error) {
var err error
// maximum batch size reached, commit the current transaction and begin a new transaction.
if maxBatchSize <= p.currBatchSize {
if err = tx.Commit(); err != nil {
return nil, err
}
tx, err = p.db.Beginx()
if err != nil {
return nil, err
}
p.currBatchSize = 0
}
return tx, nil
} }

View File

@ -23,13 +23,15 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie" "github.com/ethereum/go-ethereum/trie"
"github.com/jmoiron/sqlx"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" "github.com/ethereum/go-ethereum/statediff/indexer/postgres"
) )
var ( var (
@ -37,27 +39,33 @@ var (
emptyNode, _ = rlp.EncodeToBytes([]byte{}) emptyNode, _ = rlp.EncodeToBytes([]byte{})
emptyCodeHash = crypto.Keccak256([]byte{}) emptyCodeHash = crypto.Keccak256([]byte{})
emptyContractRoot = crypto.Keccak256Hash(emptyNode) emptyContractRoot = crypto.Keccak256Hash(emptyNode)
defaultBatchSize = uint(100)
) )
type Service struct { type Service struct {
ethDB ethdb.Database ethDB ethdb.Database
stateDB state.Database stateDB state.Database
ipfsPublisher *Publisher ipfsPublisher *Publisher
maxBatchSize uint
} }
func NewSnapshotService(con Config) (*Service, error) { func NewSnapshotService(con Config) (*Service, error) {
pgdb, err := postgres.NewDB(con.DBConfig, con.Node) pgDB, err := postgres.NewDB(con.connectionURI, con.DBConfig, con.Node)
if err != nil { if err != nil {
return nil, err return nil, err
} }
edb, err := rawdb.NewLevelDBDatabaseWithFreezer(con.LevelDBPath, 1024, 256, con.AncientDBPath, "eth-pg-ipfs-state-snapshot")
edb, err := rawdb.NewLevelDBDatabaseWithFreezer(con.LevelDBPath, 1024, 256, con.AncientDBPath, "eth-pg-ipfs-state-snapshot", false)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &Service{ return &Service{
ethDB: edb, ethDB: edb,
stateDB: state.NewDatabase(edb), stateDB: state.NewDatabase(edb),
ipfsPublisher: NewPublisher(pgdb), ipfsPublisher: NewPublisher(pgDB),
maxBatchSize: defaultBatchSize,
}, nil }, nil
} }
@ -65,26 +73,37 @@ func (s *Service) CreateLatestSnapshot() error {
// extract header from lvldb and publish to PG-IPFS // extract header from lvldb and publish to PG-IPFS
// hold onto the headerID so that we can link the state nodes to this header // hold onto the headerID so that we can link the state nodes to this header
logrus.Info("Creating snapshot at head") logrus.Info("Creating snapshot at head")
hash := rawdb.ReadHeadHeaderHash(s.ethDB) hash := rawdb.ReadHeadHeaderHash(s.ethDB)
height := rawdb.ReadHeaderNumber(s.ethDB, hash) height := rawdb.ReadHeaderNumber(s.ethDB, hash)
if height == nil { if height == nil {
return fmt.Errorf("unable to read header height for header hash %s", hash.String()) return fmt.Errorf("unable to read header height for header hash %s", hash.String())
} }
header := rawdb.ReadHeader(s.ethDB, hash, *height) header := rawdb.ReadHeader(s.ethDB, hash, *height)
if header == nil { if header == nil {
return fmt.Errorf("unable to read canonical header at height %d", height) return fmt.Errorf("unable to read canonical header at height %d", height)
} }
logrus.Infof("head hash: %s head height: %d", hash.Hex(), *height) logrus.Infof("head hash: %s head height: %d", hash.Hex(), *height)
headerID, err := s.ipfsPublisher.PublishHeader(header) headerID, err := s.ipfsPublisher.PublishHeader(header)
if err != nil { if err != nil {
return err return err
} }
t, err := s.stateDB.OpenTrie(header.Root) t, err := s.stateDB.OpenTrie(header.Root)
if err != nil { if err != nil {
return err return err
} }
trieDB := s.stateDB.TrieDB() trieDB := s.stateDB.TrieDB()
return s.createSnapshot(t.NodeIterator([]byte{}), trieDB, headerID) err = s.createSnapshot(t.NodeIterator([]byte{}), trieDB, headerID)
if err != nil {
return err
}
return nil
} }
func (s *Service) CreateSnapshot(height uint64) error { func (s *Service) CreateSnapshot(height uint64) error {
@ -96,77 +115,118 @@ func (s *Service) CreateSnapshot(height uint64) error {
if header == nil { if header == nil {
return fmt.Errorf("unable to read canonical header at height %d", height) return fmt.Errorf("unable to read canonical header at height %d", height)
} }
headerID, err := s.ipfsPublisher.PublishHeader(header) headerID, err := s.ipfsPublisher.PublishHeader(header)
if err != nil { if err != nil {
return err return err
} }
t, err := s.stateDB.OpenTrie(header.Root) t, err := s.stateDB.OpenTrie(header.Root)
if err != nil { if err != nil {
return err return err
} }
trieDB := s.stateDB.TrieDB() trieDB := s.stateDB.TrieDB()
return s.createSnapshot(t.NodeIterator([]byte{}), trieDB, headerID) err = s.createSnapshot(t.NodeIterator([]byte{}), trieDB, headerID)
if err != nil {
return err
}
return nil
} }
func (s *Service) createSnapshot(it trie.NodeIterator, trieDB *trie.Database, headerID int64) error { func (s *Service) createSnapshot(it trie.NodeIterator, trieDB *trie.Database, headerID int64) error {
tx, err := s.ipfsPublisher.db.Beginx()
if err != nil {
return err
}
defer func() {
err = tx.Commit()
}()
for it.Next(true) { for it.Next(true) {
if it.Leaf() { // "leaf" nodes are actually "value" nodes, whose parents are the actual leaves if it.Leaf() { // "leaf" nodes are actually "value" nodes, whose parents are the actual leaves
continue continue
} }
if bytes.Equal(nullHash.Bytes(), it.Hash().Bytes()) { if bytes.Equal(nullHash.Bytes(), it.Hash().Bytes()) {
continue continue
} }
tx, err = s.ipfsPublisher.checkBatchSize(tx, s.maxBatchSize)
if err != nil {
return err
}
nodePath := make([]byte, len(it.Path())) nodePath := make([]byte, len(it.Path()))
copy(nodePath, it.Path()) copy(nodePath, it.Path())
node, err := trieDB.Node(it.Hash())
var (
node []byte
ty NodeType
)
node, err = trieDB.Node(it.Hash())
if err != nil { if err != nil {
return err return err
} }
var nodeElements []interface{} var nodeElements []interface{}
if err := rlp.DecodeBytes(node, &nodeElements); err != nil { if err = rlp.DecodeBytes(node, &nodeElements); err != nil {
return err return err
} }
ty, err := CheckKeyType(nodeElements)
ty, err = CheckKeyType(nodeElements)
if err != nil { if err != nil {
return err return err
} }
stateNode := Node{ stateNode := Node{
NodeType: ty, NodeType: ty,
Path: nodePath, Path: nodePath,
Value: node, Value: node,
} }
switch ty { switch ty {
case Leaf: case Leaf:
// if the node is a leaf, decode the account and publish the associated storage trie nodes if there are any // if the node is a leaf, decode the account and publish the associated storage trie nodes if there are any
var account state.Account var account types.StateAccount
if err := rlp.DecodeBytes(nodeElements[1].([]byte), &account); err != nil { if err := rlp.DecodeBytes(nodeElements[1].([]byte), &account); err != nil {
return fmt.Errorf("error decoding account for leaf node at path %x nerror: %v", nodePath, err) return fmt.Errorf("error decoding account for leaf node at path %x nerror: %v", nodePath, err)
} }
partialPath := trie.CompactToHex(nodeElements[0].([]byte)) partialPath := trie.CompactToHex(nodeElements[0].([]byte))
valueNodePath := append(nodePath, partialPath...) valueNodePath := append(nodePath, partialPath...)
encodedPath := trie.HexToCompact(valueNodePath) encodedPath := trie.HexToCompact(valueNodePath)
leafKey := encodedPath[1:] leafKey := encodedPath[1:]
stateNode.Key = common.BytesToHash(leafKey) stateNode.Key = common.BytesToHash(leafKey)
stateID, err := s.ipfsPublisher.PublishStateNode(stateNode, headerID)
stateID, err := s.ipfsPublisher.PublishStateNode(stateNode, headerID, tx)
if err != nil { if err != nil {
return err return err
} }
// publish any non-nil code referenced by codehash // publish any non-nil code referenced by codehash
if !bytes.Equal(account.CodeHash, emptyCodeHash) { if !bytes.Equal(account.CodeHash, emptyCodeHash) {
codeBytes, err := s.ethDB.Get(account.CodeHash) key := common.BytesToHash(account.CodeHash)
if err != nil { codeBytes := rawdb.ReadCode(s.ethDB, key)
return err if len(codeBytes) == 0 {
logrus.Error("Code is missing", "account", common.BytesToHash(it.LeafKey()))
return errors.New("missing code")
} }
if err := s.ipfsPublisher.PublishCode(codeBytes); err != nil {
if err := s.ipfsPublisher.PublishCode(codeBytes, tx); err != nil {
return err return err
} }
} }
if err := s.storageSnapshot(account.Root, stateID); err != nil {
if tx, err = s.storageSnapshot(account.Root, stateID, tx); err != nil {
return fmt.Errorf("failed building storage snapshot for account %+v\r\nerror: %v", account, err) return fmt.Errorf("failed building storage snapshot for account %+v\r\nerror: %v", account, err)
} }
case Extension, Branch: case Extension, Branch:
stateNode.Key = common.BytesToHash([]byte{}) stateNode.Key = common.BytesToHash([]byte{})
if _, err := s.ipfsPublisher.PublishStateNode(stateNode, headerID); err != nil { if _, err := s.ipfsPublisher.PublishStateNode(stateNode, headerID, tx); err != nil {
return err return err
} }
default: default:
@ -176,42 +236,55 @@ func (s *Service) createSnapshot(it trie.NodeIterator, trieDB *trie.Database, he
return it.Error() return it.Error()
} }
func (s *Service) storageSnapshot(sr common.Hash, stateID int64) error { func (s *Service) storageSnapshot(sr common.Hash, stateID int64, tx *sqlx.Tx) (*sqlx.Tx, error) {
if bytes.Equal(sr.Bytes(), emptyContractRoot.Bytes()) { if bytes.Equal(sr.Bytes(), emptyContractRoot.Bytes()) {
return nil return tx, nil
} }
sTrie, err := s.stateDB.OpenTrie(sr) sTrie, err := s.stateDB.OpenTrie(sr)
if err != nil { if err != nil {
return err return nil, err
} }
it := sTrie.NodeIterator(make([]byte, 0)) it := sTrie.NodeIterator(make([]byte, 0))
for it.Next(true) { for it.Next(true) {
// skip value nodes // skip value nodes
if it.Leaf() { if it.Leaf() {
continue continue
} }
if bytes.Equal(nullHash.Bytes(), it.Hash().Bytes()) { if bytes.Equal(nullHash.Bytes(), it.Hash().Bytes()) {
continue continue
} }
tx, err = s.ipfsPublisher.checkBatchSize(tx, s.maxBatchSize)
if err != nil {
return nil, err
}
nodePath := make([]byte, len(it.Path())) nodePath := make([]byte, len(it.Path()))
copy(nodePath, it.Path()) copy(nodePath, it.Path())
node, err := s.stateDB.TrieDB().Node(it.Hash()) node, err := s.stateDB.TrieDB().Node(it.Hash())
if err != nil { if err != nil {
return err return nil, err
} }
var nodeElements []interface{} var nodeElements []interface{}
if err := rlp.DecodeBytes(node, &nodeElements); err != nil { if err := rlp.DecodeBytes(node, &nodeElements); err != nil {
return err return nil, err
} }
ty, err := CheckKeyType(nodeElements) ty, err := CheckKeyType(nodeElements)
if err != nil { if err != nil {
return err return nil, err
} }
storageNode := Node{ storageNode := Node{
NodeType: ty, NodeType: ty,
Path: nodePath, Path: nodePath,
Value: node, Value: node,
} }
switch ty { switch ty {
case Leaf: case Leaf:
partialPath := trie.CompactToHex(nodeElements[0].([]byte)) partialPath := trie.CompactToHex(nodeElements[0].([]byte))
@ -222,11 +295,13 @@ func (s *Service) storageSnapshot(sr common.Hash, stateID int64) error {
case Extension, Branch: case Extension, Branch:
storageNode.Key = common.BytesToHash([]byte{}) storageNode.Key = common.BytesToHash([]byte{})
default: default:
return errors.New("unexpected node type") return nil, errors.New("unexpected node type")
} }
if err := s.ipfsPublisher.PublishStorageNode(storageNode, stateID); err != nil {
return err if err = s.ipfsPublisher.PublishStorageNode(storageNode, stateID, tx); err != nil {
return nil, err
} }
} }
return it.Error()
return tx, it.Error()
} }