basic msgindex itest
This commit is contained in:
parent
22490bdea9
commit
4f5e0a0fe4
@ -52,10 +52,11 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// coalescer configuration (TODO: use observer instead)
|
// coalescer configuration (TODO: use observer instead)
|
||||||
|
// these are exposed to make tests snappy
|
||||||
var (
|
var (
|
||||||
coalesceMinDelay = time.Second
|
CoalesceMinDelay = time.Second
|
||||||
coalesceMaxDelay = 15 * time.Second
|
CoalesceMaxDelay = 15 * time.Second
|
||||||
coalesceMergeInterval = time.Second
|
CoalesceMergeInterval = time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
// chain store interface; we could use store.ChainStore directly,
|
// chain store interface; we could use store.ChainStore directly,
|
||||||
@ -156,9 +157,9 @@ func NewMsgIndex(lctx context.Context, basePath string, cs ChainStore) (MsgIndex
|
|||||||
|
|
||||||
rnf := store.WrapHeadChangeCoalescer(
|
rnf := store.WrapHeadChangeCoalescer(
|
||||||
msgIndex.onHeadChange,
|
msgIndex.onHeadChange,
|
||||||
coalesceMinDelay,
|
CoalesceMinDelay,
|
||||||
coalesceMaxDelay,
|
CoalesceMaxDelay,
|
||||||
coalesceMergeInterval,
|
CoalesceMergeInterval,
|
||||||
)
|
)
|
||||||
cs.SubscribeHeadChanges(rnf)
|
cs.SubscribeHeadChanges(rnf)
|
||||||
|
|
||||||
@ -440,3 +441,18 @@ func (x *msgIndex) Close() error {
|
|||||||
|
|
||||||
return x.db.Close()
|
return x.db.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// informal apis for itests; not exposed in the main interface
|
||||||
|
func (x *msgIndex) CountMessages() (int64, error) {
|
||||||
|
x.closeLk.RLock()
|
||||||
|
defer x.closeLk.RUnlock()
|
||||||
|
|
||||||
|
if x.closed {
|
||||||
|
return 0, ErrClosed
|
||||||
|
}
|
||||||
|
|
||||||
|
var result int64
|
||||||
|
row := x.db.QueryRow(dbqCountMessages)
|
||||||
|
err := row.Scan(&result)
|
||||||
|
return result, err
|
||||||
|
}
|
||||||
|
@ -40,7 +40,7 @@ func TestBasicMsgIndex(t *testing.T) {
|
|||||||
err := cs.advance()
|
err := cs.advance()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
// wait for the coalescer to notify
|
// wait for the coalescer to notify
|
||||||
time.Sleep(coalesceMinDelay + 10*time.Millisecond)
|
time.Sleep(CoalesceMinDelay + 10*time.Millisecond)
|
||||||
}
|
}
|
||||||
|
|
||||||
t.Log("verifying index")
|
t.Log("verifying index")
|
||||||
@ -68,7 +68,7 @@ func TestReorgMsgIndex(t *testing.T) {
|
|||||||
err := cs.advance()
|
err := cs.advance()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
// wait for the coalescer to notify
|
// wait for the coalescer to notify
|
||||||
time.Sleep(coalesceMinDelay + 10*time.Millisecond)
|
time.Sleep(CoalesceMinDelay + 10*time.Millisecond)
|
||||||
}
|
}
|
||||||
|
|
||||||
// a simple reorg
|
// a simple reorg
|
||||||
@ -80,7 +80,7 @@ func TestReorgMsgIndex(t *testing.T) {
|
|||||||
reorgmeChild := cs.makeBlk()
|
reorgmeChild := cs.makeBlk()
|
||||||
err = cs.reorg([]*types.TipSet{reorgme}, []*types.TipSet{reorgmeChild})
|
err = cs.reorg([]*types.TipSet{reorgme}, []*types.TipSet{reorgmeChild})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
time.Sleep(coalesceMinDelay + 10*time.Millisecond)
|
time.Sleep(CoalesceMinDelay + 10*time.Millisecond)
|
||||||
|
|
||||||
t.Log("verifying index")
|
t.Log("verifying index")
|
||||||
verifyIndex(t, cs, msgIndex)
|
verifyIndex(t, cs, msgIndex)
|
||||||
@ -110,7 +110,7 @@ func TestReconcileMsgIndex(t *testing.T) {
|
|||||||
err := cs.advance()
|
err := cs.advance()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
// wait for the coalescer to notify
|
// wait for the coalescer to notify
|
||||||
time.Sleep(coalesceMinDelay + 10*time.Millisecond)
|
time.Sleep(CoalesceMinDelay + 10*time.Millisecond)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close it and reorg
|
// Close it and reorg
|
||||||
@ -200,10 +200,9 @@ func init() {
|
|||||||
rng = rand.New(rand.NewSource(314159))
|
rng = rand.New(rand.NewSource(314159))
|
||||||
|
|
||||||
// adjust those to make tests snappy
|
// adjust those to make tests snappy
|
||||||
coalesceMinDelay = time.Millisecond
|
CoalesceMinDelay = time.Millisecond
|
||||||
coalesceMaxDelay = 10 * time.Millisecond
|
CoalesceMaxDelay = 10 * time.Millisecond
|
||||||
coalesceMergeInterval = time.Millisecond
|
CoalesceMergeInterval = time.Millisecond
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func newMockChainStore() *mockChainStore {
|
func newMockChainStore() *mockChainStore {
|
||||||
|
90
itests/msgindex_test.go
Normal file
90
itests/msgindex_test.go
Normal file
@ -0,0 +1,90 @@
|
|||||||
|
package itests
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"os"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/chain/index"
|
||||||
|
"github.com/filecoin-project/lotus/chain/store"
|
||||||
|
"github.com/filecoin-project/lotus/itests/kit"
|
||||||
|
"github.com/filecoin-project/lotus/node"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
// adjust those to make tests snappy
|
||||||
|
index.CoalesceMinDelay = time.Millisecond
|
||||||
|
index.CoalesceMaxDelay = 10 * time.Millisecond
|
||||||
|
index.CoalesceMergeInterval = time.Millisecond
|
||||||
|
}
|
||||||
|
|
||||||
|
func testMsgIndex(
|
||||||
|
t *testing.T,
|
||||||
|
name string,
|
||||||
|
run func(t *testing.T, makeMsgIndex func(cs *store.ChainStore) (index.MsgIndex, error)),
|
||||||
|
check func(t *testing.T, i int, msgIndex index.MsgIndex),
|
||||||
|
) {
|
||||||
|
|
||||||
|
// create the message indices in the test context
|
||||||
|
var mx sync.Mutex
|
||||||
|
var tmpDirs []string
|
||||||
|
var msgIndices []index.MsgIndex
|
||||||
|
|
||||||
|
t.Cleanup(func() {
|
||||||
|
for _, msgIndex := range msgIndices {
|
||||||
|
_ = msgIndex.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tmp := range tmpDirs {
|
||||||
|
_ = os.RemoveAll(tmp)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
makeMsgIndex := func(cs *store.ChainStore) (index.MsgIndex, error) {
|
||||||
|
var err error
|
||||||
|
tmp := t.TempDir()
|
||||||
|
msgIndex, err := index.NewMsgIndex(context.Background(), tmp, cs)
|
||||||
|
if err == nil {
|
||||||
|
mx.Lock()
|
||||||
|
tmpDirs = append(tmpDirs, tmp)
|
||||||
|
msgIndices = append(msgIndices, msgIndex)
|
||||||
|
mx.Unlock()
|
||||||
|
}
|
||||||
|
return msgIndex, err
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Run(name, func(t *testing.T) {
|
||||||
|
run(t, makeMsgIndex)
|
||||||
|
})
|
||||||
|
|
||||||
|
if len(msgIndices) == 0 {
|
||||||
|
t.Fatal("no message indices")
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, msgIndex := range msgIndices {
|
||||||
|
check(t, i, msgIndex)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func checkNonEmptyMsgIndex(t *testing.T, _ int, msgIndex index.MsgIndex) {
|
||||||
|
mi, ok := msgIndex.(interface{ CountMessages() (int64, error) })
|
||||||
|
if !ok {
|
||||||
|
t.Fatal("index does not allow counting")
|
||||||
|
}
|
||||||
|
count, err := mi.CountMessages()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotEqual(t, count, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMsgIndex(t *testing.T) {
|
||||||
|
testMsgIndex(t, "testSearchMsg", testSearchMsgWithIndex, checkNonEmptyMsgIndex)
|
||||||
|
}
|
||||||
|
|
||||||
|
func testSearchMsgWithIndex(t *testing.T, makeMsgIndex func(cs *store.ChainStore) (index.MsgIndex, error)) {
|
||||||
|
suite := apiSuite{opts: []interface{}{kit.ConstructorOpts(node.Override(new(index.MsgIndex), makeMsgIndex))}}
|
||||||
|
suite.testSearchMsg(t)
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user