lotus/chain/events/events_test.go

1460 lines
37 KiB
Go

package events
import (
"context"
"fmt"
"sync"
"testing"
"gotest.tools/assert"
"github.com/ipfs/go-cid"
"github.com/multiformats/go-multihash"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
)
var dummyCid cid.Cid
func init() {
dummyCid, _ = cid.Parse("bafkqaaa")
}
type fakeMsg struct {
bmsgs []*types.Message
smsgs []*types.SignedMessage
}
type fakeCS struct {
t *testing.T
h abi.ChainEpoch
tsc *tipSetCache
msgs map[cid.Cid]fakeMsg
blkMsgs map[cid.Cid]cid.Cid
sync sync.Mutex
tipsets map[types.TipSetKey]*types.TipSet
sub func(rev, app []*types.TipSet)
callNumberLk sync.Mutex
callNumber map[string]int
}
func (fcs *fakeCS) ChainHead(ctx context.Context) (*types.TipSet, error) {
fcs.callNumberLk.Lock()
defer fcs.callNumberLk.Unlock()
fcs.callNumber["ChainHead"] = fcs.callNumber["ChainHead"] + 1
panic("implement me")
}
func (fcs *fakeCS) ChainGetTipSet(ctx context.Context, key types.TipSetKey) (*types.TipSet, error) {
fcs.callNumberLk.Lock()
defer fcs.callNumberLk.Unlock()
fcs.callNumber["ChainGetTipSet"] = fcs.callNumber["ChainGetTipSet"] + 1
return fcs.tipsets[key], nil
}
func (fcs *fakeCS) StateSearchMsg(ctx context.Context, from types.TipSetKey, msg cid.Cid, limit abi.ChainEpoch, allowReplaced bool) (*api.MsgLookup, error) {
fcs.callNumberLk.Lock()
defer fcs.callNumberLk.Unlock()
fcs.callNumber["StateSearchMsg"] = fcs.callNumber["StateSearchMsg"] + 1
return nil, nil
}
func (fcs *fakeCS) StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) {
fcs.callNumberLk.Lock()
defer fcs.callNumberLk.Unlock()
fcs.callNumber["StateGetActor"] = fcs.callNumber["StateGetActor"] + 1
panic("Not Implemented")
}
func (fcs *fakeCS) ChainGetTipSetByHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error) {
fcs.callNumberLk.Lock()
defer fcs.callNumberLk.Unlock()
fcs.callNumber["ChainGetTipSetByHeight"] = fcs.callNumber["ChainGetTipSetByHeight"] + 1
panic("Not Implemented")
}
func (fcs *fakeCS) makeTs(t *testing.T, parents []cid.Cid, h abi.ChainEpoch, msgcid cid.Cid) *types.TipSet {
a, _ := address.NewFromString("t00")
b, _ := address.NewFromString("t02")
var ts, err = types.NewTipSet([]*types.BlockHeader{
{
Height: h,
Miner: a,
Parents: parents,
Ticket: &types.Ticket{VRFProof: []byte{byte(h % 2)}},
ParentStateRoot: dummyCid,
Messages: msgcid,
ParentMessageReceipts: dummyCid,
BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS},
BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS},
},
{
Height: h,
Miner: b,
Parents: parents,
Ticket: &types.Ticket{VRFProof: []byte{byte((h + 1) % 2)}},
ParentStateRoot: dummyCid,
Messages: msgcid,
ParentMessageReceipts: dummyCid,
BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS},
BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS},
},
})
if fcs.tipsets == nil {
fcs.tipsets = map[types.TipSetKey]*types.TipSet{}
}
fcs.tipsets[ts.Key()] = ts
require.NoError(t, err)
return ts
}
func (fcs *fakeCS) ChainNotify(context.Context) (<-chan []*api.HeadChange, error) {
fcs.callNumberLk.Lock()
defer fcs.callNumberLk.Unlock()
fcs.callNumber["ChainNotify"] = fcs.callNumber["ChainNotify"] + 1
out := make(chan []*api.HeadChange, 1)
best, err := fcs.tsc.best()
if err != nil {
return nil, err
}
out <- []*api.HeadChange{{Type: store.HCCurrent, Val: best}}
fcs.sub = func(rev, app []*types.TipSet) {
notif := make([]*api.HeadChange, len(rev)+len(app))
for i, r := range rev {
notif[i] = &api.HeadChange{
Type: store.HCRevert,
Val: r,
}
}
for i, r := range app {
notif[i+len(rev)] = &api.HeadChange{
Type: store.HCApply,
Val: r,
}
}
out <- notif
}
return out, nil
}
func (fcs *fakeCS) ChainGetBlockMessages(ctx context.Context, blk cid.Cid) (*api.BlockMessages, error) {
fcs.callNumberLk.Lock()
defer fcs.callNumberLk.Unlock()
fcs.callNumber["ChainGetBlockMessages"] = fcs.callNumber["ChainGetBlockMessages"] + 1
messages, ok := fcs.blkMsgs[blk]
if !ok {
return &api.BlockMessages{}, nil
}
ms, ok := fcs.msgs[messages]
if !ok {
return &api.BlockMessages{}, nil
}
return &api.BlockMessages{BlsMessages: ms.bmsgs, SecpkMessages: ms.smsgs}, nil
}
func (fcs *fakeCS) fakeMsgs(m fakeMsg) cid.Cid {
n := len(fcs.msgs)
c, err := cid.Prefix{
Version: 1,
Codec: cid.Raw,
MhType: multihash.IDENTITY,
MhLength: -1,
}.Sum([]byte(fmt.Sprintf("%d", n)))
require.NoError(fcs.t, err)
fcs.msgs[c] = m
return c
}
func (fcs *fakeCS) advance(rev, app int, msgs map[int]cid.Cid, nulls ...int) { // todo: allow msgs
if fcs.sub == nil {
fcs.t.Fatal("sub not be nil")
}
nullm := map[int]struct{}{}
for _, v := range nulls {
nullm[v] = struct{}{}
}
var revs []*types.TipSet
for i := 0; i < rev; i++ {
ts, err := fcs.tsc.best()
require.NoError(fcs.t, err)
if _, ok := nullm[int(ts.Height())]; !ok {
revs = append(revs, ts)
require.NoError(fcs.t, fcs.tsc.revert(ts))
}
fcs.h--
}
var apps []*types.TipSet
for i := 0; i < app; i++ {
fcs.h++
mc, hasMsgs := msgs[i]
if !hasMsgs {
mc = dummyCid
}
if _, ok := nullm[int(fcs.h)]; ok {
continue
}
best, err := fcs.tsc.best()
require.NoError(fcs.t, err)
ts := fcs.makeTs(fcs.t, best.Key().Cids(), fcs.h, mc)
require.NoError(fcs.t, fcs.tsc.add(ts))
if hasMsgs {
fcs.blkMsgs[ts.Blocks()[0].Cid()] = mc
}
apps = append(apps, ts)
}
fcs.sync.Lock()
fcs.sub(revs, apps)
fcs.sync.Lock()
fcs.sync.Unlock() //nolint:staticcheck
}
func (fcs *fakeCS) notifDone() {
fcs.sync.Unlock()
}
var _ EventAPI = &fakeCS{}
func TestAt(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events := NewEvents(context.Background(), fcs)
var applied bool
var reverted bool
err := events.ChainAt(func(_ context.Context, ts *types.TipSet, curH abi.ChainEpoch) error {
require.Equal(t, 5, int(ts.Height()))
require.Equal(t, 8, int(curH))
applied = true
return nil
}, func(_ context.Context, ts *types.TipSet) error {
reverted = true
return nil
}, 3, 5)
require.NoError(t, err)
fcs.advance(0, 3, nil)
require.Equal(t, false, applied)
require.Equal(t, false, reverted)
fcs.advance(0, 3, nil)
require.Equal(t, false, applied)
require.Equal(t, false, reverted)
fcs.advance(0, 3, nil)
require.Equal(t, true, applied)
require.Equal(t, false, reverted)
applied = false
fcs.advance(0, 3, nil)
require.Equal(t, false, applied)
require.Equal(t, false, reverted)
fcs.advance(10, 10, nil)
require.Equal(t, true, applied)
require.Equal(t, true, reverted)
applied = false
reverted = false
fcs.advance(10, 1, nil)
require.Equal(t, false, applied)
require.Equal(t, true, reverted)
reverted = false
fcs.advance(0, 1, nil)
require.Equal(t, false, applied)
require.Equal(t, false, reverted)
fcs.advance(0, 2, nil)
require.Equal(t, false, applied)
require.Equal(t, false, reverted)
fcs.advance(0, 1, nil) // 8
require.Equal(t, true, applied)
require.Equal(t, false, reverted)
}
func TestAtDoubleTrigger(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events := NewEvents(context.Background(), fcs)
var applied bool
var reverted bool
err := events.ChainAt(func(_ context.Context, ts *types.TipSet, curH abi.ChainEpoch) error {
require.Equal(t, 5, int(ts.Height()))
require.Equal(t, 8, int(curH))
applied = true
return nil
}, func(_ context.Context, ts *types.TipSet) error {
reverted = true
return nil
}, 3, 5)
require.NoError(t, err)
fcs.advance(0, 6, nil)
require.False(t, applied)
require.False(t, reverted)
fcs.advance(0, 1, nil)
require.True(t, applied)
require.False(t, reverted)
applied = false
fcs.advance(2, 2, nil)
require.False(t, applied)
require.False(t, reverted)
fcs.advance(4, 4, nil)
require.True(t, applied)
require.True(t, reverted)
}
func TestAtNullTrigger(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events := NewEvents(context.Background(), fcs)
var applied bool
var reverted bool
err := events.ChainAt(func(_ context.Context, ts *types.TipSet, curH abi.ChainEpoch) error {
require.Equal(t, abi.ChainEpoch(6), ts.Height())
require.Equal(t, 8, int(curH))
applied = true
return nil
}, func(_ context.Context, ts *types.TipSet) error {
reverted = true
return nil
}, 3, 5)
require.NoError(t, err)
fcs.advance(0, 6, nil, 5)
require.Equal(t, false, applied)
require.Equal(t, false, reverted)
fcs.advance(0, 3, nil)
require.Equal(t, true, applied)
require.Equal(t, false, reverted)
applied = false
}
func TestAtNullConf(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events := NewEvents(context.Background(), fcs)
var applied bool
var reverted bool
err := events.ChainAt(func(_ context.Context, ts *types.TipSet, curH abi.ChainEpoch) error {
require.Equal(t, 5, int(ts.Height()))
require.Equal(t, 8, int(curH))
applied = true
return nil
}, func(_ context.Context, ts *types.TipSet) error {
reverted = true
return nil
}, 3, 5)
require.NoError(t, err)
fcs.advance(0, 6, nil)
require.Equal(t, false, applied)
require.Equal(t, false, reverted)
fcs.advance(0, 3, nil, 8)
require.Equal(t, true, applied)
require.Equal(t, false, reverted)
applied = false
fcs.advance(7, 1, nil)
require.Equal(t, false, applied)
require.Equal(t, true, reverted)
reverted = false
}
func TestAtStart(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events := NewEvents(context.Background(), fcs)
fcs.advance(0, 5, nil) // 6
var applied bool
var reverted bool
err := events.ChainAt(func(_ context.Context, ts *types.TipSet, curH abi.ChainEpoch) error {
require.Equal(t, 5, int(ts.Height()))
require.Equal(t, 8, int(curH))
applied = true
return nil
}, func(_ context.Context, ts *types.TipSet) error {
reverted = true
return nil
}, 3, 5)
require.NoError(t, err)
require.Equal(t, false, applied)
require.Equal(t, false, reverted)
fcs.advance(0, 5, nil) // 11
require.Equal(t, true, applied)
require.Equal(t, false, reverted)
}
func TestAtStartConfidence(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events := NewEvents(context.Background(), fcs)
fcs.advance(0, 10, nil) // 11
var applied bool
var reverted bool
err := events.ChainAt(func(_ context.Context, ts *types.TipSet, curH abi.ChainEpoch) error {
require.Equal(t, 5, int(ts.Height()))
require.Equal(t, 11, int(curH))
applied = true
return nil
}, func(_ context.Context, ts *types.TipSet) error {
reverted = true
return nil
}, 3, 5)
require.NoError(t, err)
require.Equal(t, true, applied)
require.Equal(t, false, reverted)
}
func TestAtChained(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events := NewEvents(context.Background(), fcs)
var applied bool
var reverted bool
err := events.ChainAt(func(_ context.Context, ts *types.TipSet, curH abi.ChainEpoch) error {
return events.ChainAt(func(_ context.Context, ts *types.TipSet, curH abi.ChainEpoch) error {
require.Equal(t, 10, int(ts.Height()))
applied = true
return nil
}, func(_ context.Context, ts *types.TipSet) error {
reverted = true
return nil
}, 3, 10)
}, func(_ context.Context, ts *types.TipSet) error {
reverted = true
return nil
}, 3, 5)
require.NoError(t, err)
fcs.advance(0, 15, nil)
require.Equal(t, true, applied)
require.Equal(t, false, reverted)
}
func TestAtChainedConfidence(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events := NewEvents(context.Background(), fcs)
fcs.advance(0, 15, nil)
var applied bool
var reverted bool
err := events.ChainAt(func(_ context.Context, ts *types.TipSet, curH abi.ChainEpoch) error {
return events.ChainAt(func(_ context.Context, ts *types.TipSet, curH abi.ChainEpoch) error {
require.Equal(t, 10, int(ts.Height()))
applied = true
return nil
}, func(_ context.Context, ts *types.TipSet) error {
reverted = true
return nil
}, 3, 10)
}, func(_ context.Context, ts *types.TipSet) error {
reverted = true
return nil
}, 3, 5)
require.NoError(t, err)
require.Equal(t, true, applied)
require.Equal(t, false, reverted)
}
func TestAtChainedConfidenceNull(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events := NewEvents(context.Background(), fcs)
fcs.advance(0, 15, nil, 5)
var applied bool
var reverted bool
err := events.ChainAt(func(_ context.Context, ts *types.TipSet, curH abi.ChainEpoch) error {
applied = true
require.Equal(t, 6, int(ts.Height()))
return nil
}, func(_ context.Context, ts *types.TipSet) error {
reverted = true
return nil
}, 3, 5)
require.NoError(t, err)
require.Equal(t, true, applied)
require.Equal(t, false, reverted)
}
func matchAddrMethod(to address.Address, m abi.MethodNum) func(msg *types.Message) (matched bool, err error) {
return func(msg *types.Message) (matched bool, err error) {
return to == msg.To && m == msg.Method, nil
}
}
func TestCalled(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events := NewEvents(context.Background(), fcs)
t0123, err := address.NewFromString("t0123")
require.NoError(t, err)
more := true
var applied, reverted bool
var appliedMsg *types.Message
var appliedTs *types.TipSet
var appliedH abi.ChainEpoch
err = events.Called(func(ts *types.TipSet) (d bool, m bool, e error) {
return false, true, nil
}, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (bool, error) {
require.Equal(t, false, applied)
applied = true
appliedMsg = msg
appliedTs = ts
appliedH = curH
return more, nil
}, func(_ context.Context, ts *types.TipSet) error {
reverted = true
return nil
}, 3, 20, matchAddrMethod(t0123, 5))
require.NoError(t, err)
// create few blocks to make sure nothing get's randomly called
fcs.advance(0, 4, nil) // H=5
require.Equal(t, false, applied)
require.Equal(t, false, reverted)
// create blocks with message (but below confidence threshold)
fcs.advance(0, 3, map[int]cid.Cid{ // msg at H=6; H=8 (confidence=2)
0: fcs.fakeMsgs(fakeMsg{
bmsgs: []*types.Message{
{To: t0123, From: t0123, Method: 5, Nonce: 1},
},
}),
})
require.Equal(t, false, applied)
require.Equal(t, false, reverted)
// create additional block so we are above confidence threshold
fcs.advance(0, 2, nil) // H=10 (confidence=3, apply)
require.Equal(t, true, applied)
require.Equal(t, false, reverted)
applied = false
// dip below confidence
fcs.advance(2, 2, nil) // H=10 (confidence=3, apply)
require.Equal(t, false, applied)
require.Equal(t, false, reverted)
require.Equal(t, abi.ChainEpoch(7), appliedTs.Height())
require.Equal(t, "bafkqaaa", appliedTs.Blocks()[0].Messages.String())
require.Equal(t, abi.ChainEpoch(10), appliedH)
require.Equal(t, t0123, appliedMsg.To)
require.Equal(t, uint64(1), appliedMsg.Nonce)
require.Equal(t, abi.MethodNum(5), appliedMsg.Method)
// revert some blocks, keep the message
fcs.advance(3, 1, nil) // H=8 (confidence=1)
require.Equal(t, false, applied)
require.Equal(t, false, reverted)
// revert the message
fcs.advance(2, 1, nil) // H=7, we reverted ts with the msg execution, but not the msg itself
require.Equal(t, false, applied)
require.Equal(t, true, reverted)
reverted = false
// send new message on different height
n2msg := fcs.fakeMsgs(fakeMsg{
bmsgs: []*types.Message{
{To: t0123, From: t0123, Method: 5, Nonce: 2},
},
})
fcs.advance(0, 3, map[int]cid.Cid{ // (n2msg confidence=1)
0: n2msg,
})
require.Equal(t, true, applied) // msg from H=7, which had reverted execution
require.Equal(t, false, reverted)
require.Equal(t, abi.ChainEpoch(10), appliedH)
applied = false
fcs.advance(0, 2, nil) // (confidence=3)
require.Equal(t, true, applied)
require.Equal(t, false, reverted)
applied = false
require.Equal(t, abi.ChainEpoch(9), appliedTs.Height())
require.Equal(t, "bafkqaaa", appliedTs.Blocks()[0].Messages.String())
require.Equal(t, abi.ChainEpoch(12), appliedH)
require.Equal(t, t0123, appliedMsg.To)
require.Equal(t, uint64(2), appliedMsg.Nonce)
require.Equal(t, abi.MethodNum(5), appliedMsg.Method)
// revert and apply at different height
fcs.advance(8, 6, map[int]cid.Cid{ // (confidence=3)
1: n2msg,
})
// TODO: We probably don't want to call revert/apply, as restarting certain
// actions may be expensive, and in this case the message is still
// on-chain, just at different height
require.Equal(t, true, applied)
require.Equal(t, true, reverted)
reverted = false
applied = false
require.Equal(t, abi.ChainEpoch(7), appliedTs.Height())
require.Equal(t, "bafkqaaa", appliedTs.Blocks()[0].Messages.String())
require.Equal(t, abi.ChainEpoch(10), appliedH)
require.Equal(t, t0123, appliedMsg.To)
require.Equal(t, uint64(2), appliedMsg.Nonce)
require.Equal(t, abi.MethodNum(5), appliedMsg.Method)
// call method again
fcs.advance(0, 5, map[int]cid.Cid{
0: n2msg,
})
require.Equal(t, true, applied)
require.Equal(t, false, reverted)
applied = false
// send and revert below confidence, then cross confidence
fcs.advance(0, 2, map[int]cid.Cid{
0: fcs.fakeMsgs(fakeMsg{
bmsgs: []*types.Message{
{To: t0123, From: t0123, Method: 5, Nonce: 3},
},
}),
})
fcs.advance(2, 5, nil) // H=19, but message reverted
require.Equal(t, false, applied)
require.Equal(t, false, reverted)
// test timeout (it's set to 20 in the call to `events.Called` above)
fcs.advance(0, 6, nil)
require.Equal(t, false, applied) // not calling timeout as we received messages
require.Equal(t, false, reverted)
// test unregistering with more
more = false
fcs.advance(0, 5, map[int]cid.Cid{
0: fcs.fakeMsgs(fakeMsg{
bmsgs: []*types.Message{
{To: t0123, From: t0123, Method: 5, Nonce: 4}, // this signals we don't want more
},
}),
})
require.Equal(t, true, applied)
require.Equal(t, false, reverted)
applied = false
fcs.advance(0, 5, map[int]cid.Cid{
0: fcs.fakeMsgs(fakeMsg{
bmsgs: []*types.Message{
{To: t0123, From: t0123, Method: 5, Nonce: 5},
},
}),
})
require.Equal(t, false, applied) // should not get any further notifications
require.Equal(t, false, reverted)
// revert after disabled
fcs.advance(5, 1, nil) // try reverting msg sent after disabling
require.Equal(t, false, applied)
require.Equal(t, false, reverted)
fcs.advance(5, 1, nil) // try reverting msg sent before disabling
require.Equal(t, false, applied)
require.Equal(t, true, reverted)
}
func TestCalledTimeout(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events := NewEvents(context.Background(), fcs)
t0123, err := address.NewFromString("t0123")
require.NoError(t, err)
called := false
err = events.Called(func(ts *types.TipSet) (d bool, m bool, e error) {
return false, true, nil
}, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (bool, error) {
called = true
require.Nil(t, msg)
require.Equal(t, abi.ChainEpoch(20), ts.Height())
require.Equal(t, abi.ChainEpoch(23), curH)
return false, nil
}, func(_ context.Context, ts *types.TipSet) error {
t.Fatal("revert on timeout")
return nil
}, 3, 20, matchAddrMethod(t0123, 5))
require.NoError(t, err)
fcs.advance(0, 21, nil)
require.False(t, called)
fcs.advance(0, 5, nil)
require.True(t, called)
called = false
// with check func reporting done
fcs = &fakeCS{
t: t,
h: 1,
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
callNumber: map[string]int{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events = NewEvents(context.Background(), fcs)
err = events.Called(func(ts *types.TipSet) (d bool, m bool, e error) {
return true, true, nil
}, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (bool, error) {
called = true
require.Nil(t, msg)
require.Equal(t, abi.ChainEpoch(20), ts.Height())
require.Equal(t, abi.ChainEpoch(23), curH)
return false, nil
}, func(_ context.Context, ts *types.TipSet) error {
t.Fatal("revert on timeout")
return nil
}, 3, 20, matchAddrMethod(t0123, 5))
require.NoError(t, err)
fcs.advance(0, 21, nil)
require.False(t, called)
fcs.advance(0, 5, nil)
require.False(t, called)
}
func TestCalledOrder(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events := NewEvents(context.Background(), fcs)
t0123, err := address.NewFromString("t0123")
require.NoError(t, err)
at := 0
err = events.Called(func(ts *types.TipSet) (d bool, m bool, e error) {
return false, true, nil
}, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (bool, error) {
switch at {
case 0:
require.Equal(t, uint64(1), msg.Nonce)
require.Equal(t, abi.ChainEpoch(4), ts.Height())
case 1:
require.Equal(t, uint64(2), msg.Nonce)
require.Equal(t, abi.ChainEpoch(5), ts.Height())
default:
t.Fatal("apply should only get called twice, at: ", at)
}
at++
return true, nil
}, func(_ context.Context, ts *types.TipSet) error {
switch at {
case 2:
require.Equal(t, abi.ChainEpoch(5), ts.Height())
case 3:
require.Equal(t, abi.ChainEpoch(4), ts.Height())
default:
t.Fatal("revert should only get called twice, at: ", at)
}
at++
return nil
}, 3, 20, matchAddrMethod(t0123, 5))
require.NoError(t, err)
fcs.advance(0, 10, map[int]cid.Cid{
1: fcs.fakeMsgs(fakeMsg{
bmsgs: []*types.Message{
{To: t0123, From: t0123, Method: 5, Nonce: 1},
},
}),
2: fcs.fakeMsgs(fakeMsg{
bmsgs: []*types.Message{
{To: t0123, From: t0123, Method: 5, Nonce: 2},
},
}),
})
fcs.advance(9, 1, nil)
}
func TestCalledNull(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events := NewEvents(context.Background(), fcs)
t0123, err := address.NewFromString("t0123")
require.NoError(t, err)
more := true
var applied, reverted bool
err = events.Called(func(ts *types.TipSet) (d bool, m bool, e error) {
return false, true, nil
}, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (bool, error) {
require.Equal(t, false, applied)
applied = true
return more, nil
}, func(_ context.Context, ts *types.TipSet) error {
reverted = true
return nil
}, 3, 20, matchAddrMethod(t0123, 5))
require.NoError(t, err)
// create few blocks to make sure nothing get's randomly called
fcs.advance(0, 4, nil) // H=5
require.Equal(t, false, applied)
require.Equal(t, false, reverted)
// create blocks with message (but below confidence threshold)
fcs.advance(0, 3, map[int]cid.Cid{ // msg at H=6; H=8 (confidence=2)
0: fcs.fakeMsgs(fakeMsg{
bmsgs: []*types.Message{
{To: t0123, From: t0123, Method: 5, Nonce: 1},
},
}),
})
require.Equal(t, false, applied)
require.Equal(t, false, reverted)
// create additional blocks so we are above confidence threshold, but with null tipset at the height
// of application
fcs.advance(0, 3, nil, 10) // H=11 (confidence=3, apply)
require.Equal(t, true, applied)
require.Equal(t, false, reverted)
applied = false
fcs.advance(5, 1, nil, 10)
require.Equal(t, false, applied)
require.Equal(t, true, reverted)
}
func TestRemoveTriggersOnMessage(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events := NewEvents(context.Background(), fcs)
t0123, err := address.NewFromString("t0123")
require.NoError(t, err)
more := true
var applied, reverted bool
err = events.Called(func(ts *types.TipSet) (d bool, m bool, e error) {
return false, true, nil
}, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (bool, error) {
require.Equal(t, false, applied)
applied = true
return more, nil
}, func(_ context.Context, ts *types.TipSet) error {
reverted = true
return nil
}, 3, 20, matchAddrMethod(t0123, 5))
require.NoError(t, err)
// create few blocks to make sure nothing get's randomly called
fcs.advance(0, 4, nil) // H=5
require.Equal(t, false, applied)
require.Equal(t, false, reverted)
// create blocks with message (but below confidence threshold)
fcs.advance(0, 3, map[int]cid.Cid{ // msg occurs at H=5, applied at H=6; H=8 (confidence=2)
0: fcs.fakeMsgs(fakeMsg{
bmsgs: []*types.Message{
{To: t0123, From: t0123, Method: 5, Nonce: 1},
},
}),
})
require.Equal(t, false, applied)
require.Equal(t, false, reverted)
// revert applied TS & message TS
fcs.advance(3, 1, nil) // H=6 (tipset message applied in reverted, AND message reverted)
require.Equal(t, false, applied)
require.Equal(t, false, reverted)
// create additional blocks so we are above confidence threshold, but message not applied
// as it was reverted
fcs.advance(0, 5, nil) // H=11 (confidence=3, apply)
require.Equal(t, false, applied)
require.Equal(t, false, reverted)
// create blocks with message again (but below confidence threshold)
fcs.advance(0, 3, map[int]cid.Cid{ // msg occurs at H=12, applied at H=13; H=15 (confidence=2)
0: fcs.fakeMsgs(fakeMsg{
bmsgs: []*types.Message{
{To: t0123, From: t0123, Method: 5, Nonce: 2},
},
}),
})
require.Equal(t, false, applied)
require.Equal(t, false, reverted)
// revert applied height TS, but don't remove message trigger
fcs.advance(2, 1, nil) // H=13 (tipset message applied in reverted, by tipset with message not reverted)
require.Equal(t, false, applied)
require.Equal(t, false, reverted)
// create additional blocks so we are above confidence threshold
fcs.advance(0, 4, nil) // H=18 (confidence=3, apply)
require.Equal(t, true, applied)
require.Equal(t, false, reverted)
}
type testStateChange struct {
from string
to string
}
func TestStateChanged(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events := NewEvents(context.Background(), fcs)
more := true
var applied, reverted bool
var appliedData StateChange
var appliedOldTs *types.TipSet
var appliedNewTs *types.TipSet
var appliedH abi.ChainEpoch
var matchData StateChange
confidence := 3
timeout := abi.ChainEpoch(20)
err := events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) {
return false, true, nil
}, func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) {
require.Equal(t, false, applied)
applied = true
appliedData = data
appliedOldTs = oldTs
appliedNewTs = newTs
appliedH = curH
return more, nil
}, func(_ context.Context, ts *types.TipSet) error {
reverted = true
return nil
}, confidence, timeout, func(oldTs, newTs *types.TipSet) (bool, StateChange, error) {
if matchData == nil {
return false, matchData, nil
}
d := matchData
matchData = nil
return true, d, nil
})
require.NoError(t, err)
// create few blocks to make sure nothing get's randomly called
fcs.advance(0, 4, nil) // H=5
require.Equal(t, false, applied)
require.Equal(t, false, reverted)
// create state change (but below confidence threshold)
matchData = testStateChange{from: "a", to: "b"}
fcs.advance(0, 3, nil)
require.Equal(t, false, applied)
require.Equal(t, false, reverted)
// create additional block so we are above confidence threshold
fcs.advance(0, 2, nil) // H=10 (confidence=3, apply)
require.Equal(t, true, applied)
require.Equal(t, false, reverted)
applied = false
// dip below confidence (should not apply again)
fcs.advance(2, 2, nil) // H=10 (confidence=3, apply)
require.Equal(t, false, applied)
require.Equal(t, false, reverted)
// Change happens from 5 -> 6
require.Equal(t, abi.ChainEpoch(5), appliedOldTs.Height())
require.Equal(t, abi.ChainEpoch(6), appliedNewTs.Height())
// Actually applied (with confidence) at 9
require.Equal(t, abi.ChainEpoch(9), appliedH)
// Make sure the state change was correctly passed through
rcvd := appliedData.(testStateChange)
require.Equal(t, "a", rcvd.from)
require.Equal(t, "b", rcvd.to)
}
func TestStateChangedRevert(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events := NewEvents(context.Background(), fcs)
more := true
var applied, reverted bool
var matchData StateChange
confidence := 1
timeout := abi.ChainEpoch(20)
err := events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) {
return false, true, nil
}, func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) {
require.Equal(t, false, applied)
applied = true
return more, nil
}, func(_ context.Context, ts *types.TipSet) error {
reverted = true
return nil
}, confidence, timeout, func(oldTs, newTs *types.TipSet) (bool, StateChange, error) {
if matchData == nil {
return false, matchData, nil
}
d := matchData
matchData = nil
return true, d, nil
})
require.NoError(t, err)
fcs.advance(0, 2, nil) // H=3
// Make a state change from TS at height 3 to TS at height 4
matchData = testStateChange{from: "a", to: "b"}
fcs.advance(0, 1, nil) // H=4
// Haven't yet reached confidence
require.Equal(t, false, applied)
require.Equal(t, false, reverted)
// Advance to reach confidence level
fcs.advance(0, 1, nil) // H=5
// Should now have called the handler
require.Equal(t, true, applied)
require.Equal(t, false, reverted)
applied = false
// Advance 3 more TS
fcs.advance(0, 3, nil) // H=8
require.Equal(t, false, applied)
require.Equal(t, false, reverted)
// Regress but not so far as to cause a revert
fcs.advance(3, 1, nil) // H=6
require.Equal(t, false, applied)
require.Equal(t, false, reverted)
// Regress back to state where change happened
fcs.advance(3, 1, nil) // H=4
// Expect revert to have happened
require.Equal(t, false, applied)
require.Equal(t, true, reverted)
}
func TestStateChangedTimeout(t *testing.T) {
timeoutHeight := abi.ChainEpoch(20)
confidence := 3
testCases := []struct {
name string
checkFn CheckFunc
nilBlocks []int
expectTimeout bool
}{{
// Verify that the state changed timeout is called at the expected height
name: "state changed timeout",
checkFn: func(ts *types.TipSet) (d bool, m bool, e error) {
return false, true, nil
},
expectTimeout: true,
}, {
// Verify that the state changed timeout is called even if the timeout
// falls on nil block
name: "state changed timeout falls on nil block",
checkFn: func(ts *types.TipSet) (d bool, m bool, e error) {
return false, true, nil
},
nilBlocks: []int{20, 21, 22, 23},
expectTimeout: true,
}, {
// Verify that the state changed timeout is not called if the check
// function reports that it's complete
name: "no timeout callback if check func reports done",
checkFn: func(ts *types.TipSet) (d bool, m bool, e error) {
return true, true, nil
},
expectTimeout: false,
}}
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events := NewEvents(context.Background(), fcs)
// Track whether the callback was called
called := false
// Set up state change tracking that will timeout at the given height
err := events.StateChanged(
tc.checkFn,
func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) {
// Expect the callback to be called at the timeout height with nil data
called = true
require.Nil(t, data)
require.Equal(t, timeoutHeight, newTs.Height())
require.Equal(t, timeoutHeight+abi.ChainEpoch(confidence), curH)
return false, nil
}, func(_ context.Context, ts *types.TipSet) error {
t.Fatal("revert on timeout")
return nil
}, confidence, timeoutHeight, func(oldTs, newTs *types.TipSet) (bool, StateChange, error) {
return false, nil, nil
})
require.NoError(t, err)
// Advance to timeout height
fcs.advance(0, int(timeoutHeight)+1, nil)
require.False(t, called)
// Advance past timeout height
fcs.advance(0, 5, nil, tc.nilBlocks...)
require.Equal(t, tc.expectTimeout, called)
called = false
})
}
}
func TestCalledMultiplePerEpoch(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
callNumber: map[string]int{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events := NewEvents(context.Background(), fcs)
t0123, err := address.NewFromString("t0123")
require.NoError(t, err)
at := 0
err = events.Called(func(ts *types.TipSet) (d bool, m bool, e error) {
return false, true, nil
}, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (bool, error) {
switch at {
case 0:
require.Equal(t, uint64(1), msg.Nonce)
require.Equal(t, abi.ChainEpoch(4), ts.Height())
case 1:
require.Equal(t, uint64(2), msg.Nonce)
require.Equal(t, abi.ChainEpoch(4), ts.Height())
default:
t.Fatal("apply should only get called twice, at: ", at)
}
at++
return true, nil
}, func(_ context.Context, ts *types.TipSet) error {
switch at {
case 2:
require.Equal(t, abi.ChainEpoch(4), ts.Height())
case 3:
require.Equal(t, abi.ChainEpoch(4), ts.Height())
default:
t.Fatal("revert should only get called twice, at: ", at)
}
at++
return nil
}, 3, 20, matchAddrMethod(t0123, 5))
require.NoError(t, err)
fcs.advance(0, 10, map[int]cid.Cid{
1: fcs.fakeMsgs(fakeMsg{
bmsgs: []*types.Message{
{To: t0123, From: t0123, Method: 5, Nonce: 1},
{To: t0123, From: t0123, Method: 5, Nonce: 2},
},
}),
})
fcs.advance(9, 1, nil)
}
func TestCachedSameBlock(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
callNumber: map[string]int{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
_ = NewEvents(context.Background(), fcs)
fcs.advance(0, 10, map[int]cid.Cid{})
assert.Assert(t, fcs.callNumber["ChainGetBlockMessages"] == 20, "expect call ChainGetBlockMessages %d but got ", 20, fcs.callNumber["ChainGetBlockMessages"])
fcs.advance(5, 10, map[int]cid.Cid{})
assert.Assert(t, fcs.callNumber["ChainGetBlockMessages"] == 30, "expect call ChainGetBlockMessages %d but got ", 30, fcs.callNumber["ChainGetBlockMessages"])
}