tests - mock publisher with fixtures

* go structs for header, etc.
* refactor: pkg/types

* add fixture chaindata
This commit is contained in:
Roy Crihfield 2022-01-11 17:49:04 -06:00
parent 2bb06e0a8d
commit 159f93902e
22 changed files with 301 additions and 97 deletions

17
Makefile Normal file
View File

@ -0,0 +1,17 @@
MOCKS_DIR = $(CURDIR)/mocks
mockgen_cmd=mockgen
.PHONY: mocks
# mocks: mocks/ethdb/database.go mocks/state/database.go mocks/snapshot/publisher.go
mocks: mocks/snapshot/publisher.go
# mocks/ethdb/database.go:
# $(mockgen_cmd) -package ethdb -destination $@ github.com/ethereum/go-ethereum/ethdb Database
# mocks/state/database.go:
# $(mockgen_cmd) -package state -destination $@ github.com/ethereum/go-ethereum/core/state Database
mocks/snapshot/publisher.go: pkg/types/publisher.go
$(mockgen_cmd) -package snapshot_mock -destination $@ -source $< Publisher
clean:
rm -f mocks/snapshot/publisher.go

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

37
fixture/service.go Normal file
View File

@ -0,0 +1,37 @@
package fixture
import (
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
// "github.com/jmoiron/sqlx"
snapt "github.com/vulcanize/eth-pg-ipfs-state-snapshot/pkg/types"
)
var PublishHeader = &types.Header{
ParentHash: common.HexToHash("0x6341fd3daf94b748c72ced5a5b26028f2474f5f00d824504e4fa37a75767e177"),
UncleHash: common.HexToHash("0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347"),
Coinbase: common.HexToAddress("0x0000000000000000000000000000000000000000"),
Root: common.HexToHash("0x53580584816f617295ea26c0e17641e0120cab2f0a8ffb53a866fd53aa8e8c2d"),
TxHash: common.HexToHash("0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421"),
ReceiptHash: common.HexToHash("0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421"),
Bloom: types.Bloom{},
Difficulty: big.NewInt(+2),
Number: big.NewInt(+1),
GasLimit: 4704588,
GasUsed: 0,
Time: 1492010458,
Extra: []byte{215, 131, 1, 6, 0, 132, 103, 101, 116, 104, 135, 103, 111, 49, 46, 55, 46, 51, 133, 108, 105, 110, 117, 120, 0, 0, 0, 0, 0, 0, 0, 0, 159, 30, 250, 30, 250, 114, 175, 19, 140, 145, 89, 102, 198, 57, 84, 74, 2, 85, 230, 40, 142, 24, 140, 34, 206, 145, 104, 193, 13, 190, 70, 218, 61, 136, 180, 170, 6, 89, 48, 17, 159, 184, 134, 33, 11, 240, 26, 8, 79, 222, 93, 59, 196, 141, 138, 163, 139, 202, 146, 228, 252, 197, 33, 81, 0},
MixDigest: common.Hash{},
Nonce: types.BlockNonce{},
BaseFee: nil,
}
var StateNode = snapt.Node{
NodeType: 0,
Path: []byte{12, 0},
Key: common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000"),
Value: []byte{248, 113, 160, 147, 141, 92, 6, 119, 63, 191, 125, 121, 193, 230, 153, 223, 49, 102, 109, 236, 50, 44, 161, 215, 28, 224, 171, 111, 118, 230, 79, 99, 18, 99, 4, 160, 117, 126, 95, 187, 60, 115, 90, 36, 51, 167, 59, 86, 20, 175, 63, 118, 94, 230, 107, 202, 41, 253, 234, 165, 214, 221, 181, 45, 9, 202, 244, 148, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, 160, 247, 170, 155, 102, 71, 245, 140, 90, 255, 89, 193, 131, 99, 31, 85, 161, 78, 90, 0, 204, 46, 253, 15, 71, 120, 19, 109, 123, 255, 0, 188, 27, 128},
}

1
go.mod
View File

@ -8,6 +8,7 @@ require (
github.com/ethereum/go-ethereum v1.9.14 github.com/ethereum/go-ethereum v1.9.14
github.com/fsnotify/fsnotify v1.5.1 // indirect github.com/fsnotify/fsnotify v1.5.1 // indirect
github.com/go-kit/kit v0.10.0 // indirect github.com/go-kit/kit v0.10.0 // indirect
github.com/golang/mock v1.3.1 // indirect
github.com/google/go-cmp v0.5.6 // indirect github.com/google/go-cmp v0.5.6 // indirect
github.com/google/uuid v1.3.0 // indirect github.com/google/uuid v1.3.0 // indirect
github.com/gopherjs/gopherjs v0.0.0-20190430165422-3e4dfb77656c // indirect github.com/gopherjs/gopherjs v0.0.0-20190430165422-3e4dfb77656c // indirect

1
go.sum
View File

@ -212,6 +212,7 @@ github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4er
github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s=
github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=

View File

@ -0,0 +1,64 @@
package mock
import (
"fmt"
"strings"
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/jmoiron/sqlx"
"github.com/golang/mock/gomock"
mocks "github.com/vulcanize/eth-pg-ipfs-state-snapshot/mocks/snapshot"
snapt "github.com/vulcanize/eth-pg-ipfs-state-snapshot/pkg/types"
)
type MockPublisher struct {
*mocks.MockPublisher
}
func NewMockPublisher(t *testing.T) *MockPublisher {
ctl := gomock.NewController(t)
return &MockPublisher{mocks.NewMockPublisher(ctl)}
}
func dump(funcname string, xs ...interface{}) {
if true {
return
}
fmt.Printf(">> %s", funcname)
fmt.Printf(strings.Repeat(" %+v", len(xs))+"\n", xs...)
}
func (p *MockPublisher) PublishHeader(header *types.Header) (int64, error) {
// fmt.Printf("PublishHeader %+v\n", header)
dump("PublishHeader", header)
return p.MockPublisher.PublishHeader(header)
}
func (p *MockPublisher) PublishStateNode(node *snapt.Node, headerID int64, tx *sqlx.Tx) (int64, error) {
dump("PublishStateNode", node, headerID)
return p.MockPublisher.PublishStateNode(node, headerID, tx)
}
func (p *MockPublisher) PublishStorageNode(node *snapt.Node, stateID int64, tx *sqlx.Tx) error {
dump("PublishStorageNode", node, stateID)
return p.MockPublisher.PublishStorageNode(node, stateID, tx)
}
func (p *MockPublisher) PublishCode(codeHash common.Hash, codeBytes []byte, tx *sqlx.Tx) error {
dump("PublishCode", codeHash, codeBytes)
return p.MockPublisher.PublishCode(codeHash, codeBytes, tx)
}
func (p *MockPublisher) BeginTx() (*sqlx.Tx, error) {
dump("BeginTx")
return p.MockPublisher.BeginTx()
}
func (p *MockPublisher) CommitTx(tx *sqlx.Tx) error {
dump("CommitTx", tx)
return p.MockPublisher.CommitTx(tx)
}
func (p *MockPublisher) PrepareTxForBatch(tx *sqlx.Tx, batchSize uint) (*sqlx.Tx, error) {
dump("PrepareTxForBatch", tx, batchSize)
return p.MockPublisher.PrepareTxForBatch(tx, batchSize)
}

23
pkg/snapshot/mock/util.go Normal file
View File

@ -0,0 +1,23 @@
package mock
import "fmt"
import "github.com/golang/mock/gomock"
type anyOfMatcher struct {
values []interface{}
}
func (m anyOfMatcher) Matches(x interface{}) bool {
for _, v := range m.values {
if gomock.Eq(v).Matches(x) {
return true
}
}
return false
}
func (m anyOfMatcher) String() string {
return fmt.Sprintf("is equal to any of %+v", m.values)
}
func AnyOf(xs ...interface{}) anyOfMatcher {
return anyOfMatcher{xs}
}

View File

@ -30,12 +30,13 @@ import (
"github.com/ethereum/go-ethereum/statediff/indexer/ipfs/ipld" "github.com/ethereum/go-ethereum/statediff/indexer/ipfs/ipld"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres" "github.com/ethereum/go-ethereum/statediff/indexer/postgres"
"github.com/ethereum/go-ethereum/statediff/indexer/shared" "github.com/ethereum/go-ethereum/statediff/indexer/shared"
snapt "github.com/vulcanize/eth-pg-ipfs-state-snapshot/pkg/types"
) )
const logInterval = 1 * time.Minute const logInterval = 1 * time.Minute
// Publisher is wrapper around DB. // Publisher is wrapper around DB.
type Publisher struct { type publisher struct {
db *postgres.DB db *postgres.DB
currBatchSize uint currBatchSize uint
stateNodeCounter uint64 stateNodeCounter uint64
@ -45,16 +46,31 @@ type Publisher struct {
} }
// NewPublisher creates Publisher // NewPublisher creates Publisher
func NewPublisher(db *postgres.DB) *Publisher { func NewPublisher(db *postgres.DB) *publisher {
return &Publisher{ return &publisher{
db: db, db: db,
currBatchSize: 0, currBatchSize: 0,
startTime: time.Now(), startTime: time.Now(),
} }
} }
func (p *publisher) BeginTx() (*sqlx.Tx, error) {
tx, err := p.db.Beginx()
if err != nil {
return nil, err
}
go p.logNodeCounters()
return tx, nil
}
func (p *publisher) CommitTx(tx *sqlx.Tx) error {
logrus.Info("----- final counts -----")
p.printNodeCounters()
return tx.Commit()
}
// PublishHeader writes the header to the ipfs backing pg datastore and adds secondary indexes in the header_cids table // PublishHeader writes the header to the ipfs backing pg datastore and adds secondary indexes in the header_cids table
func (p *Publisher) PublishHeader(header *types.Header) (int64, error) { func (p *publisher) PublishHeader(header *types.Header) (int64, error) {
headerNode, err := ipld.NewEthHeader(header) headerNode, err := ipld.NewEthHeader(header)
if err != nil { if err != nil {
return 0, err return 0, err
@ -93,14 +109,14 @@ func (p *Publisher) PublishHeader(header *types.Header) (int64, error) {
} }
// PublishStateNode writes the state node to the ipfs backing datastore and adds secondary indexes in the state_cids table // PublishStateNode writes the state node to the ipfs backing datastore and adds secondary indexes in the state_cids table
func (p *Publisher) PublishStateNode(node *node, headerID int64, tx *sqlx.Tx) (int64, error) { func (p *publisher) PublishStateNode(node *snapt.Node, headerID int64, tx *sqlx.Tx) (int64, error) {
var stateID int64 var stateID int64
var stateKey string var stateKey string
if !bytes.Equal(node.key.Bytes(), nullHash.Bytes()) { if !bytes.Equal(node.Key.Bytes(), nullHash.Bytes()) {
stateKey = node.key.Hex() stateKey = node.Key.Hex()
} }
stateCIDStr, mhKey, err := shared.PublishRaw(tx, ipld.MEthStateTrie, multihash.KECCAK_256, node.value) stateCIDStr, mhKey, err := shared.PublishRaw(tx, ipld.MEthStateTrie, multihash.KECCAK_256, node.Value)
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -108,7 +124,7 @@ func (p *Publisher) PublishStateNode(node *node, headerID int64, tx *sqlx.Tx) (i
err = tx.QueryRowx(`INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7) err = tx.QueryRowx(`INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7) ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7)
RETURNING id`, RETURNING id`,
headerID, stateKey, stateCIDStr, node.path, node.nodeType, false, mhKey).Scan(&stateID) headerID, stateKey, stateCIDStr, node.Path, node.NodeType, false, mhKey).Scan(&stateID)
// increment state node counter. // increment state node counter.
atomic.AddUint64(&p.stateNodeCounter, 1) atomic.AddUint64(&p.stateNodeCounter, 1)
@ -119,20 +135,20 @@ func (p *Publisher) PublishStateNode(node *node, headerID int64, tx *sqlx.Tx) (i
} }
// PublishStorageNode writes the storage node to the ipfs backing pg datastore and adds secondary indexes in the storage_cids table // PublishStorageNode writes the storage node to the ipfs backing pg datastore and adds secondary indexes in the storage_cids table
func (p *Publisher) PublishStorageNode(node *node, stateID int64, tx *sqlx.Tx) error { func (p *publisher) PublishStorageNode(node *snapt.Node, stateID int64, tx *sqlx.Tx) error {
var storageKey string var storageKey string
if !bytes.Equal(node.key.Bytes(), nullHash.Bytes()) { if !bytes.Equal(node.Key.Bytes(), nullHash.Bytes()) {
storageKey = node.key.Hex() storageKey = node.Key.Hex()
} }
storageCIDStr, mhKey, err := shared.PublishRaw(tx, ipld.MEthStorageTrie, multihash.KECCAK_256, node.value) storageCIDStr, mhKey, err := shared.PublishRaw(tx, ipld.MEthStorageTrie, multihash.KECCAK_256, node.Value)
if err != nil { if err != nil {
return err return err
} }
_, err = tx.Exec(`INSERT INTO eth.storage_cids (state_id, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7) _, err = tx.Exec(`INSERT INTO eth.storage_cids (state_id, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (state_id, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7)`, ON CONFLICT (state_id, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7)`,
stateID, storageKey, storageCIDStr, node.path, node.nodeType, false, mhKey) stateID, storageKey, storageCIDStr, node.Path, node.NodeType, false, mhKey)
if err != nil { if err != nil {
return err return err
} }
@ -146,7 +162,7 @@ func (p *Publisher) PublishStorageNode(node *node, stateID int64, tx *sqlx.Tx) e
} }
// PublishCode writes code to the ipfs backing pg datastore // PublishCode writes code to the ipfs backing pg datastore
func (p *Publisher) PublishCode(codeHash common.Hash, codeBytes []byte, tx *sqlx.Tx) error { func (p *publisher) PublishCode(codeHash common.Hash, codeBytes []byte, tx *sqlx.Tx) error {
// no codec for code, doesn't matter though since blockstore key is multihash-derived // no codec for code, doesn't matter though since blockstore key is multihash-derived
mhKey, err := shared.MultihashKeyFromKeccak256(codeHash) mhKey, err := shared.MultihashKeyFromKeccak256(codeHash)
if err != nil { if err != nil {
@ -164,7 +180,7 @@ func (p *Publisher) PublishCode(codeHash common.Hash, codeBytes []byte, tx *sqlx
return nil return nil
} }
func (p *Publisher) checkBatchSize(tx *sqlx.Tx, maxBatchSize uint) (*sqlx.Tx, error) { func (p *publisher) PrepareTxForBatch(tx *sqlx.Tx, maxBatchSize uint) (*sqlx.Tx, error) {
var err error var err error
// maximum batch size reached, commit the current transaction and begin a new transaction. // maximum batch size reached, commit the current transaction and begin a new transaction.
if maxBatchSize <= p.currBatchSize { if maxBatchSize <= p.currBatchSize {
@ -184,14 +200,14 @@ func (p *Publisher) checkBatchSize(tx *sqlx.Tx, maxBatchSize uint) (*sqlx.Tx, er
} }
// logNodeCounters periodically logs the number of node processed. // logNodeCounters periodically logs the number of node processed.
func (p *Publisher) logNodeCounters() { func (p *publisher) logNodeCounters() {
t := time.NewTicker(logInterval) t := time.NewTicker(logInterval)
for range t.C { for range t.C {
p.printNodeCounters() p.printNodeCounters()
} }
} }
func (p *Publisher) printNodeCounters() { func (p *publisher) printNodeCounters() {
logrus.Infof("runtime: %s", time.Now().Sub(p.startTime).String()) logrus.Infof("runtime: %s", time.Now().Sub(p.startTime).String())
logrus.Infof("processed state nodes: %d", atomic.LoadUint64(&p.stateNodeCounter)) logrus.Infof("processed state nodes: %d", atomic.LoadUint64(&p.stateNodeCounter))
logrus.Infof("processed storage nodes: %d", atomic.LoadUint64(&p.storageNodeCounter)) logrus.Infof("processed storage nodes: %d", atomic.LoadUint64(&p.storageNodeCounter))

View File

@ -34,6 +34,7 @@ import (
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
. "github.com/vulcanize/eth-pg-ipfs-state-snapshot/pkg/types"
iter "github.com/vulcanize/go-eth-state-node-iterator" iter "github.com/vulcanize/go-eth-state-node-iterator"
) )
@ -51,7 +52,7 @@ var (
type Service struct { type Service struct {
ethDB ethdb.Database ethDB ethdb.Database
stateDB state.Database stateDB state.Database
ipfsPublisher *Publisher ipfsPublisher Publisher
maxBatchSize uint maxBatchSize uint
} }
@ -70,7 +71,7 @@ func NewLevelDB(con *EthConfig) (ethdb.Database, error) {
} }
// NewSnapshotService creates Service. // NewSnapshotService creates Service.
func NewSnapshotService(edb ethdb.Database, pub *Publisher) (*Service, error) { func NewSnapshotService(edb ethdb.Database, pub Publisher) (*Service, error) {
return &Service{ return &Service{
ethDB: edb, ethDB: edb,
stateDB: state.NewDatabase(edb), stateDB: state.NewDatabase(edb),
@ -125,11 +126,19 @@ func (s *Service) CreateLatestSnapshot(workers uint) error {
} }
type nodeResult struct { type nodeResult struct {
node node node Node
elements []interface{} elements []interface{}
} }
func resolveNode(it trie.NodeIterator, trieDB *trie.Database) (*nodeResult, error) { func resolveNode(it trie.NodeIterator, trieDB *trie.Database) (*nodeResult, error) {
// "leaf" nodes are actually "value" nodes, whose parents are the actual leaves
if it.Leaf() {
return nil, nil
}
if bytes.Equal(nullHash.Bytes(), it.Hash().Bytes()) {
return nil, nil
}
path := make([]byte, len(it.Path())) path := make([]byte, len(it.Path()))
copy(path, it.Path()) copy(path, it.Path())
n, err := trieDB.Node(it.Hash()) n, err := trieDB.Node(it.Hash())
@ -145,66 +154,59 @@ func resolveNode(it trie.NodeIterator, trieDB *trie.Database) (*nodeResult, erro
return nil, err return nil, err
} }
return &nodeResult{ return &nodeResult{
node: node{ node: Node{
nodeType: ty, NodeType: ty,
path: path, Path: path,
value: n, Value: n,
}, },
elements: elements, elements: elements,
}, nil }, nil
} }
func (s *Service) createSnapshot(it trie.NodeIterator, headerID int64) error { func (s *Service) createSnapshot(it trie.NodeIterator, headerID int64) error {
tx, err := s.ipfsPublisher.db.Beginx() tx, err := s.ipfsPublisher.BeginTx()
if err != nil { if err != nil {
return err return err
} }
go s.ipfsPublisher.logNodeCounters()
defer func() { defer func() {
logrus.Info("----- final counts -----")
s.ipfsPublisher.printNodeCounters()
if rec := recover(); rec != nil { if rec := recover(); rec != nil {
shared.Rollback(tx) shared.Rollback(tx)
panic(rec) panic(rec)
} else if err != nil { } else if err != nil {
shared.Rollback(tx) shared.Rollback(tx)
} else { } else {
err = tx.Commit() err = s.ipfsPublisher.CommitTx(tx)
} }
}() }()
for it.Next(true) { for it.Next(true) {
if it.Leaf() { // "leaf" nodes are actually "value" nodes, whose parents are the actual leaves
return nil
}
if bytes.Equal(nullHash.Bytes(), it.Hash().Bytes()) {
return nil
}
tx, err = s.ipfsPublisher.checkBatchSize(tx, s.maxBatchSize)
if err != nil {
return err
}
res, err := resolveNode(it, s.stateDB.TrieDB()) res, err := resolveNode(it, s.stateDB.TrieDB())
if err != nil { if err != nil {
return err return err
} }
switch res.node.nodeType { if res == nil {
case leaf: continue
}
tx, err = s.ipfsPublisher.PrepareTxForBatch(tx, s.maxBatchSize)
if err != nil {
return err
}
switch res.node.NodeType {
case Leaf:
// if the node is a leaf, decode the account and publish the associated storage trie nodes if there are any // if the node is a leaf, decode the account and publish the associated storage trie nodes if there are any
// var account snapshot.Account
var account types.StateAccount var account types.StateAccount
if err := rlp.DecodeBytes(res.elements[1].([]byte), &account); err != nil { if err := rlp.DecodeBytes(res.elements[1].([]byte), &account); err != nil {
return fmt.Errorf( return fmt.Errorf(
"error decoding account for leaf node at path %x nerror: %v", res.node.path, err) "error decoding account for leaf node at path %x nerror: %v", res.node.Path, err)
} }
partialPath := trie.CompactToHex(res.elements[0].([]byte)) partialPath := trie.CompactToHex(res.elements[0].([]byte))
valueNodePath := append(res.node.path, partialPath...) valueNodePath := append(res.node.Path, partialPath...)
encodedPath := trie.HexToCompact(valueNodePath) encodedPath := trie.HexToCompact(valueNodePath)
leafKey := encodedPath[1:] leafKey := encodedPath[1:]
res.node.key = common.BytesToHash(leafKey) res.node.Key = common.BytesToHash(leafKey)
stateID, err := s.ipfsPublisher.PublishStateNode(&res.node, headerID, tx) stateID, err := s.ipfsPublisher.PublishStateNode(&res.node, headerID, tx)
if err != nil { if err != nil {
return err return err
@ -227,8 +229,8 @@ func (s *Service) createSnapshot(it trie.NodeIterator, headerID int64) error {
if tx, err = s.storageSnapshot(account.Root, stateID, tx); err != nil { if tx, err = s.storageSnapshot(account.Root, stateID, tx); err != nil {
return fmt.Errorf("failed building storage snapshot for account %+v\r\nerror: %w", account, err) return fmt.Errorf("failed building storage snapshot for account %+v\r\nerror: %w", account, err)
} }
case extension, branch: case Extension, Branch:
res.node.key = common.BytesToHash([]byte{}) res.node.Key = common.BytesToHash([]byte{})
if _, err := s.ipfsPublisher.PublishStateNode(&res.node, headerID, tx); err != nil { if _, err := s.ipfsPublisher.PublishStateNode(&res.node, headerID, tx); err != nil {
return err return err
} }
@ -236,7 +238,6 @@ func (s *Service) createSnapshot(it trie.NodeIterator, headerID int64) error {
return errors.New("unexpected node type") return errors.New("unexpected node type")
} }
return nil return nil
} }
return it.Error() return it.Error()
} }
@ -278,21 +279,15 @@ func (s *Service) storageSnapshot(sr common.Hash, stateID int64, tx *sqlx.Tx) (*
it := sTrie.NodeIterator(make([]byte, 0)) it := sTrie.NodeIterator(make([]byte, 0))
for it.Next(true) { for it.Next(true) {
// skip value nodes
if it.Leaf() {
continue
}
if bytes.Equal(nullHash.Bytes(), it.Hash().Bytes()) {
continue
}
res, err := resolveNode(it, s.stateDB.TrieDB()) res, err := resolveNode(it, s.stateDB.TrieDB())
if err != nil { if err != nil {
return nil, err return nil, err
} }
// if res == nil {continue} if res == nil {
continue
}
tx, err = s.ipfsPublisher.checkBatchSize(tx, s.maxBatchSize) tx, err = s.ipfsPublisher.PrepareTxForBatch(tx, s.maxBatchSize)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -302,17 +297,17 @@ func (s *Service) storageSnapshot(sr common.Hash, stateID int64, tx *sqlx.Tx) (*
if err != nil { if err != nil {
return nil, err return nil, err
} }
res.node.value = nodeData res.node.Value = nodeData
switch res.node.nodeType { switch res.node.NodeType {
case leaf: case Leaf:
partialPath := trie.CompactToHex(res.elements[0].([]byte)) partialPath := trie.CompactToHex(res.elements[0].([]byte))
valueNodePath := append(res.node.path, partialPath...) valueNodePath := append(res.node.Path, partialPath...)
encodedPath := trie.HexToCompact(valueNodePath) encodedPath := trie.HexToCompact(valueNodePath)
leafKey := encodedPath[1:] leafKey := encodedPath[1:]
res.node.key = common.BytesToHash(leafKey) res.node.Key = common.BytesToHash(leafKey)
case extension, branch: case Extension, Branch:
res.node.key = common.BytesToHash([]byte{}) res.node.Key = common.BytesToHash([]byte{})
default: default:
return nil, errors.New("unexpected node type") return nil, errors.New("unexpected node type")
} }

View File

@ -1,11 +1,19 @@
package snapshot package snapshot
import ( import (
"fmt"
"os"
"path/filepath" "path/filepath"
"testing" "testing"
"github.com/golang/mock/gomock"
ethNode "github.com/ethereum/go-ethereum/statediff/indexer/node" ethNode "github.com/ethereum/go-ethereum/statediff/indexer/node"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres" "github.com/ethereum/go-ethereum/statediff/indexer/postgres"
// "github.com/ethereum/go-ethereum/ethdb"
fixt "github.com/vulcanize/eth-pg-ipfs-state-snapshot/fixture"
"github.com/vulcanize/eth-pg-ipfs-state-snapshot/pkg/snapshot/mock"
) )
func testConfig(leveldbpath, ancientdbpath string) *Config { func testConfig(leveldbpath, ancientdbpath string) *Config {
@ -42,24 +50,48 @@ func testConfig(leveldbpath, ancientdbpath string) *Config {
} }
} }
func NewMockPublisher() *Publisher {
return nil
}
func TestCreateSnapshot(t *testing.T) { func TestCreateSnapshot(t *testing.T) {
datadir := t.TempDir() wd, err := os.Getwd()
config := testConfig( if err != nil {
filepath.Join(datadir, "leveldb"), panic(err)
filepath.Join(datadir, "ancient"), }
) fixture_path := filepath.Join(wd, "..", "..", "fixture")
datadir := filepath.Join(fixture_path, "chaindata")
if _, err := os.Stat(datadir); err != nil {
t.Fatal("no chaindata:", err)
}
config := testConfig(datadir, filepath.Join(datadir, "ancient"))
fmt.Printf("config: %+v %+v\n", config.DB, config.Eth)
pub := NewMockPublisher() edb, err := NewLevelDB(config.Eth)
service, err := NewSnapshotService(config.Eth, pub) if err != nil {
t.Fatal(err)
}
workers := 8
pub := mock.NewMockPublisher(t)
pub.EXPECT().PublishHeader(gomock.Eq(fixt.PublishHeader))
pub.EXPECT().BeginTx().
// AnyTimes()
Times(workers)
pub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()).
// AnyTimes()
Times(workers)
// pub.EXPECT().PublishStorageNode(gomock.Eq(fixt.StorageNode), gomock.Eq(int64(0)), gomock.Any())
// pub.EXPECT().PublishStateNode(statenode, gomock.Eq(int64(1)), gomock.Any())
pub.EXPECT().PublishStateNode(gomock.Any(), mock.AnyOf(int64(0), int64(1)), gomock.Any()).
Times(workers)
pub.EXPECT().CommitTx(gomock.Any()).
Times(workers)
service, err := NewSnapshotService(edb, pub)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
params := SnapshotParams{Height: 1} params := SnapshotParams{Height: 1, Workers: uint(workers)}
err = service.CreateSnapshot(params) err = service.CreateSnapshot(params)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)

View File

@ -13,7 +13,7 @@
// You should have received a copy of the GNU Affero General Public License // You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>. // along with this program. If not, see <http://www.gnu.org/licenses/>.
package snapshot package types
import ( import (
"fmt" "fmt"
@ -23,41 +23,42 @@ import (
// node for holding trie node information // node for holding trie node information
type node struct { type node struct {
nodeType nodeType NodeType nodeType
path []byte Path []byte
key common.Hash Key common.Hash
value []byte Value []byte
} }
type Node = node
// nodeType for explicitly setting type of node // nodeType for explicitly setting type of node
type nodeType int type nodeType int
const ( const (
branch nodeType = iota Branch nodeType = iota
extension Extension
leaf Leaf
removed Removed
unknown Unknown
) )
// CheckKeyType checks what type of key we have // CheckKeyType checks what type of key we have
func CheckKeyType(elements []interface{}) (nodeType, error) { func CheckKeyType(elements []interface{}) (nodeType, error) {
if len(elements) > 2 { if len(elements) > 2 {
return branch, nil return Branch, nil
} }
if len(elements) < 2 { if len(elements) < 2 {
return unknown, fmt.Errorf("node cannot be less than two elements in length") return Unknown, fmt.Errorf("node cannot be less than two elements in length")
} }
switch elements[0].([]byte)[0] / 16 { switch elements[0].([]byte)[0] / 16 {
case '\x00': case '\x00':
return extension, nil return Extension, nil
case '\x01': case '\x01':
return extension, nil return Extension, nil
case '\x02': case '\x02':
return leaf, nil return Leaf, nil
case '\x03': case '\x03':
return leaf, nil return Leaf, nil
default: default:
return unknown, fmt.Errorf("unknown hex prefix") return Unknown, fmt.Errorf("unknown hex prefix")
} }
} }

17
pkg/types/publisher.go Normal file
View File

@ -0,0 +1,17 @@
package types
import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/jmoiron/sqlx"
)
type Publisher interface {
PublishHeader(header *types.Header) (int64, error)
PublishStateNode(node *Node, headerID int64, tx *sqlx.Tx) (int64, error)
PublishStorageNode(node *Node, stateID int64, tx *sqlx.Tx) error
PublishCode(codeHash common.Hash, codeBytes []byte, tx *sqlx.Tx) error
BeginTx() (*sqlx.Tx, error)
CommitTx(*sqlx.Tx) error
PrepareTxForBatch(tx *sqlx.Tx, batchSize uint) (*sqlx.Tx, error)
}