test test test
This commit is contained in:
parent
e6a0c5a406
commit
dad3442500
@ -16,7 +16,7 @@ type MsgInfo struct {
|
|||||||
// the message this record refers to
|
// the message this record refers to
|
||||||
Message cid.Cid
|
Message cid.Cid
|
||||||
// the tipset where this messages was executed
|
// the tipset where this messages was executed
|
||||||
Tipset cid.Cid
|
TipSet cid.Cid
|
||||||
// the epoch whre this message was executed
|
// the epoch whre this message was executed
|
||||||
Epoch abi.ChainEpoch
|
Epoch abi.ChainEpoch
|
||||||
// the canonical execution order of the message in the tipset
|
// the canonical execution order of the message in the tipset
|
||||||
|
@ -73,13 +73,6 @@ func NewMsgIndex(basePath string, cs ChainStore) (MsgIndex, error) {
|
|||||||
err error
|
err error
|
||||||
)
|
)
|
||||||
|
|
||||||
if basePath == ":memory:" {
|
|
||||||
// for testing
|
|
||||||
mkdb = true
|
|
||||||
dbPath = basePath
|
|
||||||
goto opendb
|
|
||||||
}
|
|
||||||
|
|
||||||
err = os.MkdirAll(basePath, 0755)
|
err = os.MkdirAll(basePath, 0755)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("error creating msgindex base directory: %w", err)
|
return nil, xerrors.Errorf("error creating msgindex base directory: %w", err)
|
||||||
@ -95,7 +88,6 @@ func NewMsgIndex(basePath string, cs ChainStore) (MsgIndex, error) {
|
|||||||
return nil, xerrors.Errorf("error stating msgindex database: %w", err)
|
return nil, xerrors.Errorf("error stating msgindex database: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
opendb:
|
|
||||||
db, err := sql.Open("sqlite3", dbPath)
|
db, err := sql.Open("sqlite3", dbPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// TODO [nice to have]: automaticaly delete corrupt databases
|
// TODO [nice to have]: automaticaly delete corrupt databases
|
||||||
@ -232,21 +224,21 @@ func reconcileIndex(db *sql.DB, cs ChainStore) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (x *msgIndex) prepareStatements() error {
|
func (x *msgIndex) prepareStatements() error {
|
||||||
stmt, err := x.db.Prepare("SELECT (tipset, xepoch, xindex) FROM Messages WHERE cid = ?")
|
stmt, err := x.db.Prepare("SELECT tipset, xepoch, xindex FROM Messages WHERE cid = ?")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return xerrors.Errorf("prepare selectMsgStmt: %w", err)
|
||||||
}
|
}
|
||||||
x.selectMsgStmt = stmt
|
x.selectMsgStmt = stmt
|
||||||
|
|
||||||
stmt, err = x.db.Prepare("INSERT INTO Messages VALUES (?, ?, ?, ?)")
|
stmt, err = x.db.Prepare("INSERT INTO Messages VALUES (?, ?, ?, ?)")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return xerrors.Errorf("prepare insertMsgStmt: %w", err)
|
||||||
}
|
}
|
||||||
x.insertMsgStmt = stmt
|
x.insertMsgStmt = stmt
|
||||||
|
|
||||||
stmt, err = x.db.Prepare("DELETE FROM Messages WHERE tipset = ?")
|
stmt, err = x.db.Prepare("DELETE FROM Messages WHERE tipset = ?")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return xerrors.Errorf("prepare deleteTipSetStmt: %w", err)
|
||||||
}
|
}
|
||||||
x.deleteTipSetStmt = stmt
|
x.deleteTipSetStmt = stmt
|
||||||
|
|
||||||
@ -417,7 +409,7 @@ func (x *msgIndex) GetMsgInfo(ctx context.Context, m cid.Cid) (MsgInfo, error) {
|
|||||||
|
|
||||||
return MsgInfo{
|
return MsgInfo{
|
||||||
Message: m,
|
Message: m,
|
||||||
Tipset: tipsetCid,
|
TipSet: tipsetCid,
|
||||||
Epoch: abi.ChainEpoch(epoch),
|
Epoch: abi.ChainEpoch(epoch),
|
||||||
Index: int(index),
|
Index: int(index),
|
||||||
}, nil
|
}, nil
|
||||||
|
293
chain/index/msgindex_test.go
Normal file
293
chain/index/msgindex_test.go
Normal file
@ -0,0 +1,293 @@
|
|||||||
|
package index
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"math/rand"
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-address"
|
||||||
|
"github.com/filecoin-project/lotus/chain/store"
|
||||||
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
"github.com/filecoin-project/lotus/chain/types/mock"
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
|
"github.com/ipfs/go-libipfs/blocks"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestBasicMsgIndex(t *testing.T) {
|
||||||
|
// the most basic of tests:
|
||||||
|
// 1. Create an index with mock chain store
|
||||||
|
// 2. Advance the chain for a few tipsets
|
||||||
|
// 3. Verify that the index contains all messages with the correct tipset/epoch
|
||||||
|
cs := newMockChainStore()
|
||||||
|
cs.genesis()
|
||||||
|
|
||||||
|
tmp := t.TempDir()
|
||||||
|
t.Cleanup(func() { _ = os.RemoveAll(tmp) })
|
||||||
|
|
||||||
|
msgIndex, err := NewMsgIndex(tmp, cs)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
defer msgIndex.Close() //nolint
|
||||||
|
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
t.Logf("advance to epoch %d", i+1)
|
||||||
|
err := cs.advance()
|
||||||
|
require.NoError(t, err)
|
||||||
|
// wait for the coalescer to notify
|
||||||
|
time.Sleep(coalesceMinDelay + 10*time.Millisecond)
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Log("verifying index")
|
||||||
|
verifyIndex(t, cs, msgIndex)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReorgMsgIndex(t *testing.T) {
|
||||||
|
// slightly more nuanced test that includes reorgs
|
||||||
|
// 1. Create an index with mock chain store
|
||||||
|
// 2. Advance/Reorg the chain for a few tipsets
|
||||||
|
// 3. Verify that the index contains all messages with the correct tipst/epoch
|
||||||
|
cs := newMockChainStore()
|
||||||
|
cs.genesis()
|
||||||
|
|
||||||
|
tmp := t.TempDir()
|
||||||
|
t.Cleanup(func() { _ = os.RemoveAll(tmp) })
|
||||||
|
|
||||||
|
msgIndex, err := NewMsgIndex(tmp, cs)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
defer msgIndex.Close() //nolint
|
||||||
|
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
t.Logf("advance to epoch %d", i+1)
|
||||||
|
err := cs.advance()
|
||||||
|
require.NoError(t, err)
|
||||||
|
// wait for the coalescer to notify
|
||||||
|
time.Sleep(coalesceMinDelay + 10*time.Millisecond)
|
||||||
|
}
|
||||||
|
|
||||||
|
// a simple reorg
|
||||||
|
t.Log("doing reorg")
|
||||||
|
reorgme := cs.curTs
|
||||||
|
reorgmeParent, err := cs.GetTipSetFromKey(context.Background(), reorgme.Parents())
|
||||||
|
require.NoError(t, err)
|
||||||
|
cs.setHead(reorgmeParent)
|
||||||
|
reorgmeChild := cs.makeBlk()
|
||||||
|
cs.reorg([]*types.TipSet{reorgme}, []*types.TipSet{reorgmeChild})
|
||||||
|
time.Sleep(coalesceMinDelay + 10*time.Millisecond)
|
||||||
|
|
||||||
|
t.Log("verifying index")
|
||||||
|
verifyIndex(t, cs, msgIndex)
|
||||||
|
|
||||||
|
t.Log("verifying that reorged messages are not present")
|
||||||
|
verifyMissing(t, cs, msgIndex, reorgme)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReconcileMsgIndex(t *testing.T) {
|
||||||
|
// test that exercises the reconciliation code paths
|
||||||
|
// 1. Create and populate a basic msgindex, similar to TestBasicMsgIndex.
|
||||||
|
// 2. Close it
|
||||||
|
// 3. Reorg the mock chain store
|
||||||
|
// 4. Reopen the index to trigger reconciliation
|
||||||
|
// 5. Enxure that only the stable messages remain.
|
||||||
|
cs := newMockChainStore()
|
||||||
|
cs.genesis()
|
||||||
|
|
||||||
|
tmp := t.TempDir()
|
||||||
|
t.Cleanup(func() { _ = os.RemoveAll(tmp) })
|
||||||
|
|
||||||
|
msgIndex, err := NewMsgIndex(tmp, cs)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
t.Logf("advance to epoch %d", i+1)
|
||||||
|
err := cs.advance()
|
||||||
|
require.NoError(t, err)
|
||||||
|
// wait for the coalescer to notify
|
||||||
|
time.Sleep(coalesceMinDelay + 10*time.Millisecond)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close it and reorg
|
||||||
|
err = msgIndex.Close()
|
||||||
|
require.NoError(t, err)
|
||||||
|
cs.notify = nil
|
||||||
|
|
||||||
|
// a simple reorg
|
||||||
|
t.Log("doing reorg")
|
||||||
|
reorgme := cs.curTs
|
||||||
|
reorgmeParent, err := cs.GetTipSetFromKey(context.Background(), reorgme.Parents())
|
||||||
|
require.NoError(t, err)
|
||||||
|
cs.setHead(reorgmeParent)
|
||||||
|
reorgmeChild := cs.makeBlk()
|
||||||
|
cs.reorg([]*types.TipSet{reorgme}, []*types.TipSet{reorgmeChild})
|
||||||
|
|
||||||
|
// reopen to reconcile
|
||||||
|
msgIndex, err = NewMsgIndex(tmp, cs)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
defer msgIndex.Close() //nolint
|
||||||
|
|
||||||
|
t.Log("verifying index")
|
||||||
|
// need to step one up because the last tipset is not known by the index
|
||||||
|
cs.setHead(reorgmeParent)
|
||||||
|
verifyIndex(t, cs, msgIndex)
|
||||||
|
|
||||||
|
t.Log("verifying that reorged and unknown messages are not present")
|
||||||
|
verifyMissing(t, cs, msgIndex, reorgme, reorgmeChild)
|
||||||
|
}
|
||||||
|
|
||||||
|
func verifyIndex(t *testing.T, cs *mockChainStore, msgIndex MsgIndex) {
|
||||||
|
for ts := cs.curTs; ts.Height() > 0; {
|
||||||
|
t.Logf("verify at height %d", ts.Height())
|
||||||
|
blks := ts.Blocks()
|
||||||
|
if len(blks) == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
tsCid, err := ts.Key().Cid()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
xindex := 0
|
||||||
|
for _, blk := range blks {
|
||||||
|
msgs, _, _ := cs.MessagesForBlock(context.Background(), blk)
|
||||||
|
for _, m := range msgs {
|
||||||
|
minfo, err := msgIndex.GetMsgInfo(context.Background(), m.Cid())
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, tsCid, minfo.TipSet)
|
||||||
|
require.Equal(t, ts.Height(), minfo.Epoch)
|
||||||
|
require.Equal(t, xindex, minfo.Index)
|
||||||
|
xindex++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
parents := ts.Parents()
|
||||||
|
ts, err = cs.GetTipSetFromKey(context.Background(), parents)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func verifyMissing(t *testing.T, cs *mockChainStore, msgIndex MsgIndex, missing ...*types.TipSet) {
|
||||||
|
for _, ts := range missing {
|
||||||
|
for _, blk := range ts.Blocks() {
|
||||||
|
msgs, _, _ := cs.MessagesForBlock(context.Background(), blk)
|
||||||
|
for _, m := range msgs {
|
||||||
|
_, err := msgIndex.GetMsgInfo(context.Background(), m.Cid())
|
||||||
|
require.Equal(t, ErrNotFound, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type mockChainStore struct {
|
||||||
|
notify store.ReorgNotifee
|
||||||
|
|
||||||
|
curTs *types.TipSet
|
||||||
|
tipsets map[types.TipSetKey]*types.TipSet
|
||||||
|
blockMsgs map[cid.Cid][]*types.Message
|
||||||
|
|
||||||
|
nonce uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ ChainStore = (*mockChainStore)(nil)
|
||||||
|
|
||||||
|
var systemAddr address.Address
|
||||||
|
var rng *rand.Rand
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
systemAddr, _ = address.NewIDAddress(0)
|
||||||
|
rng = rand.New(rand.NewSource(314159))
|
||||||
|
}
|
||||||
|
|
||||||
|
func newMockChainStore() *mockChainStore {
|
||||||
|
return &mockChainStore{
|
||||||
|
tipsets: make(map[types.TipSetKey]*types.TipSet),
|
||||||
|
blockMsgs: make(map[cid.Cid][]*types.Message),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cs *mockChainStore) genesis() {
|
||||||
|
genBlock := mock.MkBlock(nil, 0, 0)
|
||||||
|
cs.blockMsgs[genBlock.Cid()] = nil
|
||||||
|
genTs := mock.TipSet(genBlock)
|
||||||
|
cs.setHead(genTs)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cs *mockChainStore) setHead(ts *types.TipSet) {
|
||||||
|
cs.curTs = ts
|
||||||
|
cs.tipsets[ts.Key()] = ts
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cs *mockChainStore) advance() error {
|
||||||
|
ts := cs.makeBlk()
|
||||||
|
return cs.reorg(nil, []*types.TipSet{ts})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cs *mockChainStore) reorg(rev, app []*types.TipSet) error {
|
||||||
|
for _, ts := range rev {
|
||||||
|
parents := ts.Parents()
|
||||||
|
cs.curTs = cs.tipsets[parents]
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, ts := range app {
|
||||||
|
cs.tipsets[ts.Key()] = ts
|
||||||
|
cs.curTs = ts
|
||||||
|
}
|
||||||
|
|
||||||
|
if cs.notify != nil {
|
||||||
|
return cs.notify(rev, app)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cs *mockChainStore) makeBlk() *types.TipSet {
|
||||||
|
height := cs.curTs.Height() + 1
|
||||||
|
|
||||||
|
blk := mock.MkBlock(cs.curTs, uint64(height), uint64(height))
|
||||||
|
blk.Messages = cs.makeGarbageCid()
|
||||||
|
msg1 := cs.makeMsg()
|
||||||
|
msg2 := cs.makeMsg()
|
||||||
|
cs.blockMsgs[blk.Cid()] = []*types.Message{msg1, msg2}
|
||||||
|
|
||||||
|
return mock.TipSet(blk)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cs *mockChainStore) makeMsg() *types.Message {
|
||||||
|
nonce := cs.nonce
|
||||||
|
cs.nonce++
|
||||||
|
return &types.Message{To: systemAddr, From: systemAddr, Nonce: nonce}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cs *mockChainStore) makeGarbageCid() cid.Cid {
|
||||||
|
garbage := blocks.NewBlock([]byte{byte(rng.Intn(256)), byte(rng.Intn(256)), byte(rng.Intn(256))})
|
||||||
|
return garbage.Cid()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cs *mockChainStore) SubscribeHeadChanges(f store.ReorgNotifee) {
|
||||||
|
cs.notify = f
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cs *mockChainStore) MessagesForBlock(ctx context.Context, b *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) {
|
||||||
|
msgs, ok := cs.blockMsgs[b.Cid()]
|
||||||
|
if !ok {
|
||||||
|
return nil, nil, errors.New("unknown block")
|
||||||
|
}
|
||||||
|
|
||||||
|
return msgs, nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cs *mockChainStore) GetHeaviestTipSet() *types.TipSet {
|
||||||
|
return cs.curTs
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cs *mockChainStore) GetTipSetFromKey(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error) {
|
||||||
|
ts, ok := cs.tipsets[tsk]
|
||||||
|
if !ok {
|
||||||
|
return nil, errors.New("unknown tipset")
|
||||||
|
}
|
||||||
|
return ts, nil
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user