Merge branch 'master' into async-traversal

This commit is contained in:
Roy Crihfield 2022-01-10 17:47:27 -06:00
commit 317b277939
10 changed files with 544 additions and 1027 deletions

View File

@ -18,11 +18,9 @@ Config format:
user = "postgres"
[leveldb]
path = "/Users/user/Library/Ethereum/geth/chaindata"
# path for geth's "freezer" archive
ancient = "/Users/user/Library/Ethereum/geth/chaindata"
path = "/Users/user/Library/Ethereum/geth/chaindata"
ancient = "/Users/user/Library/Ethereum/geth/chaindata/ancient"
[snapshot]
blockHeight = 0
divideDepth = 1
blockHeight = 0
```

View File

@ -37,6 +37,7 @@ var rootCmd = &cobra.Command{
PersistentPreRun: initFuncs,
}
// Execute executes root Command.
func Execute() {
log.Info("----- Starting vDB -----")
if err := rootCmd.Execute(); err != nil {

View File

@ -19,6 +19,7 @@ import (
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/vulcanize/eth-pg-ipfs-state-snapshot/pkg/snapshot"
)
@ -37,9 +38,9 @@ var stateSnapshotCmd = &cobra.Command{
}
func stateSnapshot() {
serviceConfig := snapshot.ServiceConfig{}
serviceConfig.Init()
snapshotService, err := snapshot.NewSnapshotService(serviceConfig)
snapConfig := &snapshot.Config{}
snapConfig.Init()
snapshotService, err := snapshot.NewSnapshotService(snapConfig)
if err != nil {
logWithCommand.Fatal(err)
}

View File

@ -1,8 +1,8 @@
[database]
name = "vulcanize_public"
hostname = "localhost"
port = 5432
user = "postgres"
name = "vulcanize_public"
hostname = "localhost"
port = 5432
user = "postgres"
[leveldb]
path = "/Users/user/Library/Ethereum/geth/chaindata"

50
go.mod
View File

@ -1,23 +1,45 @@
module github.com/vulcanize/eth-pg-ipfs-state-snapshot
go 1.13
go 1.15
require (
github.com/dgraph-io/badger v1.6.1 // indirect
github.com/ethereum/go-ethereum v1.10.9
github.com/ipfs/go-datastore v0.4.4 // indirect
github.com/ipfs/go-ipfs-blockstore v1.0.1
github.com/ipfs/go-ipfs-ds-help v1.0.0
github.com/libp2p/go-libp2p-kad-dht v0.7.11 // indirect
github.com/libp2p/go-nat v0.0.5 // indirect
github.com/multiformats/go-multihash v0.0.14
github.com/sirupsen/logrus v1.7.0
github.com/btcsuite/btcd v0.22.0-beta // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/ethereum/go-ethereum v1.9.14
github.com/fsnotify/fsnotify v1.5.1 // indirect
github.com/go-kit/kit v0.10.0 // indirect
github.com/google/go-cmp v0.5.6 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gopherjs/gopherjs v0.0.0-20190430165422-3e4dfb77656c // indirect
github.com/ipfs/go-cid v0.1.0 // indirect
github.com/ipfs/go-datastore v0.5.1 // indirect
github.com/ipfs/go-ipfs-blockstore v1.1.2 // indirect
github.com/ipfs/go-log v1.0.5 // indirect
github.com/ipfs/go-log/v2 v2.4.0 // indirect
github.com/jmoiron/sqlx v1.2.0
github.com/kr/pretty v0.3.0 // indirect
github.com/multiformats/go-base32 v0.0.4 // indirect
github.com/multiformats/go-multihash v0.1.0
github.com/onsi/ginkgo v1.16.5 // indirect
github.com/onsi/gomega v1.13.0 // indirect
github.com/sirupsen/logrus v1.6.0
github.com/smartystreets/assertions v1.0.0 // indirect
github.com/spf13/cobra v1.0.0
github.com/spf13/viper v1.7.1
github.com/vulcanize/go-eth-state-node-iterator v0.0.1-alpha
github.com/vulcanize/ipfs-blockchain-watcher v0.0.11-alpha
github.com/spf13/viper v1.7.0
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/goleak v1.1.11 // indirect
go.uber.org/multierr v1.7.0 // indirect
go.uber.org/zap v1.19.1 // indirect
golang.org/x/crypto v0.0.0-20211209193657-4570a0811e8b // indirect
golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect
golang.org/x/net v0.0.0-20211209124913-491a49abca63 // indirect
golang.org/x/sys v0.0.0-20211209171907-798191bca915 // indirect
golang.org/x/tools v0.1.8 // indirect
google.golang.org/appengine v1.6.6 // indirect
google.golang.org/protobuf v1.27.1 // indirect
lukechampine.com/blake3 v1.1.7 // indirect
)
replace github.com/ethereum/go-ethereum v1.10.9 => github.com/vulcanize/go-ethereum v1.10.9-statediff-0.0.27
replace github.com/ethereum/go-ethereum v1.9.14 => github.com/vulcanize/go-ethereum v1.10.14-statediff-0.0.29
replace github.com/vulcanize/go-eth-state-node-iterator => ../state-node-iterator

1155
go.sum

File diff suppressed because it is too large Load Diff

View File

@ -16,10 +16,9 @@
package snapshot
import (
ethNode "github.com/ethereum/go-ethereum/statediff/indexer/node"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
"github.com/spf13/viper"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/config"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/core"
)
const (
@ -31,15 +30,18 @@ const (
LVL_DB_PATH = "LVL_DB_PATH"
)
type ServiceConfig struct {
// Config is config parameters for DB.
type Config struct {
LevelDBPath string
AncientDBPath string
Node core.Node
DBConfig config.Database
Node ethNode.Info
connectionURI string
DBConfig postgres.ConnectionConfig
}
func (c *ServiceConfig) Init() {
c.DBConfig.Init()
// Init Initialises config
func (c *Config) Init() {
c.dbInit()
viper.BindEnv("leveldb.path", LVL_DB_PATH)
viper.BindEnv("ethereum.nodeID", ETH_NODE_ID)
viper.BindEnv("ethereum.clientName", ETH_CLIENT_NAME)
@ -47,12 +49,38 @@ func (c *ServiceConfig) Init() {
viper.BindEnv("ethereum.networkID", ETH_NETWORK_ID)
viper.BindEnv("leveldb.ancient", ANCIENT_DB_PATH)
c.Node = core.Node{
c.Node = ethNode.Info{
ID: viper.GetString("ethereum.nodeID"),
ClientName: viper.GetString("ethereum.clientName"),
GenesisBlock: viper.GetString("ethereum.genesisBlock"),
NetworkID: viper.GetString("ethereum.networkID"),
ChainID: viper.GetUint64("ethereum.chainID"),
}
c.LevelDBPath = viper.GetString("leveldb.path")
c.AncientDBPath = viper.GetString("leveldb.ancient")
}
func (c *Config) dbInit() {
viper.BindEnv("database.name", postgres.DATABASE_NAME)
viper.BindEnv("database.hostname", postgres.DATABASE_HOSTNAME)
viper.BindEnv("database.port", postgres.DATABASE_PORT)
viper.BindEnv("database.user", postgres.DATABASE_USER)
viper.BindEnv("database.password", postgres.DATABASE_PASSWORD)
viper.BindEnv("database.maxIdle", postgres.DATABASE_MAX_IDLE_CONNECTIONS)
viper.BindEnv("database.maxOpen", postgres.DATABASE_MAX_OPEN_CONNECTIONS)
viper.BindEnv("database.maxLifetime", postgres.DATABASE_MAX_CONN_LIFETIME)
dbParams := postgres.ConnectionParams{}
// DB params
dbParams.Name = viper.GetString("database.name")
dbParams.Hostname = viper.GetString("database.hostname")
dbParams.Port = viper.GetInt("database.port")
dbParams.User = viper.GetString("database.user")
dbParams.Password = viper.GetString("database.password")
c.connectionURI = postgres.DbConnectionString(dbParams)
// DB config
c.DBConfig.MaxIdle = viper.GetInt("database.maxIdle")
c.DBConfig.MaxOpen = viper.GetInt("database.maxOpen")
c.DBConfig.MaxLifetime = viper.GetInt("database.maxLifetime")
}

View File

@ -21,43 +21,43 @@ import (
"github.com/ethereum/go-ethereum/common"
)
// Node for holding trie node information
type Node struct {
NodeType NodeType
Path []byte
Key common.Hash
Value []byte
// node for holding trie node information
type node struct {
nodeType nodeType
path []byte
key common.Hash
value []byte
}
// NodeType for explicitly setting type of node
type NodeType int
// nodeType for explicitly setting type of node
type nodeType int
const (
Branch NodeType = iota
Extension
Leaf
Removed
Unknown
branch nodeType = iota
extension
leaf
removed
unknown
)
// CheckKeyType checks what type of key we have
func CheckKeyType(elements []interface{}) (NodeType, error) {
func CheckKeyType(elements []interface{}) (nodeType, error) {
if len(elements) > 2 {
return Branch, nil
return branch, nil
}
if len(elements) < 2 {
return Unknown, fmt.Errorf("node cannot be less than two elements in length")
return unknown, fmt.Errorf("node cannot be less than two elements in length")
}
switch elements[0].([]byte)[0] / 16 {
case '\x00':
return Extension, nil
return extension, nil
case '\x01':
return Extension, nil
return extension, nil
case '\x02':
return Leaf, nil
return leaf, nil
case '\x03':
return Leaf, nil
return leaf, nil
default:
return Unknown, fmt.Errorf("unknown hex prefix")
return unknown, fmt.Errorf("unknown hex prefix")
}
}

View File

@ -17,24 +17,39 @@ package snapshot
import (
"bytes"
"fmt"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ipfs/go-ipfs-blockstore"
"github.com/ipfs/go-ipfs-ds-help"
"github.com/jmoiron/sqlx"
"github.com/multiformats/go-multihash"
"github.com/sirupsen/logrus"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs/ipld"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared"
"github.com/ethereum/go-ethereum/statediff/indexer/ipfs/ipld"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
)
const logInterval = 1 * time.Minute
// Publisher is wrapper around DB.
type Publisher struct {
db *postgres.DB
db *postgres.DB
currBatchSize uint
stateNodeCounter uint64
storageNodeCounter uint64
codeNodeCounter uint64
startTime time.Time
}
// NewPublisher creates Publisher
func NewPublisher(db *postgres.DB) *Publisher {
return &Publisher{
db: db,
db: db,
currBatchSize: 0,
startTime: time.Now(),
}
}
@ -44,10 +59,12 @@ func (p *Publisher) PublishHeader(header *types.Header) (int64, error) {
if err != nil {
return 0, err
}
tx, err := p.db.Beginx()
if err != nil {
return 0, err
}
defer func() {
if p := recover(); p != nil {
shared.Rollback(tx)
@ -58,9 +75,11 @@ func (p *Publisher) PublishHeader(header *types.Header) (int64, error) {
err = tx.Commit()
}
}()
if err := shared.PublishIPLD(tx, headerNode); err != nil {
if err = shared.PublishIPLD(tx, headerNode); err != nil {
return 0, err
}
mhKey, _ := shared.MultihashKeyFromCIDString(headerNode.Cid().String())
var headerID int64
err = tx.QueryRowx(`INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated)
@ -69,86 +88,112 @@ func (p *Publisher) PublishHeader(header *types.Header) (int64, error) {
RETURNING id`,
header.Number.Uint64(), header.Hash().Hex(), header.ParentHash.Hex(), headerNode.Cid().String(), "0", p.db.NodeID, "0", header.Root.Hex(), header.TxHash.Hex(),
header.ReceiptHash.Hex(), header.UncleHash.Hex(), header.Bloom.Bytes(), header.Time, mhKey, 0).Scan(&headerID)
return headerID, err
}
// PublishStateNode writes the state node to the ipfs backing datastore and adds secondary indexes in the state_cids table
func (p *Publisher) PublishStateNode(node Node, headerID int64) (int64, error) {
func (p *Publisher) PublishStateNode(node *node, headerID int64, tx *sqlx.Tx) (int64, error) {
var stateID int64
var stateKey string
if !bytes.Equal(node.Key.Bytes(), nullHash.Bytes()) {
stateKey = node.Key.Hex()
if !bytes.Equal(node.key.Bytes(), nullHash.Bytes()) {
stateKey = node.key.Hex()
}
tx, err := p.db.Beginx()
stateCIDStr, mhKey, err := shared.PublishRaw(tx, ipld.MEthStateTrie, multihash.KECCAK_256, node.value)
if err != nil {
return 0, err
}
defer func() {
if p := recover(); p != nil {
shared.Rollback(tx)
panic(p)
} else if err != nil {
shared.Rollback(tx)
} else {
err = tx.Commit()
}
}()
stateCIDStr, err := shared.PublishRaw(tx, ipld.MEthStateTrie, multihash.KECCAK_256, node.Value)
if err != nil {
return 0, err
}
mhKey, _ := shared.MultihashKeyFromCIDString(stateCIDStr)
err = tx.QueryRowx(`INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (header_id, state_path) DO UPDATE SET state_path = state_cids.state_path
ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7)
RETURNING id`,
headerID, stateKey, stateCIDStr, node.Path, node.NodeType, false, mhKey).Scan(&stateID)
headerID, stateKey, stateCIDStr, node.path, node.nodeType, false, mhKey).Scan(&stateID)
// increment state node counter.
atomic.AddUint64(&p.stateNodeCounter, 1)
// increment current batch size counter
p.currBatchSize += 2
return stateID, err
}
// PublishStorageNode writes the storage node to the ipfs backing pg datastore and adds secondary indexes in the storage_cids table
func (p *Publisher) PublishStorageNode(node Node, stateID int64) error {
func (p *Publisher) PublishStorageNode(node *node, stateID int64, tx *sqlx.Tx) error {
var storageKey string
if !bytes.Equal(node.Key.Bytes(), nullHash.Bytes()) {
storageKey = node.Key.Hex()
if !bytes.Equal(node.key.Bytes(), nullHash.Bytes()) {
storageKey = node.key.Hex()
}
tx, err := p.db.Beginx()
storageCIDStr, mhKey, err := shared.PublishRaw(tx, ipld.MEthStorageTrie, multihash.KECCAK_256, node.value)
if err != nil {
return err
}
defer func() {
if p := recover(); p != nil {
shared.Rollback(tx)
panic(p)
} else if err != nil {
shared.Rollback(tx)
} else {
err = tx.Commit()
}
}()
storageCIDStr, err := shared.PublishRaw(tx, ipld.MEthStorageTrie, multihash.KECCAK_256, node.Value)
_, err = tx.Exec(`INSERT INTO eth.storage_cids (state_id, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (state_id, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7)`,
stateID, storageKey, storageCIDStr, node.path, node.nodeType, false, mhKey)
if err != nil {
return err
}
mhKey, _ := shared.MultihashKeyFromCIDString(storageCIDStr)
_, err = tx.Exec(`INSERT INTO eth.storage_cids (state_id, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (state_id, storage_path) DO NOTHING`,
stateID, storageKey, storageCIDStr, node.Path, node.NodeType, false, mhKey)
return err
// increment storage node counter.
atomic.AddUint64(&p.storageNodeCounter, 1)
// increment current batch size counter
p.currBatchSize += 2
return nil
}
// PublishCode writes code to the ipfs backing pg datastore
func (p *Publisher) PublishCode(code []byte) error {
func (p *Publisher) PublishCode(codeHash common.Hash, codeBytes []byte, tx *sqlx.Tx) error {
// no codec for code, doesn't matter though since blockstore key is multihash-derived
return p.publishRaw(ipld.MEthStorageTrie, multihash.KECCAK_256, code)
mhKey, err := shared.MultihashKeyFromKeccak256(codeHash)
if err != nil {
return fmt.Errorf("error deriving multihash key from codehash: %v", err)
}
if err = shared.PublishDirect(tx, mhKey, codeBytes); err != nil {
return fmt.Errorf("error publishing code IPLD: %v", err)
}
// increment code node counter.
atomic.AddUint64(&p.codeNodeCounter, 1)
p.currBatchSize++
return nil
}
func (p *Publisher) publishRaw(codec, mh uint64, raw []byte) error {
c, err := ipld.RawdataToCid(codec, raw, mh)
if err != nil {
return err
func (p *Publisher) checkBatchSize(tx *sqlx.Tx, maxBatchSize uint) (*sqlx.Tx, error) {
var err error
// maximum batch size reached, commit the current transaction and begin a new transaction.
if maxBatchSize <= p.currBatchSize {
if err = tx.Commit(); err != nil {
return nil, err
}
tx, err = p.db.Beginx()
if err != nil {
return nil, err
}
p.currBatchSize = 0
}
dbKey := dshelp.MultihashToDsKey(c.Hash())
prefixedKey := blockstore.BlockPrefix.String() + dbKey.String()
_, err = p.db.Exec(`INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING`, prefixedKey, raw)
return err
return tx, nil
}
// logNodeCounters periodically logs the number of node processed.
func (p *Publisher) logNodeCounters() {
t := time.NewTicker(logInterval)
for range t.C {
p.printNodeCounters()
}
}
func (p *Publisher) printNodeCounters() {
logrus.Infof("runtime: %s", time.Now().Sub(p.startTime).String())
logrus.Infof("processed state nodes: %d", atomic.LoadUint64(&p.stateNodeCounter))
logrus.Infof("processed storage nodes: %d", atomic.LoadUint64(&p.storageNodeCounter))
logrus.Infof("processed code nodes: %d", atomic.LoadUint64(&p.codeNodeCounter))
}

View File

@ -28,11 +28,12 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
"github.com/ethereum/go-ethereum/trie"
"github.com/jmoiron/sqlx"
"github.com/sirupsen/logrus"
iter "github.com/vulcanize/go-eth-state-node-iterator"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
)
var (
@ -40,27 +41,36 @@ var (
emptyNode, _ = rlp.EncodeToBytes([]byte{})
emptyCodeHash = crypto.Keccak256([]byte{})
emptyContractRoot = crypto.Keccak256Hash(emptyNode)
defaultBatchSize = uint(100)
)
// Service holds ethDB and stateDB to read data from lvldb and Publisher
// to publish trie in postgres DB.
type Service struct {
ethDB ethdb.Database
stateDB state.Database
ipfsPublisher *Publisher
maxBatchSize uint
}
func NewSnapshotService(con ServiceConfig) (*Service, error) {
pgdb, err := postgres.NewDB(con.DBConfig, con.Node)
// NewSnapshotService creates Service.
func NewSnapshotService(con *Config) (*Service, error) {
pgDB, err := postgres.NewDB(con.connectionURI, con.DBConfig, con.Node)
if err != nil {
return nil, err
}
edb, err := rawdb.NewLevelDBDatabaseWithFreezer(con.LevelDBPath, 1024, 256, con.AncientDBPath, "eth-pg-ipfs-state-snapshot", true)
edb, err := rawdb.NewLevelDBDatabaseWithFreezer(con.LevelDBPath, 1024, 256, con.AncientDBPath, "eth-pg-ipfs-state-snapshot", false)
if err != nil {
return nil, err
}
return &Service{
ethDB: edb,
stateDB: state.NewDatabase(edb),
ipfsPublisher: NewPublisher(pgdb),
ipfsPublisher: NewPublisher(pgDB),
maxBatchSize: defaultBatchSize,
}, nil
}
@ -78,6 +88,9 @@ func (s *Service) CreateSnapshot(params SnapshotParams) error {
if header == nil {
return fmt.Errorf("unable to read canonical header at height %d", params.Height)
}
logrus.Infof("head hash: %s head height: %d", hash.Hex(), params.Height)
headerID, err := s.ipfsPublisher.PublishHeader(header)
if err != nil {
return err
@ -96,15 +109,14 @@ func (s *Service) CreateSnapshot(params SnapshotParams) error {
}
// Create snapshot up to head (ignores height param)
func (s *Service) CreateLatestSnapshot(params SnapshotParams) error {
func (s *Service) CreateLatestSnapshot(workers uint) error {
logrus.Info("Creating snapshot at head")
hash := rawdb.ReadHeadHeaderHash(s.ethDB)
height := rawdb.ReadHeaderNumber(s.ethDB, hash)
if height == nil {
return fmt.Errorf("unable to read header height for header hash %s", hash.String())
}
params.Height = *height
return s.CreateSnapshot(params)
return s.CreateSnapshot(SnapshotParams{Height: *height, Workers: workers})
}
type nodeResult struct {
@ -142,15 +154,21 @@ func (s *Service) createSnapshot(it trie.NodeIterator, headerID int64) error {
if it.Leaf() { // "leaf" nodes are actually "value" nodes, whose parents are the actual leaves
return nil
}
if bytes.Equal(nullHash.Bytes(), it.Hash().Bytes()) {
return nil
}
tx, err = s.ipfsPublisher.checkBatchSize(tx, s.maxBatchSize)
if err != nil {
return err
}
res, err := resolveNode(it, s.stateDB.TrieDB())
if err != nil {
return err
}
switch res.node.NodeType {
case Leaf:
case leaf:
// if the node is a leaf, decode the account and publish the associated storage trie nodes if there are any
// var account snapshot.Account
var account types.StateAccount
@ -167,22 +185,27 @@ func (s *Service) createSnapshot(it trie.NodeIterator, headerID int64) error {
if err != nil {
return err
}
// publish any non-nil code referenced by codehash
if !bytes.Equal(account.CodeHash, emptyCodeHash) {
codeBytes, err := s.ethDB.Get(account.CodeHash)
if err != nil {
return err
codeHash := common.BytesToHash(account.CodeHash)
codeBytes := rawdb.ReadCode(s.ethDB, codeHash)
if len(codeBytes) == 0 {
logrus.Error("Code is missing", "account", common.BytesToHash(it.LeafKey()))
return errors.New("missing code")
}
if err := s.ipfsPublisher.PublishCode(codeBytes); err != nil {
if err = s.ipfsPublisher.PublishCode(codeHash, codeBytes, tx); err != nil {
return err
}
}
if err := s.storageSnapshot(account.Root, stateID); err != nil {
return fmt.Errorf("failed building storage snapshot for account %+v\r\nerror: %v", account, err)
if tx, err = s.storageSnapshot(account.Root, stateID, tx); err != nil {
return fmt.Errorf("failed building storage snapshot for account %+v\r\nerror: %w", account, err)
}
case Extension, Branch:
res.node.Key = common.BytesToHash([]byte{})
if _, err := s.ipfsPublisher.PublishStateNode(res.node, headerID); err != nil {
case extension, branch:
stateNode.key = common.BytesToHash([]byte{})
if _, err := s.ipfsPublisher.PublishStateNode(stateNode, headerID, tx); err != nil {
return err
}
default:
@ -219,42 +242,56 @@ func (s *Service) createSnapshotAsync(tree state.Trie, headerID int64, workers u
return nil
}
func (s *Service) storageSnapshot(sr common.Hash, stateID int64) error {
func (s *Service) storageSnapshot(sr common.Hash, stateID int64, tx *sqlx.Tx) (*sqlx.Tx, error) {
if bytes.Equal(sr.Bytes(), emptyContractRoot.Bytes()) {
return nil
return tx, nil
}
sTrie, err := s.stateDB.OpenTrie(sr)
if err != nil {
return err
return nil, err
}
it := sTrie.NodeIterator(make([]byte, 0))
for it.Next(true) {
// skip value nodes
if it.Leaf() {
continue
}
if bytes.Equal(nullHash.Bytes(), it.Hash().Bytes()) {
continue
}
res, err := resolveNode(it, s.stateDB.TrieDB())
if err != nil {
return err
return nil, err
}
tx, err = s.ipfsPublisher.checkBatchSize(tx, s.maxBatchSize)
if err != nil {
return nil, err
}
nodeData, err = s.stateDB.TrieDB().Node(it.Hash())
if err != nil {
return nil, err
}
switch res.node.NodeType {
case Leaf:
case leaf:
partialPath := trie.CompactToHex(res.elements[0].([]byte))
valueNodePath := append(res.node.Path, partialPath...)
encodedPath := trie.HexToCompact(valueNodePath)
leafKey := encodedPath[1:]
res.node.Key = common.BytesToHash(leafKey)
case Extension, Branch:
case extension, branch:
res.node.Key = common.BytesToHash([]byte{})
default:
return errors.New("unexpected node type")
return nil, errors.New("unexpected node type")
}
if err := s.ipfsPublisher.PublishStorageNode(res.node, stateID); err != nil {
if err = s.ipfsPublisher.PublishStorageNode(res.node, stateID); err != nil {
return err
}
}
return it.Error()
return tx, it.Error()
}