ipld-eth-state-snapshot/pkg/snapshot/service_test.go
Ian Norden 05aeeab581
Account selective snapshot (#46)
* snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided

* config and env updates

* cmd update

* Encode watched address path bytes to hex for comparison

* actually ignore the subtries that are not along the paths of interest

* Fixes for account selective snapshot

* Use non-concurrent iterator when having a single worker

* Only index root node when starting path of an iterator is nil

* Upgrade deps

* Avoid tracking iterators and skip recovery test

* Fix recovery mechanism, use sync Map instead of buffered channels

* Add test for account selective snapshot

* Continue traversal with concurrent iterators with starting path nil

* Use errgroup to simplify error handling with concurrent iterators

* Check if all the nodes are indexed in the recovery test

* Use concurrency safe sync Map in account selective snapshot test

* Only track concurrent iterators and refactor code

* Fix node and recovered path comparison

* Revert back to using buffered channels for tracking iterators

* Add a metric to monitor number of active iterators

* Update docs

* Update seeked path after node is processed

* Return error on context cancellation from subtrie iteration

* Add tests for account selective snapshot recovery

* Explicity enforce concurrent iterator bounds to avoid duplicate nodes

* Update full snapshot test to check nodes being indexed

* Refactor code to simplify snapshot logic

* Remove unnecessary function argument

* Use ctx cancellation for handling signals

* Add descriptive comments

Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 17:05:04 +05:30

571 lines
18 KiB
Go

package snapshot
import (
"errors"
"fmt"
"math/big"
"math/rand"
"os"
"path/filepath"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/golang/mock/gomock"
fixt "github.com/vulcanize/ipld-eth-state-snapshot/fixture"
mock "github.com/vulcanize/ipld-eth-state-snapshot/mocks/snapshot"
snapt "github.com/vulcanize/ipld-eth-state-snapshot/pkg/types"
"github.com/vulcanize/ipld-eth-state-snapshot/test"
)
var (
stateNodeNotIndexedErr = "state node not indexed for path %v"
storageNodeNotIndexedErr = "storage node not indexed for state path %v, storage path %v"
unexpectedStateNodeErr = "got unexpected state node for path %v"
unexpectedStorageNodeErr = "got unexpected storage node for state path %v, storage path %v"
extraNodesIndexedErr = "number of nodes indexed (%v) is more than expected (max %v)"
)
func testConfig(leveldbpath, ancientdbpath string) *Config {
return &Config{
Eth: &EthConfig{
LevelDBPath: leveldbpath,
AncientDBPath: ancientdbpath,
NodeInfo: test.DefaultNodeInfo,
},
DB: &DBConfig{
URI: test.DefaultPgConfig.DbConnectionString(),
ConnConfig: test.DefaultPgConfig,
},
}
}
func makeMocks(t *testing.T) (*mock.MockPublisher, *mock.MockTx) {
ctl := gomock.NewController(t)
pub := mock.NewMockPublisher(ctl)
tx := mock.NewMockTx(ctl)
return pub, tx
}
func TestCreateSnapshot(t *testing.T) {
runCase := func(t *testing.T, workers int) {
// map: expected state path -> struct{}{}
expectedStateNodePaths := sync.Map{}
for _, path := range fixt.Block1_StateNodePaths {
expectedStateNodePaths.Store(string(path), struct{}{})
}
pub, tx := makeMocks(t)
pub.EXPECT().PublishHeader(gomock.Eq(&fixt.Block1_Header))
pub.EXPECT().BeginTx().Return(tx, nil).
Times(workers)
pub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()).Return(tx, nil).
AnyTimes()
tx.EXPECT().Commit().
Times(workers)
pub.EXPECT().PublishStateNode(
gomock.Any(),
gomock.Eq(fixt.Block1_Header.Hash().String()),
gomock.Eq(fixt.Block1_Header.Number),
gomock.Eq(tx)).
DoAndReturn(func(node *snapt.Node, _ string, _ *big.Int, _ snapt.Tx) error {
if _, ok := expectedStateNodePaths.Load(string(node.Path)); ok {
expectedStateNodePaths.Delete(string(node.Path))
} else {
t.Fatalf(unexpectedStateNodeErr, node.Path)
}
return nil
}).
Times(len(fixt.Block1_StateNodePaths))
// TODO: fixtures for storage node
// pub.EXPECT().PublishStorageNode(gomock.Eq(fixt.StorageNode), gomock.Eq(int64(0)), gomock.Any())
chainDataPath, ancientDataPath := fixt.GetChainDataPath("chaindata")
config := testConfig(chainDataPath, ancientDataPath)
edb, err := NewLevelDB(config.Eth)
if err != nil {
t.Fatal(err)
}
defer edb.Close()
recovery := filepath.Join(t.TempDir(), "recover.csv")
service, err := NewSnapshotService(edb, pub, recovery)
if err != nil {
t.Fatal(err)
}
params := SnapshotParams{Height: 1, Workers: uint(workers)}
err = service.CreateSnapshot(params)
if err != nil {
t.Fatal(err)
}
// Check if all expected state nodes are indexed
expectedStateNodePaths.Range(func(key, value any) bool {
t.Fatalf(stateNodeNotIndexedErr, []byte(key.(string)))
return true
})
}
testCases := []int{1, 4, 8, 16, 32}
for _, tc := range testCases {
t.Run("case", func(t *testing.T) { runCase(t, tc) })
}
}
type indexedNode struct {
value snapt.Node
isIndexed bool
}
type storageNodeKey struct {
statePath string
storagePath string
}
func TestAccountSelectiveSnapshot(t *testing.T) {
snapShotHeight := uint64(32)
watchedAddresses := map[common.Address]struct{}{
common.HexToAddress("0x825a6eec09e44Cb0fa19b84353ad0f7858d7F61a"): {},
common.HexToAddress("0x0616F59D291a898e796a1FAD044C5926ed2103eC"): {},
}
expectedStateNodeIndexes := []int{0, 1, 2, 6}
statePath33 := []byte{3, 3}
expectedStorageNodeIndexes33 := []int{0, 1, 2, 3, 4, 6, 8}
statePath12 := []byte{12}
expectedStorageNodeIndexes12 := []int{12, 14, 16}
runCase := func(t *testing.T, workers int) {
expectedStateNodes := sync.Map{}
for _, expectedStateNodeIndex := range expectedStateNodeIndexes {
path := fixt.Chain2_Block32_StateNodes[expectedStateNodeIndex].Path
expectedStateNodes.Store(string(path), indexedNode{
value: fixt.Chain2_Block32_StateNodes[expectedStateNodeIndex],
isIndexed: false,
})
}
expectedStorageNodes := sync.Map{}
for _, expectedStorageNodeIndex := range expectedStorageNodeIndexes33 {
path := fixt.Chain2_Block32_StorageNodes[expectedStorageNodeIndex].Path
key := storageNodeKey{
statePath: string(statePath33),
storagePath: string(path),
}
value := indexedNode{
value: fixt.Chain2_Block32_StorageNodes[expectedStorageNodeIndex].Node,
isIndexed: false,
}
expectedStorageNodes.Store(key, value)
}
for _, expectedStorageNodeIndex := range expectedStorageNodeIndexes12 {
path := fixt.Chain2_Block32_StorageNodes[expectedStorageNodeIndex].Path
key := storageNodeKey{
statePath: string(statePath12),
storagePath: string(path),
}
value := indexedNode{
value: fixt.Chain2_Block32_StorageNodes[expectedStorageNodeIndex].Node,
isIndexed: false,
}
expectedStorageNodes.Store(key, value)
}
pub, tx := makeMocks(t)
pub.EXPECT().PublishHeader(gomock.Eq(&fixt.Chain2_Block32_Header))
pub.EXPECT().BeginTx().Return(tx, nil).
Times(workers)
pub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()).Return(tx, nil).
AnyTimes()
tx.EXPECT().Commit().
Times(workers)
pub.EXPECT().PublishCode(gomock.Eq(fixt.Chain2_Block32_Header.Number), gomock.Any(), gomock.Any(), gomock.Eq(tx)).
AnyTimes()
pub.EXPECT().PublishStateNode(
gomock.Any(),
gomock.Eq(fixt.Chain2_Block32_Header.Hash().String()),
gomock.Eq(fixt.Chain2_Block32_Header.Number),
gomock.Eq(tx)).
Do(func(node *snapt.Node, _ string, _ *big.Int, _ snapt.Tx) error {
key := string(node.Path)
// Check published nodes
if expectedStateNode, ok := expectedStateNodes.Load(key); ok {
expectedVal := expectedStateNode.(indexedNode).value
test.ExpectEqual(t, expectedVal, *node)
// Mark expected node as indexed
expectedStateNodes.Store(key, indexedNode{
value: expectedVal,
isIndexed: true,
})
} else {
t.Fatalf(unexpectedStateNodeErr, node.Path)
}
return nil
}).
AnyTimes()
pub.EXPECT().PublishStorageNode(
gomock.Any(),
gomock.Eq(fixt.Chain2_Block32_Header.Hash().String()),
gomock.Eq(new(big.Int).SetUint64(snapShotHeight)),
gomock.Any(),
gomock.Eq(tx)).
Do(func(node *snapt.Node, _ string, _ *big.Int, statePath []byte, _ snapt.Tx) error {
key := storageNodeKey{
statePath: string(statePath),
storagePath: string(node.Path),
}
// Check published nodes
if expectedStorageNode, ok := expectedStorageNodes.Load(key); ok {
expectedVal := expectedStorageNode.(indexedNode).value
test.ExpectEqual(t, expectedVal, *node)
// Mark expected node as indexed
expectedStorageNodes.Store(key, indexedNode{
value: expectedVal,
isIndexed: true,
})
} else {
t.Fatalf(unexpectedStorageNodeErr, statePath, node.Path)
}
return nil
}).
AnyTimes()
chainDataPath, ancientDataPath := fixt.GetChainDataPath("chain2data")
config := testConfig(chainDataPath, ancientDataPath)
edb, err := NewLevelDB(config.Eth)
if err != nil {
t.Fatal(err)
}
defer edb.Close()
recovery := filepath.Join(t.TempDir(), "recover.csv")
service, err := NewSnapshotService(edb, pub, recovery)
if err != nil {
t.Fatal(err)
}
params := SnapshotParams{Height: snapShotHeight, Workers: uint(workers), WatchedAddresses: watchedAddresses}
err = service.CreateSnapshot(params)
if err != nil {
t.Fatal(err)
}
expectedStateNodes.Range(func(key, value any) bool {
if !value.(indexedNode).isIndexed {
t.Fatalf(stateNodeNotIndexedErr, []byte(key.(string)))
return false
}
return true
})
expectedStorageNodes.Range(func(key, value any) bool {
if !value.(indexedNode).isIndexed {
t.Fatalf(storageNodeNotIndexedErr, []byte(key.(storageNodeKey).statePath), []byte(key.(storageNodeKey).storagePath))
return false
}
return true
})
}
testCases := []int{1, 4, 8, 16, 32}
for _, tc := range testCases {
t.Run("case", func(t *testing.T) { runCase(t, tc) })
}
}
func TestRecovery(t *testing.T) {
maxPathLength := 4
runCase := func(t *testing.T, workers int, interruptAt int32) {
// map: expected state path -> number of times it got published
expectedStateNodePaths := sync.Map{}
for _, path := range fixt.Block1_StateNodePaths {
expectedStateNodePaths.Store(string(path), 0)
}
var indexedStateNodesCount int32
pub, tx := makeMocks(t)
pub.EXPECT().PublishHeader(gomock.Eq(&fixt.Block1_Header))
pub.EXPECT().BeginTx().Return(tx, nil).MaxTimes(workers)
pub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()).Return(tx, nil).AnyTimes()
tx.EXPECT().Commit().MaxTimes(workers)
pub.EXPECT().PublishStateNode(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
DoAndReturn(func(node *snapt.Node, _ string, _ *big.Int, _ snapt.Tx) error {
// Start throwing an error after a certain number of state nodes have been indexed
if indexedStateNodesCount >= interruptAt {
return errors.New("failingPublishStateNode")
} else {
if prevCount, ok := expectedStateNodePaths.Load(string(node.Path)); ok {
expectedStateNodePaths.Store(string(node.Path), prevCount.(int)+1)
atomic.AddInt32(&indexedStateNodesCount, 1)
} else {
t.Fatalf(unexpectedStateNodeErr, node.Path)
}
}
return nil
}).
MaxTimes(int(interruptAt) + workers)
chainDataPath, ancientDataPath := fixt.GetChainDataPath("chaindata")
config := testConfig(chainDataPath, ancientDataPath)
edb, err := NewLevelDB(config.Eth)
if err != nil {
t.Fatal(err)
}
defer edb.Close()
recovery := filepath.Join(t.TempDir(), "recover.csv")
service, err := NewSnapshotService(edb, pub, recovery)
if err != nil {
t.Fatal(err)
}
params := SnapshotParams{Height: 1, Workers: uint(workers)}
err = service.CreateSnapshot(params)
if err == nil {
t.Fatal("expected an error")
}
if _, err = os.Stat(recovery); err != nil {
t.Fatal("cannot stat recovery file:", err)
}
// Create new mocks for recovery
recoveryPub, tx := makeMocks(t)
recoveryPub.EXPECT().PublishHeader(gomock.Eq(&fixt.Block1_Header))
recoveryPub.EXPECT().BeginTx().Return(tx, nil).AnyTimes()
recoveryPub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()).Return(tx, nil).AnyTimes()
tx.EXPECT().Commit().AnyTimes()
recoveryPub.EXPECT().PublishStateNode(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
DoAndReturn(func(node *snapt.Node, _ string, _ *big.Int, _ snapt.Tx) error {
if prevCount, ok := expectedStateNodePaths.Load(string(node.Path)); ok {
expectedStateNodePaths.Store(string(node.Path), prevCount.(int)+1)
atomic.AddInt32(&indexedStateNodesCount, 1)
} else {
t.Fatalf(unexpectedStateNodeErr, node.Path)
}
return nil
}).
AnyTimes()
// Create a new snapshot service for recovery
recoveryService, err := NewSnapshotService(edb, recoveryPub, recovery)
if err != nil {
t.Fatal(err)
}
err = recoveryService.CreateSnapshot(params)
if err != nil {
t.Fatal(err)
}
// Check if recovery file has been deleted
_, err = os.Stat(recovery)
if err == nil {
t.Fatal("recovery file still present")
} else {
if !os.IsNotExist(err) {
t.Fatal(err)
}
}
// Check if all state nodes are indexed after recovery
expectedStateNodePaths.Range(func(key, value any) bool {
if value.(int) == 0 {
t.Fatalf(stateNodeNotIndexedErr, []byte(key.(string)))
}
return true
})
// nodes along the recovery path get reindexed
maxStateNodesCount := len(fixt.Block1_StateNodePaths) + workers*maxPathLength
if indexedStateNodesCount > int32(maxStateNodesCount) {
t.Fatalf(extraNodesIndexedErr, indexedStateNodesCount, maxStateNodesCount)
}
}
testCases := []int{1, 2, 4, 8, 16, 32}
numInterrupts := 3
interrupts := make([]int32, numInterrupts)
for i := 0; i < numInterrupts; i++ {
rand.Seed(time.Now().UnixNano())
interrupts[i] = rand.Int31n(int32(len(fixt.Block1_StateNodePaths)))
}
for _, tc := range testCases {
for _, interrupt := range interrupts {
t.Run(fmt.Sprint("case", tc, interrupt), func(t *testing.T) { runCase(t, tc, interrupt) })
}
}
}
func TestAccountSelectiveRecovery(t *testing.T) {
maxPathLength := 2
snapShotHeight := uint64(32)
watchedAddresses := map[common.Address]struct{}{
common.HexToAddress("0x825a6eec09e44Cb0fa19b84353ad0f7858d7F61a"): {},
common.HexToAddress("0x0616F59D291a898e796a1FAD044C5926ed2103eC"): {},
}
expectedStateNodeIndexes := []int{0, 1, 2, 6}
runCase := func(t *testing.T, workers int, interruptAt int32) {
// map: expected state path -> number of times it got published
expectedStateNodePaths := sync.Map{}
for _, expectedStateNodeIndex := range expectedStateNodeIndexes {
path := fixt.Chain2_Block32_StateNodes[expectedStateNodeIndex].Path
expectedStateNodePaths.Store(string(path), 0)
}
var indexedStateNodesCount int32
pub, tx := makeMocks(t)
pub.EXPECT().PublishHeader(gomock.Eq(&fixt.Chain2_Block32_Header))
pub.EXPECT().BeginTx().Return(tx, nil).Times(workers)
pub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()).Return(tx, nil).AnyTimes()
tx.EXPECT().Commit().Times(workers)
pub.EXPECT().PublishStateNode(
gomock.Any(),
gomock.Eq(fixt.Chain2_Block32_Header.Hash().String()),
gomock.Eq(fixt.Chain2_Block32_Header.Number),
gomock.Eq(tx)).
DoAndReturn(func(node *snapt.Node, _ string, _ *big.Int, _ snapt.Tx) error {
// Start throwing an error after a certain number of state nodes have been indexed
if indexedStateNodesCount >= interruptAt {
return errors.New("failingPublishStateNode")
} else {
if prevCount, ok := expectedStateNodePaths.Load(string(node.Path)); ok {
expectedStateNodePaths.Store(string(node.Path), prevCount.(int)+1)
atomic.AddInt32(&indexedStateNodesCount, 1)
} else {
t.Fatalf(unexpectedStateNodeErr, node.Path)
}
}
return nil
}).
MaxTimes(int(interruptAt) + workers)
pub.EXPECT().PublishStorageNode(
gomock.Any(),
gomock.Eq(fixt.Chain2_Block32_Header.Hash().String()),
gomock.Eq(new(big.Int).SetUint64(snapShotHeight)),
gomock.Any(),
gomock.Eq(tx)).
AnyTimes()
pub.EXPECT().PublishCode(gomock.Eq(fixt.Chain2_Block32_Header.Number), gomock.Any(), gomock.Any(), gomock.Eq(tx)).
AnyTimes()
chainDataPath, ancientDataPath := fixt.GetChainDataPath("chain2data")
config := testConfig(chainDataPath, ancientDataPath)
edb, err := NewLevelDB(config.Eth)
if err != nil {
t.Fatal(err)
}
defer edb.Close()
recovery := filepath.Join(t.TempDir(), "recover.csv")
service, err := NewSnapshotService(edb, pub, recovery)
if err != nil {
t.Fatal(err)
}
params := SnapshotParams{Height: snapShotHeight, Workers: uint(workers), WatchedAddresses: watchedAddresses}
err = service.CreateSnapshot(params)
if err == nil {
t.Fatal("expected an error")
}
if _, err = os.Stat(recovery); err != nil {
t.Fatal("cannot stat recovery file:", err)
}
// Create new mocks for recovery
recoveryPub, tx := makeMocks(t)
recoveryPub.EXPECT().PublishHeader(gomock.Eq(&fixt.Chain2_Block32_Header))
recoveryPub.EXPECT().BeginTx().Return(tx, nil).MaxTimes(workers)
recoveryPub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()).Return(tx, nil).AnyTimes()
tx.EXPECT().Commit().MaxTimes(workers)
recoveryPub.EXPECT().PublishStateNode(
gomock.Any(),
gomock.Eq(fixt.Chain2_Block32_Header.Hash().String()),
gomock.Eq(fixt.Chain2_Block32_Header.Number),
gomock.Eq(tx)).
DoAndReturn(func(node *snapt.Node, _ string, _ *big.Int, _ snapt.Tx) error {
if prevCount, ok := expectedStateNodePaths.Load(string(node.Path)); ok {
expectedStateNodePaths.Store(string(node.Path), prevCount.(int)+1)
atomic.AddInt32(&indexedStateNodesCount, 1)
} else {
t.Fatalf(unexpectedStateNodeErr, node.Path)
}
return nil
}).
AnyTimes()
recoveryPub.EXPECT().PublishStorageNode(
gomock.Any(),
gomock.Eq(fixt.Chain2_Block32_Header.Hash().String()),
gomock.Eq(new(big.Int).SetUint64(snapShotHeight)),
gomock.Any(),
gomock.Eq(tx)).
AnyTimes()
recoveryPub.EXPECT().PublishCode(gomock.Eq(fixt.Chain2_Block32_Header.Number), gomock.Any(), gomock.Any(), gomock.Eq(tx)).
AnyTimes()
// Create a new snapshot service for recovery
recoveryService, err := NewSnapshotService(edb, recoveryPub, recovery)
if err != nil {
t.Fatal(err)
}
err = recoveryService.CreateSnapshot(params)
if err != nil {
t.Fatal(err)
}
// Check if recovery file has been deleted
_, err = os.Stat(recovery)
if err == nil {
t.Fatal("recovery file still present")
} else {
if !os.IsNotExist(err) {
t.Fatal(err)
}
}
// Check if all expected state nodes are indexed after recovery
expectedStateNodePaths.Range(func(key, value any) bool {
if value.(int) == 0 {
t.Fatalf(stateNodeNotIndexedErr, []byte(key.(string)))
}
return true
})
// nodes along the recovery path get reindexed
maxStateNodesCount := len(expectedStateNodeIndexes) + workers*maxPathLength
if indexedStateNodesCount > int32(maxStateNodesCount) {
t.Fatalf(extraNodesIndexedErr, indexedStateNodesCount, maxStateNodesCount)
}
}
testCases := []int{1, 2, 4, 8, 16, 32}
numInterrupts := 2
interrupts := make([]int32, numInterrupts)
for i := 0; i < numInterrupts; i++ {
rand.Seed(time.Now().UnixNano())
interrupts[i] = rand.Int31n(int32(len(expectedStateNodeIndexes)))
}
for _, tc := range testCases {
for _, interrupt := range interrupts {
t.Run(fmt.Sprint("case", tc, interrupt), func(t *testing.T) { runCase(t, tc, interrupt) })
}
}
}