finish snapshot extractor

This commit is contained in:
Ian Norden 2020-07-01 18:07:56 -05:00
parent 83d5b6491f
commit 5cb67910bd
3 changed files with 59 additions and 28 deletions

View File

@ -15,17 +15,29 @@
package snapshot
import "fmt"
import (
"fmt"
"github.com/ethereum/go-ethereum/common"
)
// Node for holding trie node information
type Node struct {
NodeType NodeType
Path []byte
Key common.Hash
Value []byte
}
// NodeType for explicitly setting type of node
type NodeType string
type NodeType int
const (
Unknown NodeType = "Unknown"
Leaf NodeType = "Leaf"
Extension NodeType = "Extension"
Branch NodeType = "Branch"
Removed NodeType = "Removed" // used to represent pathes which have been emptied
Branch NodeType = iota
Extension
Leaf
Removed
Unknown
)
// CheckKeyType checks what type of key we have

View File

@ -16,10 +16,11 @@
package snapshot
import (
"bytes"
"github.com/ethereum/go-ethereum/core/types"
"github.com/multiformats/go-multihash"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs/ipld"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared"
@ -67,11 +68,11 @@ func (p *Publisher) PublishHeader(header *types.Header) (int64, error) {
return headerID, err
}
func (p *Publisher) PublishStateNode(meta eth.StateNodeModel, nodeVal []byte, headerID int64) (int64, error) {
func (p *Publisher) PublishStateNode(node Node, headerID int64) (int64, error) {
var stateID int64
var stateKey string
if meta.StateKey != nullHash.String() {
stateKey = meta.StateKey
if !bytes.Equal(node.Key.Bytes(), nullHash.Bytes()) {
stateKey = node.Key.Hex()
}
tx, err := p.db.Beginx()
if err != nil {
@ -87,21 +88,21 @@ func (p *Publisher) PublishStateNode(meta eth.StateNodeModel, nodeVal []byte, he
err = tx.Commit()
}
}()
stateCIDStr, err := shared.PublishRaw(tx, ipld.MEthStateTrie, multihash.KECCAK_256, nodeVal)
stateCIDStr, err := shared.PublishRaw(tx, ipld.MEthStateTrie, multihash.KECCAK_256, node.Value)
if err != nil {
return 0, err
}
err = tx.QueryRowx(`INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff) VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (header_id, state_path, diff) DO UPDATE SET (state_leaf_key, cid, node_type) = ($2, $3, $5, $6)
RETURNING id`,
headerID, stateKey, stateCIDStr, meta.Path, meta.NodeType, false).Scan(&stateID)
headerID, stateKey, stateCIDStr, node.Path, node.NodeType, false).Scan(&stateID)
return stateID, err
}
func (p *Publisher) PublishStorageNode(meta eth.StorageNodeModel, nodeVal []byte, stateID int64) error {
func (p *Publisher) PublishStorageNode(node Node, stateID int64) error {
var storageKey string
if meta.StorageKey != nullHash.String() {
storageKey = meta.StorageKey
if !bytes.Equal(node.Key.Bytes(), nullHash.Bytes()) {
storageKey = node.Key.Hex()
}
tx, err := p.db.Beginx()
if err != nil {
@ -117,12 +118,12 @@ func (p *Publisher) PublishStorageNode(meta eth.StorageNodeModel, nodeVal []byte
err = tx.Commit()
}
}()
storageCIDStr, err := shared.PublishRaw(tx, ipld.MEthStorageTrie, multihash.KECCAK_256, nodeVal)
storageCIDStr, err := shared.PublishRaw(tx, ipld.MEthStorageTrie, multihash.KECCAK_256, node.Value)
if err != nil {
return err
}
_, err = tx.Exec(`INSERT INTO eth.storage_cids (state_id, storage_leaf_key, cid, storage_path, node_type, diff) VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (state_id, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff) = ($2, $3, $5, $6)`,
stateID, storageKey, storageCIDStr, meta.Path, meta.NodeType, false)
stateID, storageKey, storageCIDStr, node.Path, node.NodeType, false)
return err
}

View File

@ -17,6 +17,7 @@ package snapshot
import (
"bytes"
"errors"
"fmt"
"github.com/ethereum/go-ethereum/common"
@ -28,7 +29,6 @@ import (
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
)
@ -98,6 +98,11 @@ func (s *Service) createSnapshot(it trie.NodeIterator, trieDB *trie.Database, he
if err != nil {
return err
}
stateNode := Node{
NodeType: ty,
Path: nodePath,
Value: node,
}
switch ty {
case Leaf:
var account state.Account
@ -108,18 +113,24 @@ func (s *Service) createSnapshot(it trie.NodeIterator, trieDB *trie.Database, he
valueNodePath := append(nodePath, partialPath...)
encodedPath := trie.HexToCompact(valueNodePath)
leafKey := encodedPath[1:]
// publish state node
stateNode := eth.StateNodeModel{}
stateNode.Key = common.BytesToHash(leafKey)
stateID, err := s.ipfsPublisher.PublishStateNode(stateNode, headerID)
if err != nil {
return err
}
if err := s.storageSnapshot(account.Root, stateID); err != nil {
return fmt.Errorf("failed building eventual storage diffs for account %+v\r\nerror: %v", account, err)
}
case Extension, Branch:
// publish state node
stateNode := eth.StateNodeModel{}
stateNode.Key = common.BytesToHash([]byte{})
if _, err := s.ipfsPublisher.PublishStateNode(stateNode, headerID); err != nil {
return err
}
default:
return fmt.Errorf("unexpected node type %s", ty)
return errors.New("unexpected node type")
}
}
return nil
}
// buildStorageNodesEventual builds the storage diff node objects for a created account
@ -157,18 +168,25 @@ func (s *Service) storageSnapshot(sr common.Hash, stateID int64) error {
if err != nil {
return err
}
storageNode := Node{
NodeType: ty,
Path: nodePath,
Value: node,
}
switch ty {
case Leaf:
partialPath := trie.CompactToHex(nodeElements[0].([]byte))
valueNodePath := append(nodePath, partialPath...)
encodedPath := trie.HexToCompact(valueNodePath)
leafKey := encodedPath[1:]
storageNode := eth.StorageNodeModel{}
storageNode.Key = common.BytesToHash(leafKey)
case Extension, Branch:
storageNode := eth.StorageNodeModel{}
storageNode.Key = common.BytesToHash([]byte{})
default:
return fmt.Errorf("unexpected node type %s", ty)
return errors.New("unexpected node type")
}
if err := s.ipfsPublisher.PublishStorageNode(storageNode, stateID); err != nil {
return err
}
}
return nil