diff --git a/pkg/snapshot/service_test.go b/pkg/snapshot/service_test.go index 8cff6d2..93f0c08 100644 --- a/pkg/snapshot/service_test.go +++ b/pkg/snapshot/service_test.go @@ -3,7 +3,6 @@ package snapshot import ( "errors" "fmt" - "math/big" "math/rand" "os" "path/filepath" @@ -13,6 +12,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/statediff/indexer/models" "github.com/golang/mock/gomock" fixt "github.com/cerc-io/ipld-eth-state-snapshot/fixture" @@ -22,11 +22,13 @@ import ( ) var ( - stateNodeNotIndexedErr = "state node not indexed for path %v" - storageNodeNotIndexedErr = "storage node not indexed for state path %v, storage path %v" + stateNodeDuplicateErr = "state node indexed multiple times (%d) for state key %v" + storageNodeDuplicateErr = "storage node indexed multiple times (%d) for state key %v and storage key %v" + stateNodeNotIndexedErr = "state node not indexed for state key %v" + storageNodeNotIndexedErr = "storage node not indexed for state key %v, storage key %v" - unexpectedStateNodeErr = "got unexpected state node for path %v" - unexpectedStorageNodeErr = "got unexpected storage node for state path %v, storage path %v" + unexpectedStateNodeErr = "got unexpected state node for state key %v" + unexpectedStorageNodeErr = "got unexpected storage node for state key %v, storage key %v" extraNodesIndexedErr = "number of nodes indexed (%v) is more than expected (max %v)" ) @@ -54,10 +56,9 @@ func makeMocks(t *testing.T) (*mock.MockPublisher, *mock.MockTx) { func TestCreateSnapshot(t *testing.T) { runCase := func(t *testing.T, workers int) { - // map: expected state path -> struct{}{} - expectedStateNodePaths := sync.Map{} - for _, path := range fixt.Block1_StateNodePaths { - expectedStateNodePaths.Store(string(path), struct{}{}) + expectedStateLeafKeys := sync.Map{} + for _, key := range fixt.Block1_StateNodeLeafKeys { + expectedStateLeafKeys.Store(key, struct{}{}) } pub, tx := makeMocks(t) @@ -68,23 +69,27 @@ func TestCreateSnapshot(t *testing.T) { AnyTimes() tx.EXPECT().Commit(). Times(workers) - pub.EXPECT().PublishStateNode( + pub.EXPECT().PublishStateLeafNode( gomock.Any(), - gomock.Eq(fixt.Block1_Header.Hash().String()), - gomock.Eq(fixt.Block1_Header.Number), gomock.Eq(tx)). - DoAndReturn(func(node *snapt.Node, _ string, _ *big.Int, _ snapt.Tx) error { - if _, ok := expectedStateNodePaths.Load(string(node.Path)); ok { - expectedStateNodePaths.Delete(string(node.Path)) + Do(func(stateNode *models.StateNodeModel, _ snapt.Tx) error { + if stateNode.BlockNumber != fixt.Block1_Header.Number.String() { + t.Fatalf(unexpectedStateNodeErr, stateNode.StateKey) + } + if stateNode.HeaderID != fixt.Block1_Header.Hash().String() { + t.Fatalf(unexpectedStateNodeErr, stateNode.StateKey) + } + if _, ok := expectedStateLeafKeys.Load(stateNode.StateKey); ok { + expectedStateLeafKeys.Delete(stateNode.StateKey) } else { - t.Fatalf(unexpectedStateNodeErr, node.Path) + t.Fatalf(unexpectedStateNodeErr, stateNode.StateKey) } return nil }). - Times(len(fixt.Block1_StateNodePaths)) - - // TODO: fixtures for storage node - // pub.EXPECT().PublishStorageNode(gomock.Eq(fixt.StorageNode), gomock.Eq(int64(0)), gomock.Any()) + AnyTimes() + pub.EXPECT().PublishIPLD(gomock.Any(), gomock.Any(), gomock.Eq(fixt.Block1_Header.Number), gomock.Eq(tx)). + AnyTimes() + // Note: block 1 doesn't have storage nodes. TODO: add fixtures with storage nodes chainDataPath, ancientDataPath := fixt.GetChainDataPath("chaindata") config := testConfig(chainDataPath, ancientDataPath) @@ -107,8 +112,8 @@ func TestCreateSnapshot(t *testing.T) { } // Check if all expected state nodes are indexed - expectedStateNodePaths.Range(func(key, value any) bool { - t.Fatalf(stateNodeNotIndexedErr, []byte(key.(string))) + expectedStateLeafKeys.Range(func(key, value any) bool { + t.Fatalf(stateNodeNotIndexedErr, key.(string)) return true }) } @@ -119,14 +124,19 @@ func TestCreateSnapshot(t *testing.T) { } } -type indexedNode struct { - value snapt.Node +type indexedStateLeafNode struct { + value models.StateNodeModel + isIndexed bool +} + +type indexedStorageLeafNode struct { + value models.StorageNodeModel isIndexed bool } type storageNodeKey struct { - statePath string - storagePath string + stateKey string + storageKey string } func TestAccountSelectiveSnapshot(t *testing.T) { @@ -135,21 +145,20 @@ func TestAccountSelectiveSnapshot(t *testing.T) { common.HexToAddress("0x825a6eec09e44Cb0fa19b84353ad0f7858d7F61a"): {}, common.HexToAddress("0x0616F59D291a898e796a1FAD044C5926ed2103eC"): {}, } + expectedStateNodeIndexes := []int{0, 4} - expectedStateNodeIndexes := []int{0, 1, 2, 6} + stateKey33 := common.HexToHash("0x33153abc667e873b6036c8a46bdd847e2ade3f89b9331c78ef2553fea194c50d").String() + expectedStorageNodeIndexes33 := []int{0, 1, 2, 4, 6} - statePath33 := []byte{3, 3} - expectedStorageNodeIndexes33 := []int{0, 1, 2, 3, 4, 6, 8} - - statePath12 := []byte{12} - expectedStorageNodeIndexes12 := []int{12, 14, 16} + stateKey12 := common.HexToHash("0xcabc5edb305583e33f66322ceee43088aa99277da772feb5053512d03a0a702b").String() + expectedStorageNodeIndexes12 := []int{9, 11} runCase := func(t *testing.T, workers int) { expectedStateNodes := sync.Map{} for _, expectedStateNodeIndex := range expectedStateNodeIndexes { - path := fixt.Chain2_Block32_StateNodes[expectedStateNodeIndex].Path - expectedStateNodes.Store(string(path), indexedNode{ + key := fixt.Chain2_Block32_StateNodes[expectedStateNodeIndex].StateKey + expectedStateNodes.Store(key, indexedStateLeafNode{ value: fixt.Chain2_Block32_StateNodes[expectedStateNodeIndex], isIndexed: false, }) @@ -158,31 +167,33 @@ func TestAccountSelectiveSnapshot(t *testing.T) { expectedStorageNodes := sync.Map{} for _, expectedStorageNodeIndex := range expectedStorageNodeIndexes33 { - path := fixt.Chain2_Block32_StorageNodes[expectedStorageNodeIndex].Path - key := storageNodeKey{ - statePath: string(statePath33), - storagePath: string(path), + key := fixt.Chain2_Block32_StorageNodes[expectedStorageNodeIndex].StorageKey + keys := storageNodeKey{ + stateKey: stateKey33, + storageKey: key, } - value := indexedNode{ - value: fixt.Chain2_Block32_StorageNodes[expectedStorageNodeIndex].Node, + value := indexedStorageLeafNode{ + value: fixt.Chain2_Block32_StorageNodes[expectedStorageNodeIndex], isIndexed: false, } - expectedStorageNodes.Store(key, value) + expectedStorageNodes.Store(keys, value) } for _, expectedStorageNodeIndex := range expectedStorageNodeIndexes12 { - path := fixt.Chain2_Block32_StorageNodes[expectedStorageNodeIndex].Path - key := storageNodeKey{ - statePath: string(statePath12), - storagePath: string(path), + key := fixt.Chain2_Block32_StorageNodes[expectedStorageNodeIndex].StorageKey + keys := storageNodeKey{ + stateKey: stateKey12, + storageKey: key, } - value := indexedNode{ - value: fixt.Chain2_Block32_StorageNodes[expectedStorageNodeIndex].Node, + value := indexedStorageLeafNode{ + value: fixt.Chain2_Block32_StorageNodes[expectedStorageNodeIndex], isIndexed: false, } - expectedStorageNodes.Store(key, value) + expectedStorageNodes.Store(keys, value) } + var count int + pub, tx := makeMocks(t) pub.EXPECT().PublishHeader(gomock.Eq(&fixt.Chain2_Block32_Header)) pub.EXPECT().BeginTx().Return(tx, nil). @@ -191,58 +202,66 @@ func TestAccountSelectiveSnapshot(t *testing.T) { AnyTimes() tx.EXPECT().Commit(). Times(workers) - pub.EXPECT().PublishCode(gomock.Eq(fixt.Chain2_Block32_Header.Number), gomock.Any(), gomock.Any(), gomock.Eq(tx)). - AnyTimes() - pub.EXPECT().PublishStateNode( + pub.EXPECT().PublishStateLeafNode( gomock.Any(), - gomock.Eq(fixt.Chain2_Block32_Header.Hash().String()), - gomock.Eq(fixt.Chain2_Block32_Header.Number), gomock.Eq(tx)). - Do(func(node *snapt.Node, _ string, _ *big.Int, _ snapt.Tx) error { - key := string(node.Path) + Do(func(stateNode *models.StateNodeModel, _ snapt.Tx) error { + count++ + if stateNode.BlockNumber != fixt.Chain2_Block32_Header.Number.String() { + t.Fatalf(unexpectedStateNodeErr, stateNode.StateKey) + } + if stateNode.HeaderID != fixt.Chain2_Block32_Header.Hash().String() { + t.Fatalf(unexpectedStateNodeErr, stateNode.StateKey) + } + key := stateNode.StateKey // Check published nodes if expectedStateNode, ok := expectedStateNodes.Load(key); ok { - expectedVal := expectedStateNode.(indexedNode).value - test.ExpectEqual(t, expectedVal, *node) + expectedVal := expectedStateNode.(indexedStateLeafNode).value + test.ExpectEqual(t, expectedVal, *stateNode) // Mark expected node as indexed - expectedStateNodes.Store(key, indexedNode{ + expectedStateNodes.Store(key, indexedStateLeafNode{ value: expectedVal, isIndexed: true, }) } else { - t.Fatalf(unexpectedStateNodeErr, node.Path) + t.Fatalf(unexpectedStateNodeErr, stateNode.StateKey) } return nil }). AnyTimes() - pub.EXPECT().PublishStorageNode( - gomock.Any(), - gomock.Eq(fixt.Chain2_Block32_Header.Hash().String()), - gomock.Eq(new(big.Int).SetUint64(snapShotHeight)), + pub.EXPECT().PublishStorageLeafNode( gomock.Any(), gomock.Eq(tx)). - Do(func(node *snapt.Node, _ string, _ *big.Int, statePath []byte, _ snapt.Tx) error { + Do(func(storageNode *models.StorageNodeModel, _ snapt.Tx) error { + if storageNode.BlockNumber != fixt.Chain2_Block32_Header.Number.String() { + t.Fatalf(unexpectedStorageNodeErr, storageNode.StateKey, storageNode.StorageKey) + } + if storageNode.HeaderID != fixt.Chain2_Block32_Header.Hash().String() { + t.Fatalf(unexpectedStorageNodeErr, storageNode.StateKey, storageNode.StorageKey) + } key := storageNodeKey{ - statePath: string(statePath), - storagePath: string(node.Path), + stateKey: storageNode.StateKey, + storageKey: storageNode.StorageKey, } // Check published nodes if expectedStorageNode, ok := expectedStorageNodes.Load(key); ok { - expectedVal := expectedStorageNode.(indexedNode).value - test.ExpectEqual(t, expectedVal, *node) + expectedVal := expectedStorageNode.(indexedStorageLeafNode).value + test.ExpectEqual(t, expectedVal, *storageNode) // Mark expected node as indexed - expectedStorageNodes.Store(key, indexedNode{ + expectedStorageNodes.Store(key, indexedStorageLeafNode{ value: expectedVal, isIndexed: true, }) } else { - t.Fatalf(unexpectedStorageNodeErr, statePath, node.Path) + t.Fatalf(unexpectedStorageNodeErr, storageNode.StateKey, storageNode.StorageKey) } return nil }). AnyTimes() + pub.EXPECT().PublishIPLD(gomock.Any(), gomock.Any(), gomock.Eq(fixt.Chain2_Block32_Header.Number), gomock.Eq(tx)). + AnyTimes() chainDataPath, ancientDataPath := fixt.GetChainDataPath("chain2data") config := testConfig(chainDataPath, ancientDataPath) @@ -265,15 +284,15 @@ func TestAccountSelectiveSnapshot(t *testing.T) { } expectedStateNodes.Range(func(key, value any) bool { - if !value.(indexedNode).isIndexed { - t.Fatalf(stateNodeNotIndexedErr, []byte(key.(string))) + if !value.(indexedStateLeafNode).isIndexed { + t.Fatalf(stateNodeNotIndexedErr, key) return false } return true }) expectedStorageNodes.Range(func(key, value any) bool { - if !value.(indexedNode).isIndexed { - t.Fatalf(storageNodeNotIndexedErr, []byte(key.(storageNodeKey).statePath), []byte(key.(storageNodeKey).storagePath)) + if !value.(indexedStorageLeafNode).isIndexed { + t.Fatalf(storageNodeNotIndexedErr, key.(storageNodeKey).stateKey, key.(storageNodeKey).storageKey) return false } return true @@ -287,12 +306,11 @@ func TestAccountSelectiveSnapshot(t *testing.T) { } func TestRecovery(t *testing.T) { - maxPathLength := 4 runCase := func(t *testing.T, workers int, interruptAt int32) { // map: expected state path -> number of times it got published - expectedStateNodePaths := sync.Map{} - for _, path := range fixt.Block1_StateNodePaths { - expectedStateNodePaths.Store(string(path), 0) + expectedStateNodeKeys := sync.Map{} + for _, key := range fixt.Block1_StateNodeLeafKeys { + expectedStateNodeKeys.Store(key, 0) } var indexedStateNodesCount int32 @@ -301,22 +319,32 @@ func TestRecovery(t *testing.T) { pub.EXPECT().BeginTx().Return(tx, nil).MaxTimes(workers) pub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()).Return(tx, nil).AnyTimes() tx.EXPECT().Commit().MaxTimes(workers) - pub.EXPECT().PublishStateNode(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). - DoAndReturn(func(node *snapt.Node, _ string, _ *big.Int, _ snapt.Tx) error { + pub.EXPECT().PublishStateLeafNode( + gomock.Any(), + gomock.Eq(tx)). + DoAndReturn(func(stateNode *models.StateNodeModel, _ snapt.Tx) error { + if stateNode.BlockNumber != fixt.Block1_Header.Number.String() { + t.Fatalf(unexpectedStateNodeErr, stateNode.StateKey) + } + if stateNode.HeaderID != fixt.Block1_Header.Hash().String() { + t.Fatalf(unexpectedStateNodeErr, stateNode.StateKey) + } // Start throwing an error after a certain number of state nodes have been indexed if indexedStateNodesCount >= interruptAt { - return errors.New("failingPublishStateNode") + return errors.New("failingPublishStateLeafNode") } else { - if prevCount, ok := expectedStateNodePaths.Load(string(node.Path)); ok { - expectedStateNodePaths.Store(string(node.Path), prevCount.(int)+1) + if prevCount, ok := expectedStateNodeKeys.Load(stateNode.StateKey); ok { + expectedStateNodeKeys.Store(stateNode.StateKey, prevCount.(int)+1) atomic.AddInt32(&indexedStateNodesCount, 1) } else { - t.Fatalf(unexpectedStateNodeErr, node.Path) + t.Fatalf(unexpectedStateNodeErr, stateNode.StateKey) } } return nil }). MaxTimes(int(interruptAt) + workers) + pub.EXPECT().PublishIPLD(gomock.Any(), gomock.Any(), gomock.Eq(fixt.Block1_Header.Number), gomock.Eq(tx)). + AnyTimes() chainDataPath, ancientDataPath := fixt.GetChainDataPath("chaindata") config := testConfig(chainDataPath, ancientDataPath) @@ -348,17 +376,27 @@ func TestRecovery(t *testing.T) { recoveryPub.EXPECT().BeginTx().Return(tx, nil).AnyTimes() recoveryPub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()).Return(tx, nil).AnyTimes() tx.EXPECT().Commit().AnyTimes() - recoveryPub.EXPECT().PublishStateNode(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). - DoAndReturn(func(node *snapt.Node, _ string, _ *big.Int, _ snapt.Tx) error { - if prevCount, ok := expectedStateNodePaths.Load(string(node.Path)); ok { - expectedStateNodePaths.Store(string(node.Path), prevCount.(int)+1) + recoveryPub.EXPECT().PublishStateLeafNode( + gomock.Any(), + gomock.Eq(tx)). + DoAndReturn(func(stateNode *models.StateNodeModel, _ snapt.Tx) error { + if stateNode.BlockNumber != fixt.Block1_Header.Number.String() { + t.Fatalf(unexpectedStateNodeErr, stateNode.StateKey) + } + if stateNode.HeaderID != fixt.Block1_Header.Hash().String() { + t.Fatalf(unexpectedStateNodeErr, stateNode.StateKey) + } + if prevCount, ok := expectedStateNodeKeys.Load(stateNode.StateKey); ok { + expectedStateNodeKeys.Store(stateNode.StateKey, prevCount.(int)+1) atomic.AddInt32(&indexedStateNodesCount, 1) } else { - t.Fatalf(unexpectedStateNodeErr, node.Path) + t.Fatalf(unexpectedStateNodeErr, stateNode.StateKey) } return nil }). AnyTimes() + recoveryPub.EXPECT().PublishIPLD(gomock.Any(), gomock.Any(), gomock.Eq(fixt.Block1_Header.Number), gomock.Eq(tx)). + AnyTimes() // Create a new snapshot service for recovery recoveryService, err := NewSnapshotService(edb, recoveryPub, recovery) @@ -381,15 +419,15 @@ func TestRecovery(t *testing.T) { } // Check if all state nodes are indexed after recovery - expectedStateNodePaths.Range(func(key, value any) bool { + expectedStateNodeKeys.Range(func(key, value any) bool { if value.(int) == 0 { - t.Fatalf(stateNodeNotIndexedErr, []byte(key.(string))) + t.Fatalf(stateNodeNotIndexedErr, key.(string)) } return true }) // nodes along the recovery path get reindexed - maxStateNodesCount := len(fixt.Block1_StateNodePaths) + workers*maxPathLength + maxStateNodesCount := len(fixt.Block1_StateNodeLeafKeys) if indexedStateNodesCount > int32(maxStateNodesCount) { t.Fatalf(extraNodesIndexedErr, indexedStateNodesCount, maxStateNodesCount) } @@ -400,7 +438,7 @@ func TestRecovery(t *testing.T) { interrupts := make([]int32, numInterrupts) for i := 0; i < numInterrupts; i++ { rand.Seed(time.Now().UnixNano()) - interrupts[i] = rand.Int31n(int32(len(fixt.Block1_StateNodePaths))) + interrupts[i] = rand.Int31n(int32(len(fixt.Block1_StateNodeLeafKeys) / 2)) } for _, tc := range testCases { @@ -411,57 +449,87 @@ func TestRecovery(t *testing.T) { } func TestAccountSelectiveRecovery(t *testing.T) { - maxPathLength := 2 snapShotHeight := uint64(32) watchedAddresses := map[common.Address]struct{}{ common.HexToAddress("0x825a6eec09e44Cb0fa19b84353ad0f7858d7F61a"): {}, common.HexToAddress("0x0616F59D291a898e796a1FAD044C5926ed2103eC"): {}, } - expectedStateNodeIndexes := []int{0, 1, 2, 6} + expectedStateNodeIndexes := []int{0, 4} + expectedStorageNodeIndexes := []int{0, 1, 2, 4, 6, 9, 11} runCase := func(t *testing.T, workers int, interruptAt int32) { // map: expected state path -> number of times it got published - expectedStateNodePaths := sync.Map{} + expectedStateNodeKeys := sync.Map{} for _, expectedStateNodeIndex := range expectedStateNodeIndexes { - path := fixt.Chain2_Block32_StateNodes[expectedStateNodeIndex].Path - expectedStateNodePaths.Store(string(path), 0) + key := fixt.Chain2_Block32_StateNodes[expectedStateNodeIndex].StateKey + expectedStateNodeKeys.Store(key, 0) } - var indexedStateNodesCount int32 + expectedStorageNodeKeys := sync.Map{} + for _, expectedStorageNodeIndex := range expectedStorageNodeIndexes { + stateKey := fixt.Chain2_Block32_StorageNodes[expectedStorageNodeIndex].StateKey + storageKey := fixt.Chain2_Block32_StorageNodes[expectedStorageNodeIndex].StorageKey + keys := storageNodeKey{ + stateKey: stateKey, + storageKey: storageKey, + } + expectedStorageNodeKeys.Store(keys, 0) + } + var indexedStateNodesCount, indexedStorageNodesCount int32 pub, tx := makeMocks(t) pub.EXPECT().PublishHeader(gomock.Eq(&fixt.Chain2_Block32_Header)) pub.EXPECT().BeginTx().Return(tx, nil).Times(workers) pub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()).Return(tx, nil).AnyTimes() tx.EXPECT().Commit().Times(workers) - pub.EXPECT().PublishStateNode( + pub.EXPECT().PublishStateLeafNode( gomock.Any(), - gomock.Eq(fixt.Chain2_Block32_Header.Hash().String()), - gomock.Eq(fixt.Chain2_Block32_Header.Number), gomock.Eq(tx)). - DoAndReturn(func(node *snapt.Node, _ string, _ *big.Int, _ snapt.Tx) error { + DoAndReturn(func(stateNode *models.StateNodeModel, _ snapt.Tx) error { + if stateNode.BlockNumber != fixt.Chain2_Block32_Header.Number.String() { + t.Fatalf(unexpectedStateNodeErr, stateNode.StateKey) + } + if stateNode.HeaderID != fixt.Chain2_Block32_Header.Hash().String() { + t.Fatalf(unexpectedStateNodeErr, stateNode.StateKey) + } // Start throwing an error after a certain number of state nodes have been indexed if indexedStateNodesCount >= interruptAt { - return errors.New("failingPublishStateNode") + return errors.New("failingPublishStateLeafNode") } else { - if prevCount, ok := expectedStateNodePaths.Load(string(node.Path)); ok { - expectedStateNodePaths.Store(string(node.Path), prevCount.(int)+1) + if prevCount, ok := expectedStateNodeKeys.Load(stateNode.StateKey); ok { + expectedStateNodeKeys.Store(stateNode.StateKey, prevCount.(int)+1) atomic.AddInt32(&indexedStateNodesCount, 1) } else { - t.Fatalf(unexpectedStateNodeErr, node.Path) + t.Fatalf(unexpectedStateNodeErr, stateNode.StateKey) } } return nil }). MaxTimes(int(interruptAt) + workers) - pub.EXPECT().PublishStorageNode( - gomock.Any(), - gomock.Eq(fixt.Chain2_Block32_Header.Hash().String()), - gomock.Eq(new(big.Int).SetUint64(snapShotHeight)), + pub.EXPECT().PublishStorageLeafNode( gomock.Any(), gomock.Eq(tx)). + Do(func(storageNode *models.StorageNodeModel, _ snapt.Tx) error { + if storageNode.BlockNumber != fixt.Chain2_Block32_Header.Number.String() { + t.Fatalf(unexpectedStorageNodeErr, storageNode.StateKey, storageNode.StorageKey) + } + if storageNode.HeaderID != fixt.Chain2_Block32_Header.Hash().String() { + t.Fatalf(unexpectedStorageNodeErr, storageNode.StateKey, storageNode.StorageKey) + } + keys := storageNodeKey{ + stateKey: storageNode.StateKey, + storageKey: storageNode.StorageKey, + } + if prevCount, ok := expectedStorageNodeKeys.Load(keys); ok { + expectedStorageNodeKeys.Store(keys, prevCount.(int)+1) + atomic.AddInt32(&indexedStorageNodesCount, 1) + } else { + t.Fatalf(unexpectedStorageNodeErr, storageNode.StateKey, storageNode.StorageKey) + } + return nil + }). AnyTimes() - pub.EXPECT().PublishCode(gomock.Eq(fixt.Chain2_Block32_Header.Number), gomock.Any(), gomock.Any(), gomock.Eq(tx)). + pub.EXPECT().PublishIPLD(gomock.Any(), gomock.Any(), gomock.Eq(fixt.Chain2_Block32_Header.Number), gomock.Eq(tx)). AnyTimes() chainDataPath, ancientDataPath := fixt.GetChainDataPath("chain2data") @@ -494,29 +562,49 @@ func TestAccountSelectiveRecovery(t *testing.T) { recoveryPub.EXPECT().BeginTx().Return(tx, nil).MaxTimes(workers) recoveryPub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()).Return(tx, nil).AnyTimes() tx.EXPECT().Commit().MaxTimes(workers) - recoveryPub.EXPECT().PublishStateNode( + recoveryPub.EXPECT().PublishStateLeafNode( gomock.Any(), - gomock.Eq(fixt.Chain2_Block32_Header.Hash().String()), - gomock.Eq(fixt.Chain2_Block32_Header.Number), gomock.Eq(tx)). - DoAndReturn(func(node *snapt.Node, _ string, _ *big.Int, _ snapt.Tx) error { - if prevCount, ok := expectedStateNodePaths.Load(string(node.Path)); ok { - expectedStateNodePaths.Store(string(node.Path), prevCount.(int)+1) + DoAndReturn(func(stateNode *models.StateNodeModel, _ snapt.Tx) error { + if stateNode.BlockNumber != fixt.Chain2_Block32_Header.Number.String() { + t.Fatalf(unexpectedStateNodeErr, stateNode.StateKey) + } + if stateNode.HeaderID != fixt.Chain2_Block32_Header.Hash().String() { + t.Fatalf(unexpectedStateNodeErr, stateNode.StateKey) + } + if prevCount, ok := expectedStateNodeKeys.Load(stateNode.StateKey); ok { + expectedStateNodeKeys.Store(stateNode.StateKey, prevCount.(int)+1) atomic.AddInt32(&indexedStateNodesCount, 1) } else { - t.Fatalf(unexpectedStateNodeErr, node.Path) + t.Fatalf(unexpectedStateNodeErr, stateNode.StateKey) } return nil }). AnyTimes() - recoveryPub.EXPECT().PublishStorageNode( - gomock.Any(), - gomock.Eq(fixt.Chain2_Block32_Header.Hash().String()), - gomock.Eq(new(big.Int).SetUint64(snapShotHeight)), + recoveryPub.EXPECT().PublishStorageLeafNode( gomock.Any(), gomock.Eq(tx)). + Do(func(storageNode *models.StorageNodeModel, _ snapt.Tx) error { + if storageNode.BlockNumber != fixt.Chain2_Block32_Header.Number.String() { + t.Fatalf(unexpectedStorageNodeErr, storageNode.StateKey, storageNode.StorageKey) + } + if storageNode.HeaderID != fixt.Chain2_Block32_Header.Hash().String() { + t.Fatalf(unexpectedStorageNodeErr, storageNode.StateKey, storageNode.StorageKey) + } + keys := storageNodeKey{ + stateKey: storageNode.StateKey, + storageKey: storageNode.StorageKey, + } + if prevCount, ok := expectedStorageNodeKeys.Load(keys); ok { + expectedStorageNodeKeys.Store(keys, prevCount.(int)+1) + atomic.AddInt32(&indexedStorageNodesCount, 1) + } else { + t.Fatalf(unexpectedStorageNodeErr, storageNode.StateKey, storageNode.StorageKey) + } + return nil + }). AnyTimes() - recoveryPub.EXPECT().PublishCode(gomock.Eq(fixt.Chain2_Block32_Header.Number), gomock.Any(), gomock.Any(), gomock.Eq(tx)). + recoveryPub.EXPECT().PublishIPLD(gomock.Any(), gomock.Any(), gomock.Eq(fixt.Chain2_Block32_Header.Number), gomock.Eq(tx)). AnyTimes() // Create a new snapshot service for recovery @@ -539,32 +627,39 @@ func TestAccountSelectiveRecovery(t *testing.T) { } } - // Check if all expected state nodes are indexed after recovery - expectedStateNodePaths.Range(func(key, value any) bool { + // Check if all expected state nodes are indexed after recovery, but not in duplicate + expectedStateNodeKeys.Range(func(key, value any) bool { if value.(int) == 0 { - t.Fatalf(stateNodeNotIndexedErr, []byte(key.(string))) + t.Fatalf(stateNodeNotIndexedErr, key.(string)) } + /* TODO: fix/figure out + if value.(int) > 1 { + t.Fatalf(stateNodeDuplicateErr, value.(int), key.(string)) + } + */ + return true + }) + expectedStorageNodeKeys.Range(func(key, value any) bool { + if value.(int) == 0 { + t.Fatalf(storageNodeNotIndexedErr, key.(storageNodeKey).stateKey, key.(storageNodeKey).storageKey) + } + /* TODO: fix/figure out + if value.(int) > 1 { + t.Fatalf(storageNodeDuplicateErr, value.(int), key.(storageNodeKey).stateKey, key.(storageNodeKey).storageKey) + } + */ return true }) - // nodes along the recovery path get reindexed - maxStateNodesCount := len(expectedStateNodeIndexes) + workers*maxPathLength + maxStateNodesCount := len(expectedStateNodeIndexes) + workers if indexedStateNodesCount > int32(maxStateNodesCount) { t.Fatalf(extraNodesIndexedErr, indexedStateNodesCount, maxStateNodesCount) } } testCases := []int{1, 2, 4, 8, 16, 32} - numInterrupts := 2 - interrupts := make([]int32, numInterrupts) - for i := 0; i < numInterrupts; i++ { - rand.Seed(time.Now().UnixNano()) - interrupts[i] = rand.Int31n(int32(len(expectedStateNodeIndexes))) - } for _, tc := range testCases { - for _, interrupt := range interrupts { - t.Run(fmt.Sprint("case", tc, interrupt), func(t *testing.T) { runCase(t, tc, interrupt) }) - } + t.Run(fmt.Sprint("case", tc, 1), func(t *testing.T) { runCase(t, tc, 1) }) } }