lotus/chain/index/msgindex_test.go
2023-03-12 13:21:03 +02:00

294 lines
7.3 KiB
Go

package index
import (
"context"
"errors"
"math/rand"
"os"
"testing"
"time"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/types/mock"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-libipfs/blocks"
"github.com/stretchr/testify/require"
)
func TestBasicMsgIndex(t *testing.T) {
// the most basic of tests:
// 1. Create an index with mock chain store
// 2. Advance the chain for a few tipsets
// 3. Verify that the index contains all messages with the correct tipset/epoch
cs := newMockChainStore()
cs.genesis()
tmp := t.TempDir()
t.Cleanup(func() { _ = os.RemoveAll(tmp) })
msgIndex, err := NewMsgIndex(tmp, cs)
require.NoError(t, err)
defer msgIndex.Close() //nolint
for i := 0; i < 10; i++ {
t.Logf("advance to epoch %d", i+1)
err := cs.advance()
require.NoError(t, err)
// wait for the coalescer to notify
time.Sleep(coalesceMinDelay + 10*time.Millisecond)
}
t.Log("verifying index")
verifyIndex(t, cs, msgIndex)
}
func TestReorgMsgIndex(t *testing.T) {
// slightly more nuanced test that includes reorgs
// 1. Create an index with mock chain store
// 2. Advance/Reorg the chain for a few tipsets
// 3. Verify that the index contains all messages with the correct tipst/epoch
cs := newMockChainStore()
cs.genesis()
tmp := t.TempDir()
t.Cleanup(func() { _ = os.RemoveAll(tmp) })
msgIndex, err := NewMsgIndex(tmp, cs)
require.NoError(t, err)
defer msgIndex.Close() //nolint
for i := 0; i < 10; i++ {
t.Logf("advance to epoch %d", i+1)
err := cs.advance()
require.NoError(t, err)
// wait for the coalescer to notify
time.Sleep(coalesceMinDelay + 10*time.Millisecond)
}
// a simple reorg
t.Log("doing reorg")
reorgme := cs.curTs
reorgmeParent, err := cs.GetTipSetFromKey(context.Background(), reorgme.Parents())
require.NoError(t, err)
cs.setHead(reorgmeParent)
reorgmeChild := cs.makeBlk()
cs.reorg([]*types.TipSet{reorgme}, []*types.TipSet{reorgmeChild})
time.Sleep(coalesceMinDelay + 10*time.Millisecond)
t.Log("verifying index")
verifyIndex(t, cs, msgIndex)
t.Log("verifying that reorged messages are not present")
verifyMissing(t, cs, msgIndex, reorgme)
}
func TestReconcileMsgIndex(t *testing.T) {
// test that exercises the reconciliation code paths
// 1. Create and populate a basic msgindex, similar to TestBasicMsgIndex.
// 2. Close it
// 3. Reorg the mock chain store
// 4. Reopen the index to trigger reconciliation
// 5. Enxure that only the stable messages remain.
cs := newMockChainStore()
cs.genesis()
tmp := t.TempDir()
t.Cleanup(func() { _ = os.RemoveAll(tmp) })
msgIndex, err := NewMsgIndex(tmp, cs)
require.NoError(t, err)
for i := 0; i < 10; i++ {
t.Logf("advance to epoch %d", i+1)
err := cs.advance()
require.NoError(t, err)
// wait for the coalescer to notify
time.Sleep(coalesceMinDelay + 10*time.Millisecond)
}
// Close it and reorg
err = msgIndex.Close()
require.NoError(t, err)
cs.notify = nil
// a simple reorg
t.Log("doing reorg")
reorgme := cs.curTs
reorgmeParent, err := cs.GetTipSetFromKey(context.Background(), reorgme.Parents())
require.NoError(t, err)
cs.setHead(reorgmeParent)
reorgmeChild := cs.makeBlk()
cs.reorg([]*types.TipSet{reorgme}, []*types.TipSet{reorgmeChild})
// reopen to reconcile
msgIndex, err = NewMsgIndex(tmp, cs)
require.NoError(t, err)
defer msgIndex.Close() //nolint
t.Log("verifying index")
// need to step one up because the last tipset is not known by the index
cs.setHead(reorgmeParent)
verifyIndex(t, cs, msgIndex)
t.Log("verifying that reorged and unknown messages are not present")
verifyMissing(t, cs, msgIndex, reorgme, reorgmeChild)
}
func verifyIndex(t *testing.T, cs *mockChainStore, msgIndex MsgIndex) {
for ts := cs.curTs; ts.Height() > 0; {
t.Logf("verify at height %d", ts.Height())
blks := ts.Blocks()
if len(blks) == 0 {
break
}
tsCid, err := ts.Key().Cid()
require.NoError(t, err)
xindex := 0
for _, blk := range blks {
msgs, _, _ := cs.MessagesForBlock(context.Background(), blk)
for _, m := range msgs {
minfo, err := msgIndex.GetMsgInfo(context.Background(), m.Cid())
require.NoError(t, err)
require.Equal(t, tsCid, minfo.TipSet)
require.Equal(t, ts.Height(), minfo.Epoch)
require.Equal(t, xindex, minfo.Index)
xindex++
}
}
parents := ts.Parents()
ts, err = cs.GetTipSetFromKey(context.Background(), parents)
require.NoError(t, err)
}
}
func verifyMissing(t *testing.T, cs *mockChainStore, msgIndex MsgIndex, missing ...*types.TipSet) {
for _, ts := range missing {
for _, blk := range ts.Blocks() {
msgs, _, _ := cs.MessagesForBlock(context.Background(), blk)
for _, m := range msgs {
_, err := msgIndex.GetMsgInfo(context.Background(), m.Cid())
require.Equal(t, ErrNotFound, err)
}
}
}
}
type mockChainStore struct {
notify store.ReorgNotifee
curTs *types.TipSet
tipsets map[types.TipSetKey]*types.TipSet
blockMsgs map[cid.Cid][]*types.Message
nonce uint64
}
var _ ChainStore = (*mockChainStore)(nil)
var systemAddr address.Address
var rng *rand.Rand
func init() {
systemAddr, _ = address.NewIDAddress(0)
rng = rand.New(rand.NewSource(314159))
}
func newMockChainStore() *mockChainStore {
return &mockChainStore{
tipsets: make(map[types.TipSetKey]*types.TipSet),
blockMsgs: make(map[cid.Cid][]*types.Message),
}
}
func (cs *mockChainStore) genesis() {
genBlock := mock.MkBlock(nil, 0, 0)
cs.blockMsgs[genBlock.Cid()] = nil
genTs := mock.TipSet(genBlock)
cs.setHead(genTs)
}
func (cs *mockChainStore) setHead(ts *types.TipSet) {
cs.curTs = ts
cs.tipsets[ts.Key()] = ts
}
func (cs *mockChainStore) advance() error {
ts := cs.makeBlk()
return cs.reorg(nil, []*types.TipSet{ts})
}
func (cs *mockChainStore) reorg(rev, app []*types.TipSet) error {
for _, ts := range rev {
parents := ts.Parents()
cs.curTs = cs.tipsets[parents]
}
for _, ts := range app {
cs.tipsets[ts.Key()] = ts
cs.curTs = ts
}
if cs.notify != nil {
return cs.notify(rev, app)
}
return nil
}
func (cs *mockChainStore) makeBlk() *types.TipSet {
height := cs.curTs.Height() + 1
blk := mock.MkBlock(cs.curTs, uint64(height), uint64(height))
blk.Messages = cs.makeGarbageCid()
msg1 := cs.makeMsg()
msg2 := cs.makeMsg()
cs.blockMsgs[blk.Cid()] = []*types.Message{msg1, msg2}
return mock.TipSet(blk)
}
func (cs *mockChainStore) makeMsg() *types.Message {
nonce := cs.nonce
cs.nonce++
return &types.Message{To: systemAddr, From: systemAddr, Nonce: nonce}
}
func (cs *mockChainStore) makeGarbageCid() cid.Cid {
garbage := blocks.NewBlock([]byte{byte(rng.Intn(256)), byte(rng.Intn(256)), byte(rng.Intn(256))})
return garbage.Cid()
}
func (cs *mockChainStore) SubscribeHeadChanges(f store.ReorgNotifee) {
cs.notify = f
}
func (cs *mockChainStore) MessagesForBlock(ctx context.Context, b *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) {
msgs, ok := cs.blockMsgs[b.Cid()]
if !ok {
return nil, nil, errors.New("unknown block")
}
return msgs, nil, nil
}
func (cs *mockChainStore) GetHeaviestTipSet() *types.TipSet {
return cs.curTs
}
func (cs *mockChainStore) GetTipSetFromKey(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error) {
ts, ok := cs.tipsets[tsk]
if !ok {
return nil, errors.New("unknown tipset")
}
return ts, nil
}