cache loaded block messages
This commit is contained in:
parent
1c30d07d68
commit
eef3fd5d7a
@ -5,6 +5,9 @@ import (
|
||||
"math"
|
||||
"sync"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
lru "github.com/hashicorp/golang-lru"
|
||||
|
||||
"github.com/filecoin-project/lotus/chain/stmgr"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
@ -464,14 +467,20 @@ type messageEvents struct {
|
||||
|
||||
lk sync.RWMutex
|
||||
matchers map[triggerID]MsgMatchFunc
|
||||
|
||||
blockMsgLk sync.Mutex
|
||||
blockMsgCache *lru.ARCCache
|
||||
}
|
||||
|
||||
func newMessageEvents(ctx context.Context, hcAPI headChangeAPI, cs EventAPI) messageEvents {
|
||||
blsMsgCache, _ := lru.NewARC(500)
|
||||
return messageEvents{
|
||||
ctx: ctx,
|
||||
cs: cs,
|
||||
hcAPI: hcAPI,
|
||||
matchers: make(map[triggerID]MsgMatchFunc),
|
||||
ctx: ctx,
|
||||
cs: cs,
|
||||
hcAPI: hcAPI,
|
||||
matchers: make(map[triggerID]MsgMatchFunc),
|
||||
blockMsgLk: sync.Mutex{},
|
||||
blockMsgCache: blsMsgCache,
|
||||
}
|
||||
}
|
||||
|
||||
@ -515,14 +524,21 @@ func (me *messageEvents) messagesForTs(ts *types.TipSet, consume func(*types.Mes
|
||||
seen := map[cid.Cid]struct{}{}
|
||||
|
||||
for _, tsb := range ts.Blocks() {
|
||||
|
||||
msgs, err := me.cs.ChainGetBlockMessages(context.TODO(), tsb.Cid())
|
||||
if err != nil {
|
||||
log.Errorf("messagesForTs MessagesForBlock failed (ts.H=%d, Bcid:%s, B.Mcid:%s): %s", ts.Height(), tsb.Cid(), tsb.Messages, err)
|
||||
// this is quite bad, but probably better than missing all the other updates
|
||||
continue
|
||||
me.blockMsgLk.Lock()
|
||||
msgsI, ok := me.blockMsgCache.Get(tsb.Cid())
|
||||
var err error
|
||||
if !ok {
|
||||
msgsI, err = me.cs.ChainGetBlockMessages(context.TODO(), tsb.Cid())
|
||||
if err != nil {
|
||||
log.Errorf("messagesForTs MessagesForBlock failed (ts.H=%d, Bcid:%s, B.Mcid:%s): %s", ts.Height(), tsb.Cid(), tsb.Messages, err)
|
||||
// this is quite bad, but probably better than missing all the other updates
|
||||
me.blockMsgLk.Unlock()
|
||||
continue
|
||||
}
|
||||
me.blockMsgCache.Add(tsb.Cid(), msgsI)
|
||||
}
|
||||
|
||||
me.blockMsgLk.Unlock()
|
||||
msgs := msgsI.(*api.BlockMessages)
|
||||
for _, m := range msgs.BlsMessages {
|
||||
_, ok := seen[m.Cid()]
|
||||
if ok {
|
||||
|
@ -6,6 +6,8 @@ import (
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"gotest.tools/assert"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/multiformats/go-multihash"
|
||||
"github.com/stretchr/testify/require"
|
||||
@ -44,25 +46,43 @@ type fakeCS struct {
|
||||
tipsets map[types.TipSetKey]*types.TipSet
|
||||
|
||||
sub func(rev, app []*types.TipSet)
|
||||
|
||||
callNumberLk sync.Mutex
|
||||
callNumber map[string]int
|
||||
}
|
||||
|
||||
func (fcs *fakeCS) ChainHead(ctx context.Context) (*types.TipSet, error) {
|
||||
fcs.callNumberLk.Lock()
|
||||
defer fcs.callNumberLk.Unlock()
|
||||
fcs.callNumber["ChainHead"] = fcs.callNumber["ChainHead"] + 1
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (fcs *fakeCS) ChainGetTipSet(ctx context.Context, key types.TipSetKey) (*types.TipSet, error) {
|
||||
fcs.callNumberLk.Lock()
|
||||
defer fcs.callNumberLk.Unlock()
|
||||
fcs.callNumber["ChainGetTipSet"] = fcs.callNumber["ChainGetTipSet"] + 1
|
||||
return fcs.tipsets[key], nil
|
||||
}
|
||||
|
||||
func (fcs *fakeCS) StateSearchMsg(ctx context.Context, from types.TipSetKey, msg cid.Cid, limit abi.ChainEpoch, allowReplaced bool) (*api.MsgLookup, error) {
|
||||
fcs.callNumberLk.Lock()
|
||||
defer fcs.callNumberLk.Unlock()
|
||||
fcs.callNumber["StateSearchMsg"] = fcs.callNumber["StateSearchMsg"] + 1
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (fcs *fakeCS) StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) {
|
||||
fcs.callNumberLk.Lock()
|
||||
defer fcs.callNumberLk.Unlock()
|
||||
fcs.callNumber["StateGetActor"] = fcs.callNumber["StateGetActor"] + 1
|
||||
panic("Not Implemented")
|
||||
}
|
||||
|
||||
func (fcs *fakeCS) ChainGetTipSetByHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error) {
|
||||
fcs.callNumberLk.Lock()
|
||||
defer fcs.callNumberLk.Unlock()
|
||||
fcs.callNumber["ChainGetTipSetByHeight"] = fcs.callNumber["ChainGetTipSetByHeight"] + 1
|
||||
panic("Not Implemented")
|
||||
}
|
||||
|
||||
@ -113,6 +133,10 @@ func (fcs *fakeCS) makeTs(t *testing.T, parents []cid.Cid, h abi.ChainEpoch, msg
|
||||
}
|
||||
|
||||
func (fcs *fakeCS) ChainNotify(context.Context) (<-chan []*api.HeadChange, error) {
|
||||
fcs.callNumberLk.Lock()
|
||||
defer fcs.callNumberLk.Unlock()
|
||||
fcs.callNumber["ChainNotify"] = fcs.callNumber["ChainNotify"] + 1
|
||||
|
||||
out := make(chan []*api.HeadChange, 1)
|
||||
best, err := fcs.tsc.best()
|
||||
if err != nil {
|
||||
@ -143,6 +167,9 @@ func (fcs *fakeCS) ChainNotify(context.Context) (<-chan []*api.HeadChange, error
|
||||
}
|
||||
|
||||
func (fcs *fakeCS) ChainGetBlockMessages(ctx context.Context, blk cid.Cid) (*api.BlockMessages, error) {
|
||||
fcs.callNumberLk.Lock()
|
||||
defer fcs.callNumberLk.Unlock()
|
||||
fcs.callNumber["ChainGetBlockMessages"] = fcs.callNumber["ChainGetBlockMessages"] + 1
|
||||
messages, ok := fcs.blkMsgs[blk]
|
||||
if !ok {
|
||||
return &api.BlockMessages{}, nil
|
||||
@ -152,8 +179,8 @@ func (fcs *fakeCS) ChainGetBlockMessages(ctx context.Context, blk cid.Cid) (*api
|
||||
if !ok {
|
||||
return &api.BlockMessages{}, nil
|
||||
}
|
||||
return &api.BlockMessages{BlsMessages: ms.bmsgs, SecpkMessages: ms.smsgs}, nil
|
||||
|
||||
return &api.BlockMessages{BlsMessages: ms.bmsgs, SecpkMessages: ms.smsgs}, nil
|
||||
}
|
||||
|
||||
func (fcs *fakeCS) fakeMsgs(m fakeMsg) cid.Cid {
|
||||
@ -233,9 +260,10 @@ var _ EventAPI = &fakeCS{}
|
||||
|
||||
func TestAt(t *testing.T) {
|
||||
fcs := &fakeCS{
|
||||
t: t,
|
||||
h: 1,
|
||||
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||
t: t,
|
||||
h: 1,
|
||||
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||
callNumber: map[string]int{},
|
||||
}
|
||||
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
|
||||
|
||||
@ -298,9 +326,10 @@ func TestAt(t *testing.T) {
|
||||
|
||||
func TestAtDoubleTrigger(t *testing.T) {
|
||||
fcs := &fakeCS{
|
||||
t: t,
|
||||
h: 1,
|
||||
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||
t: t,
|
||||
h: 1,
|
||||
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||
callNumber: map[string]int{},
|
||||
}
|
||||
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
|
||||
|
||||
@ -340,9 +369,10 @@ func TestAtDoubleTrigger(t *testing.T) {
|
||||
|
||||
func TestAtNullTrigger(t *testing.T) {
|
||||
fcs := &fakeCS{
|
||||
t: t,
|
||||
h: 1,
|
||||
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||
t: t,
|
||||
h: 1,
|
||||
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||
callNumber: map[string]int{},
|
||||
}
|
||||
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
|
||||
|
||||
@ -374,9 +404,10 @@ func TestAtNullTrigger(t *testing.T) {
|
||||
|
||||
func TestAtNullConf(t *testing.T) {
|
||||
fcs := &fakeCS{
|
||||
t: t,
|
||||
h: 1,
|
||||
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||
t: t,
|
||||
h: 1,
|
||||
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||
callNumber: map[string]int{},
|
||||
}
|
||||
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
|
||||
|
||||
@ -413,9 +444,10 @@ func TestAtNullConf(t *testing.T) {
|
||||
|
||||
func TestAtStart(t *testing.T) {
|
||||
fcs := &fakeCS{
|
||||
t: t,
|
||||
h: 1,
|
||||
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||
t: t,
|
||||
h: 1,
|
||||
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||
callNumber: map[string]int{},
|
||||
}
|
||||
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
|
||||
|
||||
@ -447,9 +479,10 @@ func TestAtStart(t *testing.T) {
|
||||
|
||||
func TestAtStartConfidence(t *testing.T) {
|
||||
fcs := &fakeCS{
|
||||
t: t,
|
||||
h: 1,
|
||||
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||
t: t,
|
||||
h: 1,
|
||||
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||
callNumber: map[string]int{},
|
||||
}
|
||||
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
|
||||
|
||||
@ -477,9 +510,10 @@ func TestAtStartConfidence(t *testing.T) {
|
||||
|
||||
func TestAtChained(t *testing.T) {
|
||||
fcs := &fakeCS{
|
||||
t: t,
|
||||
h: 1,
|
||||
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||
t: t,
|
||||
h: 1,
|
||||
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||
callNumber: map[string]int{},
|
||||
}
|
||||
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
|
||||
|
||||
@ -511,9 +545,10 @@ func TestAtChained(t *testing.T) {
|
||||
|
||||
func TestAtChainedConfidence(t *testing.T) {
|
||||
fcs := &fakeCS{
|
||||
t: t,
|
||||
h: 1,
|
||||
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||
t: t,
|
||||
h: 1,
|
||||
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||
callNumber: map[string]int{},
|
||||
}
|
||||
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
|
||||
|
||||
@ -545,9 +580,10 @@ func TestAtChainedConfidence(t *testing.T) {
|
||||
|
||||
func TestAtChainedConfidenceNull(t *testing.T) {
|
||||
fcs := &fakeCS{
|
||||
t: t,
|
||||
h: 1,
|
||||
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||
t: t,
|
||||
h: 1,
|
||||
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||
callNumber: map[string]int{},
|
||||
}
|
||||
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
|
||||
|
||||
@ -583,9 +619,10 @@ func TestCalled(t *testing.T) {
|
||||
t: t,
|
||||
h: 1,
|
||||
|
||||
msgs: map[cid.Cid]fakeMsg{},
|
||||
blkMsgs: map[cid.Cid]cid.Cid{},
|
||||
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||
msgs: map[cid.Cid]fakeMsg{},
|
||||
blkMsgs: map[cid.Cid]cid.Cid{},
|
||||
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||
callNumber: map[string]int{},
|
||||
}
|
||||
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
|
||||
|
||||
@ -795,9 +832,10 @@ func TestCalledTimeout(t *testing.T) {
|
||||
t: t,
|
||||
h: 1,
|
||||
|
||||
msgs: map[cid.Cid]fakeMsg{},
|
||||
blkMsgs: map[cid.Cid]cid.Cid{},
|
||||
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||
msgs: map[cid.Cid]fakeMsg{},
|
||||
blkMsgs: map[cid.Cid]cid.Cid{},
|
||||
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||
callNumber: map[string]int{},
|
||||
}
|
||||
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
|
||||
|
||||
@ -835,9 +873,10 @@ func TestCalledTimeout(t *testing.T) {
|
||||
t: t,
|
||||
h: 1,
|
||||
|
||||
msgs: map[cid.Cid]fakeMsg{},
|
||||
blkMsgs: map[cid.Cid]cid.Cid{},
|
||||
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||
msgs: map[cid.Cid]fakeMsg{},
|
||||
blkMsgs: map[cid.Cid]cid.Cid{},
|
||||
callNumber: map[string]int{},
|
||||
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||
}
|
||||
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
|
||||
|
||||
@ -869,9 +908,10 @@ func TestCalledOrder(t *testing.T) {
|
||||
t: t,
|
||||
h: 1,
|
||||
|
||||
msgs: map[cid.Cid]fakeMsg{},
|
||||
blkMsgs: map[cid.Cid]cid.Cid{},
|
||||
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||
msgs: map[cid.Cid]fakeMsg{},
|
||||
blkMsgs: map[cid.Cid]cid.Cid{},
|
||||
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||
callNumber: map[string]int{},
|
||||
}
|
||||
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
|
||||
|
||||
@ -932,9 +972,10 @@ func TestCalledNull(t *testing.T) {
|
||||
t: t,
|
||||
h: 1,
|
||||
|
||||
msgs: map[cid.Cid]fakeMsg{},
|
||||
blkMsgs: map[cid.Cid]cid.Cid{},
|
||||
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||
msgs: map[cid.Cid]fakeMsg{},
|
||||
blkMsgs: map[cid.Cid]cid.Cid{},
|
||||
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||
callNumber: map[string]int{},
|
||||
}
|
||||
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
|
||||
|
||||
@ -997,9 +1038,10 @@ func TestRemoveTriggersOnMessage(t *testing.T) {
|
||||
t: t,
|
||||
h: 1,
|
||||
|
||||
msgs: map[cid.Cid]fakeMsg{},
|
||||
blkMsgs: map[cid.Cid]cid.Cid{},
|
||||
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||
msgs: map[cid.Cid]fakeMsg{},
|
||||
blkMsgs: map[cid.Cid]cid.Cid{},
|
||||
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||
callNumber: map[string]int{},
|
||||
}
|
||||
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
|
||||
|
||||
@ -1087,9 +1129,10 @@ func TestStateChanged(t *testing.T) {
|
||||
t: t,
|
||||
h: 1,
|
||||
|
||||
msgs: map[cid.Cid]fakeMsg{},
|
||||
blkMsgs: map[cid.Cid]cid.Cid{},
|
||||
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||
msgs: map[cid.Cid]fakeMsg{},
|
||||
blkMsgs: map[cid.Cid]cid.Cid{},
|
||||
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||
callNumber: map[string]int{},
|
||||
}
|
||||
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
|
||||
|
||||
@ -1175,9 +1218,10 @@ func TestStateChangedRevert(t *testing.T) {
|
||||
t: t,
|
||||
h: 1,
|
||||
|
||||
msgs: map[cid.Cid]fakeMsg{},
|
||||
blkMsgs: map[cid.Cid]cid.Cid{},
|
||||
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||
msgs: map[cid.Cid]fakeMsg{},
|
||||
blkMsgs: map[cid.Cid]cid.Cid{},
|
||||
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||
callNumber: map[string]int{},
|
||||
}
|
||||
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
|
||||
|
||||
@ -1253,9 +1297,10 @@ func TestStateChangedTimeout(t *testing.T) {
|
||||
t: t,
|
||||
h: 1,
|
||||
|
||||
msgs: map[cid.Cid]fakeMsg{},
|
||||
blkMsgs: map[cid.Cid]cid.Cid{},
|
||||
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||
msgs: map[cid.Cid]fakeMsg{},
|
||||
blkMsgs: map[cid.Cid]cid.Cid{},
|
||||
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||
callNumber: map[string]int{},
|
||||
}
|
||||
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
|
||||
|
||||
@ -1293,9 +1338,10 @@ func TestStateChangedTimeout(t *testing.T) {
|
||||
t: t,
|
||||
h: 1,
|
||||
|
||||
msgs: map[cid.Cid]fakeMsg{},
|
||||
blkMsgs: map[cid.Cid]cid.Cid{},
|
||||
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||
msgs: map[cid.Cid]fakeMsg{},
|
||||
blkMsgs: map[cid.Cid]cid.Cid{},
|
||||
callNumber: map[string]int{},
|
||||
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||
}
|
||||
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
|
||||
|
||||
@ -1329,9 +1375,10 @@ func TestCalledMultiplePerEpoch(t *testing.T) {
|
||||
t: t,
|
||||
h: 1,
|
||||
|
||||
msgs: map[cid.Cid]fakeMsg{},
|
||||
blkMsgs: map[cid.Cid]cid.Cid{},
|
||||
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||
msgs: map[cid.Cid]fakeMsg{},
|
||||
blkMsgs: map[cid.Cid]cid.Cid{},
|
||||
callNumber: map[string]int{},
|
||||
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||
}
|
||||
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
|
||||
|
||||
@ -1382,3 +1429,24 @@ func TestCalledMultiplePerEpoch(t *testing.T) {
|
||||
|
||||
fcs.advance(9, 1, nil)
|
||||
}
|
||||
|
||||
func TestCachedSameBlock(t *testing.T) {
|
||||
fcs := &fakeCS{
|
||||
t: t,
|
||||
h: 1,
|
||||
|
||||
msgs: map[cid.Cid]fakeMsg{},
|
||||
blkMsgs: map[cid.Cid]cid.Cid{},
|
||||
callNumber: map[string]int{},
|
||||
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||
}
|
||||
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
|
||||
|
||||
_ = NewEvents(context.Background(), fcs)
|
||||
|
||||
fcs.advance(0, 10, map[int]cid.Cid{})
|
||||
assert.Assert(t, fcs.callNumber["ChainGetBlockMessages"] == 20, "expect call ChainGetBlockMessages %d but got ", 20, fcs.callNumber["ChainGetBlockMessages"])
|
||||
|
||||
fcs.advance(5, 10, map[int]cid.Cid{})
|
||||
assert.Assert(t, fcs.callNumber["ChainGetBlockMessages"] == 30, "expect call ChainGetBlockMessages %d but got ", 30, fcs.callNumber["ChainGetBlockMessages"])
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user